Source code for stelardataprofiler.tabular_timeseries.profiler

import pandas as pd
import numpy as np
import os
import geopandas as gpd
from typing import Union
from collections import Counter
from datetime import datetime
from .variables import (describe_datetime, describe_boolean,
                        describe_numeric, describe_timeseries,
                        describe_categorical, describe_textual,
                        describe_geometry, read_tabular_timeseries,
                        calculate_generic_df, calculate_table_stats,
                        find_types)

from ..utils import read_config, write_to_json


def profile_tabular(input_path: Union[str, pd.DataFrame, gpd.GeoDataFrame],
                    header: Union[str, int] = 0, sep: str = ',',
                    light_mode: bool = False, crs: str = 'EPSG:4326',
                    num_cat_perc_threshold: float = 0.5,
                    max_freq_distr: int = 10, eps_distance: int = 1000,
                    extra_geometry_columns: list = None,
                    types_dict: dict = None) -> dict:
    """
    Profiles a tabular DataFrame (or file path) and returns a profiling
    report as a dictionary.

    :param input_path: Path to the input file or a DataFrame/GeoDataFrame to profile.
    :type input_path: str or pandas.DataFrame or geopandas.GeoDataFrame
    :param header: Row to use as column names (passed to the pandas reader).
    :type header: int or str
    :param sep: Separator to use when reading CSV files.
    :type sep: str
    :param light_mode: If True, skip expensive computations.
    :type light_mode: bool
    :param crs: Coordinate reference system for geometry data.
    :type crs: str
    :param num_cat_perc_threshold: Threshold for treating a numeric column as categorical.
    :type num_cat_perc_threshold: float
    :param max_freq_distr: Top-K most frequent values to display in the frequency distribution.
    :type max_freq_distr: int
    :param eps_distance: Distance tolerance for geometry heatmap calculations.
    :type eps_distance: int
    :param extra_geometry_columns: Additional geometry columns to consider.
    :type extra_geometry_columns: list
    :param types_dict: Pre-computed types dictionary to use instead of detecting types.
    :type types_dict: dict or None
    :return: Profiling report dictionary.
    :rtype: dict
    """
    input_dict = {
        'input_path': input_path,
        'header': header,
        'sep': sep,
        'light_mode': light_mode,
        'num_cat_perc_threshold': num_cat_perc_threshold,
        'max_freq_distr': max_freq_distr,
        'ts_mode': False,
        'extra_geometry_columns': extra_geometry_columns,
        'crs': crs,
        'eps_distance': eps_distance
    }

    return __profiler_tabular_timeseries(input_dict, types_dict)
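
# Usage sketch (illustration only, not part of the module; 'listings.csv'
# is a hypothetical file):
#
#     report = profile_tabular('listings.csv', sep=';', light_mode=True)
#     print(report['table']['profiler_type'])                  # 'Tabular'
#     print(report['table']['num_rows'], report['table']['num_attributes'])
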
def profile_tabular_with_config(config: dict) -> None:
    """
    This method performs profiling on tabular and/or vector data and writes
    the resulting profile dictionary, based on a configuration dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None
    """
    input_config = config.get("input", {})
    output_config = config.get("output", {})

    input_file_path = input_config.get("files")

    if isinstance(input_file_path, list):
        if len(input_file_path) == 1:
            my_file_path = os.path.abspath(input_file_path[0])
            types_dict = None
        elif len(input_file_path) == 2:
            my_file_path = os.path.abspath(input_file_path[0])
            types_dict = read_config(os.path.abspath(input_file_path[1]))
        else:
            raise ValueError("Expected one or two file paths in 'files'.")
    elif isinstance(input_file_path, str):
        my_file_path = os.path.abspath(input_file_path)
        types_dict = None
    else:
        raise ValueError("Invalid input path format.")

    output_json_path = os.path.abspath(output_config.get("json"))

    # Extract parameters explicitly
    header = input_config.get("header", 0)
    sep = input_config.get("sep", ',')
    light_mode = input_config.get("light_mode", False)
    crs = input_config.get("crs", 'EPSG:4326')
    num_cat_perc_threshold = input_config.get("num_cat_perc_threshold", 0.5)
    max_freq_distr = input_config.get("max_freq_distr", 10)
    eps_distance = input_config.get("eps_distance", 1000)
    extra_geometry_columns = input_config.get("extra_geometry_columns", None)

    profile_dict = profile_tabular(
        input_path=my_file_path,
        header=header,
        sep=sep,
        light_mode=light_mode,
        crs=crs,
        num_cat_perc_threshold=num_cat_perc_threshold,
        max_freq_distr=max_freq_distr,
        eps_distance=eps_distance,
        extra_geometry_columns=extra_geometry_columns,
        types_dict=types_dict
    )

    write_to_json(profile_dict, output_json_path)
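
# Configuration sketch for profile_tabular_with_config (hypothetical paths;
# the key names mirror the .get() lookups above). A second entry in 'files'
# may point to a pre-computed types dictionary file, read via read_config:
#
#     config = {
#         'input': {
#             'files': ['data/trips.csv'],   # or ['data/trips.csv', 'types.json']
#             'header': 0,
#             'sep': ',',
#             'light_mode': False,
#         },
#         'output': {'json': 'out/trips_profile.json'},
#     }
#     profile_tabular_with_config(config)
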
def profile_timeseries(input_path: Union[str, pd.DataFrame],
                       ts_mode_datetime_col: str = 'date',
                       header: Union[str, int] = 0, sep: str = ',',
                       light_mode: bool = False,
                       num_cat_perc_threshold: float = 0.5,
                       max_freq_distr: int = 10,
                       types_dict: dict = None) -> dict:
    """
    Profiles a timeseries DataFrame or file and returns a profiling report
    dictionary.

    :param input_path: Path to the input file or a DataFrame to profile.
    :type input_path: str or pandas.DataFrame
    :param ts_mode_datetime_col: Column name to use as the datetime index.
    :type ts_mode_datetime_col: str
    :param header: Row to use as column names.
    :type header: int or str
    :param sep: Field separator for CSV files.
    :type sep: str
    :param light_mode: If True, skip expensive computations.
    :type light_mode: bool
    :param num_cat_perc_threshold: Threshold for treating a numeric column as categorical.
    :type num_cat_perc_threshold: float
    :param max_freq_distr: Top-K most frequent values to display in the frequency distribution.
    :type max_freq_distr: int
    :param types_dict: Pre-computed types dictionary to use instead of detecting types.
    :type types_dict: dict or None
    :return: Profiling report dictionary.
    :rtype: dict
    """
    input_dict = {
        'input_path': input_path,
        'header': header,
        'sep': sep,
        'light_mode': light_mode,
        'num_cat_perc_threshold': num_cat_perc_threshold,
        'max_freq_distr': max_freq_distr,
        'ts_mode': True,
        'ts_mode_datetime_col': ts_mode_datetime_col,
    }

    return __profiler_tabular_timeseries(input_dict, types_dict)
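
# Usage sketch ('sensors.csv' and its 'timestamp' column are hypothetical):
#
#     report = profile_timeseries('sensors.csv', ts_mode_datetime_col='timestamp')
#     # names of the columns that were profiled as time series
#     ts_vars = [v['name'] for v in report['variables'] if v['type'] == 'TimeSeries']
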
def profile_timeseries_with_config(config: dict) -> None:
    """
    This method performs profiling on timeseries data and writes the
    resulting profile dictionary, based on a configuration dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None
    """
    input_config = config.get("input", {})
    output_config = config.get("output", {})

    input_file_path = input_config.get("files")

    if isinstance(input_file_path, list):
        if len(input_file_path) == 1:
            my_file_path = os.path.abspath(input_file_path[0])
            types_dict = None
        elif len(input_file_path) == 2:
            my_file_path = os.path.abspath(input_file_path[0])
            types_dict = read_config(os.path.abspath(input_file_path[1]))
        else:
            raise ValueError("Expected one or two file paths in 'files'.")
    elif isinstance(input_file_path, str):
        my_file_path = os.path.abspath(input_file_path)
        types_dict = None
    else:
        raise ValueError("Invalid input path format.")

    output_json_path = os.path.abspath(output_config.get("json"))

    # Extract parameters explicitly
    header = input_config.get("header", 0)
    sep = input_config.get("sep", ',')
    light_mode = input_config.get("light_mode", False)
    ts_mode_datetime_col = input_config.get("ts_mode_datetime_col", 'date')
    num_cat_perc_threshold = input_config.get("num_cat_perc_threshold", 0.5)
    max_freq_distr = input_config.get("max_freq_distr", 10)

    profile_dict = profile_timeseries(
        input_path=my_file_path,
        header=header,
        sep=sep,
        light_mode=light_mode,
        ts_mode_datetime_col=ts_mode_datetime_col,
        num_cat_perc_threshold=num_cat_perc_threshold,
        max_freq_distr=max_freq_distr,
        types_dict=types_dict
    )

    write_to_json(profile_dict, output_json_path)
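
# Configuration sketch for profile_timeseries_with_config (hypothetical
# paths; 'timestamp' is an assumed column name):
#
#     config = {
#         'input': {
#             'files': 'data/sensors.csv',
#             'ts_mode_datetime_col': 'timestamp',
#             'max_freq_distr': 5,
#         },
#         'output': {'json': 'out/sensors_profile.json'},
#     }
#     profile_timeseries_with_config(config)
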
def type_detection(input_path: Union[str, pd.DataFrame, gpd.GeoDataFrame],
                   header: Union[str, int] = 0, sep: str = ',',
                   ts_mode: bool = False, ts_mode_datetime_col: str = None,
                   crs: str = 'EPSG:4326',
                   num_cat_perc_threshold: float = 0.5,
                   max_freq_distr: int = 10, eps_distance: int = 1000,
                   extra_geometry_columns: list = None, **kwargs) -> dict:
    """
    Detects the data type of each column in tabular or timeseries data.

    :param input_path: Path or DataFrame/GeoDataFrame to inspect.
    :type input_path: str or pandas.DataFrame or geopandas.GeoDataFrame
    :param header: Header row index or name.
    :type header: int or str
    :param sep: Separator character.
    :type sep: str
    :param ts_mode: Whether to treat the data as timeseries.
    :type ts_mode: bool
    :param ts_mode_datetime_col: Datetime column for timeseries mode.
    :type ts_mode_datetime_col: str
    :param crs: Coordinate reference system for geometry data.
    :type crs: str
    :param num_cat_perc_threshold: Threshold for treating a numeric column as categorical.
    :type num_cat_perc_threshold: float
    :param max_freq_distr: Top-K most frequent values to display in the frequency distribution.
    :type max_freq_distr: int
    :param eps_distance: Distance tolerance for geometry heatmap calculations.
    :type eps_distance: int
    :param extra_geometry_columns: Additional geometry columns to consider.
    :type extra_geometry_columns: list
    :return: Types dictionary mapping each column to its detected type info.
    :rtype: dict
    """
    df, crs = read_tabular_timeseries(input_path=input_path,
                                      header=header,
                                      sep=sep,
                                      crs=crs,
                                      ts_mode_datetime_col=ts_mode_datetime_col,
                                      extra_geometry_columns=extra_geometry_columns)

    input_dict = {
        'input_path': input_path,
        'header': header,
        'sep': sep,
        'num_cat_perc_threshold': num_cat_perc_threshold,
        'max_freq_distr': max_freq_distr,
        'ts_mode': ts_mode,
        'ts_mode_datetime_col': ts_mode_datetime_col,
        'crs': crs,
        'extra_geometry_columns': extra_geometry_columns,
        'eps_distance': eps_distance
    }

    types_dict = find_types(df, input_dict=input_dict)

    return types_dict
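
# Usage sketch: detect column types once, optionally adjust the result, and
# pass it back in so profiling skips re-detection ('listings.csv' is a
# hypothetical file):
#
#     types = type_detection('listings.csv')
#     report = profile_tabular('listings.csv', types_dict=types)
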
def type_detection_with_config(config: dict) -> None:
    """
    This method performs type detection on tabular or timeseries data and
    writes the resulting type detection dictionary, based on a configuration
    dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None
    """
    input_config = config.get("input", {})
    output_config = config.get("output", {})

    input_file_path = input_config.get("files")

    if isinstance(input_file_path, list):
        if len(input_file_path) == 1:
            my_file_path = os.path.abspath(input_file_path[0])
        else:
            raise ValueError(f"Invalid input: {input_file_path} must be a valid "
                             f"file path or a list with one file path")
    elif isinstance(input_file_path, str) and os.path.isfile(os.path.abspath(input_file_path)):
        my_file_path = os.path.abspath(input_file_path)
    else:
        raise ValueError(f"Invalid input: {input_file_path} must be a valid "
                         f"file path or a list with one file path")

    output_json_path = os.path.abspath(output_config.get("json"))

    # Extract parameters, falling back to defaults when not provided
    header = input_config.get("header", 0)
    sep = input_config.get("sep", ',')
    ts_mode = input_config.get("ts_mode", False)
    ts_mode_datetime_col = input_config.get("ts_mode_datetime_col", None)
    crs = input_config.get("crs", 'EPSG:4326')
    num_cat_perc_threshold = input_config.get("num_cat_perc_threshold", 0.5)
    max_freq_distr = input_config.get("max_freq_distr", 10)
    eps_distance = input_config.get("eps_distance", 1000)
    extra_geometry_columns = input_config.get("extra_geometry_columns", None)

    types_dict = type_detection(
        input_path=my_file_path,
        header=header,
        sep=sep,
        ts_mode=ts_mode,
        ts_mode_datetime_col=ts_mode_datetime_col,
        crs=crs,
        num_cat_perc_threshold=num_cat_perc_threshold,
        max_freq_distr=max_freq_distr,
        eps_distance=eps_distance,
        extra_geometry_columns=extra_geometry_columns
    )

    write_to_json(types_dict, output_json_path)
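
# Configuration sketch for type_detection_with_config (hypothetical paths):
#
#     config = {
#         'input': {'files': 'data/listings.csv', 'ts_mode': False},
#         'output': {'json': 'out/listings_types.json'},
#     }
#     type_detection_with_config(config)
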
def __profiler_tabular_timeseries(input_dict, types_dict=None):
    df, crs = read_tabular_timeseries(**input_dict)
    input_dict['crs'] = crs

    # Remember whether the profiler was given a file path or an in-memory frame
    filename_df = isinstance(input_dict['input_path'], str)

    if types_dict is None:
        types_dict = find_types(df, input_dict)

    generic_dict = calculate_generic_df(df)

    profile_dict = {
        'analysis': {
            'title': 'Profiling Report',
            'date_start': datetime.utcnow(),
            'date_end': datetime.utcnow(),
            'duration': ''
        },
        'table': {
            'profiler_type': '',
            'light_mode': input_dict['light_mode'],
            'memory_size': 0,
            'record_size': 0,
            'num_rows': 0,
            'num_attributes': 0,
            'n_cells_missing': 0,
            'p_cells_missing': 0.0,
            'types': []
        },
        'variables': []
    }

    if filename_df:
        profile_dict['analysis']['filenames'] = [input_dict['input_path']]
    else:
        profile_dict['analysis']['filenames'] = ['Profiler was not given a path']

    # Calculate table statistics
    if input_dict['ts_mode']:
        profile_dict['table']['profiler_type'] = 'TimeSeries'
    else:
        profile_dict['table']['profiler_type'] = 'Tabular'

    table_stats = calculate_table_stats(df, types_dict, generic_dict)

    # Copy only the statistics that the report template expects
    common_keys = profile_dict['table'].keys() & table_stats.keys()
    profile_dict['table'].update({key: table_stats[key] for key in common_keys})

    # If profiling a timeseries, also add gap statistics
    if input_dict['ts_mode'] and not input_dict['light_mode']:
        all_gaps_dict = __calculate_gaps(df, types_dict)
        profile_dict['table']['ts_min_gap'] = all_gaps_dict['table']['ts_min_gap']
        profile_dict['table']['ts_max_gap'] = all_gaps_dict['table']['ts_max_gap']
        profile_dict['table']['ts_avg_gap'] = all_gaps_dict['table']['ts_avg_gap']
        profile_dict['table']['ts_gaps_frequency_distribution'] = []
        for gap, count in all_gaps_dict['table']['ts_gaps_frequency_distribution'].items():
            profile_dict['table']['ts_gaps_frequency_distribution'].append(
                {'gap_size': gap, 'count': count})

        # Calculate variable statistics, including per-variable gap statistics
        profile_dict['variables'] = __calculate_variable_stats(
            df, types_dict, generic_dict, all_gaps_dict['variables'],
            light_mode=input_dict['light_mode'])
    else:
        # Calculate variable statistics
        profile_dict['variables'] = __calculate_variable_stats(
            df, types_dict, generic_dict, light_mode=input_dict['light_mode'])

    profile_dict['analysis']['date_end'] = datetime.utcnow()
    profile_dict['analysis']['duration'] = str(
        profile_dict['analysis']['date_end'] - profile_dict['analysis']['date_start'])

    return profile_dict


# calculate variables statistics
def __calculate_variable_stats(df: pd.DataFrame, types_dict: dict,
                               generic_dict: dict, gaps_variable_dict: dict = None,
                               light_mode=False) -> list:
    all_var_list = []
    for column in df:
        var_general = generic_dict[column]
        var_dict = {
            'name': column,
            'type': types_dict[column]['type'],
            'count': var_general['count'],
            'num_missing': var_general['n_missing'],
            'uniqueness': var_general['p_unique'],
            'p_missing': var_general['p_missing'],
            'memory_size': var_general['memory_size'],
            'n_unique': var_general['n_unique'],
            'n_distinct': var_general['n_distinct'],
            'p_distinct': var_general['p_distinct'],
        }

        if var_general['hashable'] and not light_mode:
            var_type = var_dict['type'].lower()
            if var_type == 'datetime':
                describe_datetime(df[column], var_dict)
            elif var_type == 'boolean':
                describe_boolean(df[column], var_dict, column)
            elif var_type == 'numeric':
                describe_numeric(df[column], var_dict, column,
                                 types_dict[column]['max_freq_distr'])
            elif var_type == 'timeseries':
                describe_timeseries(df[column], var_dict, column,
                                    types_dict[column]['max_freq_distr'],
                                    gaps_variable_dict[column])
            elif var_type == 'categorical':
                describe_categorical(df[column], var_dict, column)
            elif var_type == 'textual':
                describe_textual(df[column], var_dict, column)
            elif var_type == 'geometry':
                describe_geometry(df[column], var_dict, column,
                                  types_dict[column]['crs'],
                                  types_dict[column]['eps_distance'])

        all_var_list.append(var_dict)

    return all_var_list


def __calculate_gaps(df: pd.DataFrame, types_dict: dict):
    all_gap_dict = {'table': {}, 'variables': {}}
    max_gap_all = -np.inf
    min_gap_all = np.inf
    average_gap_all = 0
    count_gaps = 0

    # Counter with gap size as key and number of occurrences as value
    gaps_dict = Counter()

    for column in df:
        if types_dict[column]['type'] == 'TimeSeries':
            all_gap_dict['variables'][column] = {}
            # Label each run of consecutive NaNs and sum its length
            gaps = list(df[column].isnull().astype(int)
                        .groupby(df[column].notnull().astype(int).cumsum()).sum())
            true_gaps = [gap for gap in gaps if gap > 0]
            if len(true_gaps) != 0:
                # Calculate the statistics of the observed gap sizes
                s = pd.Series(true_gaps)
                stats = s.describe(percentiles=[.10, .25, .75, .90])
                gaps_distribution = {
                    'name': column,
                    'count': stats['count'],
                    'min': stats['min'],
                    'max': stats['max'],
                    'average': stats['mean'],
                    'stddev': stats['std'],
                    'median': stats['50%'],
                    'kurtosis': s.kurtosis(),
                    'skewness': s.skew(),
                    'variance': s.var(),
                    'percentile10': stats['10%'],
                    'percentile25': stats['25%'],
                    'percentile75': stats['75%'],
                    'percentile90': stats['90%'],
                }

                all_gap_dict['variables'][column]['gaps_distribution'] = gaps_distribution

                # Increase the global count for each gap size
                gaps_dict = gaps_dict + Counter(true_gaps)

                max_gap = max(true_gaps)
                min_gap = min(true_gaps)

                if min_gap < min_gap_all:
                    min_gap_all = min_gap

                if max_gap > max_gap_all:
                    max_gap_all = max_gap

                count_gaps += len(true_gaps)
                average_gap_all += sum(true_gaps)
            else:
                all_gap_dict['variables'][column]['gaps_distribution'] = {}

    if count_gaps != 0:
        average_gap_all = round(average_gap_all / count_gaps)
    else:
        min_gap_all = 0
        max_gap_all = 0

    # A plain dict for serialization; an empty Counter becomes {}
    gaps_dict = dict(gaps_dict)

    all_gap_dict['table']['ts_min_gap'] = min_gap_all
    all_gap_dict['table']['ts_max_gap'] = max_gap_all
    all_gap_dict['table']['ts_avg_gap'] = average_gap_all
    all_gap_dict['table']['ts_gaps_frequency_distribution'] = gaps_dict

    return all_gap_dict
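
# Illustration of the gap computation above (a standalone sketch, not part of
# the module): notnull().cumsum() groups each NaN with the count of non-null
# values seen so far, so summing isnull() per group yields the length of each
# missing run.
#
#     s = pd.Series([1.0, None, None, 2.0, None, 3.0])
#     runs = s.isnull().astype(int).groupby(s.notnull().astype(int).cumsum()).sum()
#     gaps = [g for g in runs if g > 0]      # [2, 1]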