Source code for stelardataprofiler.hierarchical.profiler

from datetime import datetime
import os
from dataprofiler import Data, Profiler
import pandas as pd
from ..utils import write_to_json


def profile_hierarchical_with_config(config: dict) -> None:
    """
    This method performs profiling on hierarchical data and writes the
    resulting profile dictionary to a JSON file, based on a configuration
    dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None
    """
    # Input file path(s): accept either a single path string or a
    # one-element list containing the path.
    input_file_path = config['input']['files']
    if isinstance(input_file_path, list):
        if len(input_file_path) == 1 and os.path.isfile(os.path.abspath(input_file_path[0])):
            my_file_path = os.path.abspath(input_file_path[0])
        else:
            raise ValueError(
                f"Invalid input: {input_file_path} must be a valid file path "
                f"or a list with exactly one valid file path"
            )
    elif isinstance(input_file_path, str) and os.path.isfile(os.path.abspath(input_file_path)):
        my_file_path = os.path.abspath(input_file_path)
    else:
        raise ValueError(
            f"Invalid input: {input_file_path} must be a valid file path "
            f"or a list with exactly one valid file path"
        )

    # Output file path
    output_json_path = os.path.abspath(config['output']['json'])

    # Run hierarchical profile
    profile_dict = profile_hierarchical(my_file_path=my_file_path)

    # Write resulting profile dictionary
    write_to_json(profile_dict, output_json_path)
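
A minimal usage sketch for this entry point, assuming only the configuration keys the function reads above ('input' -> 'files' and 'output' -> 'json'); the input and output paths below are placeholders:

    # Hypothetical paths; 'files' may also be a one-element list of paths.
    config = {
        'input': {
            'files': './data/records.json'
        },
        'output': {
            'json': './output/profile.json'
        }
    }
    profile_hierarchical_with_config(config)
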
def profile_hierarchical(my_file_path: str) -> dict:
    """
    This method performs profiling and generates a profiling dictionary
    for a given JSON file that exists in the given path.

    :param my_file_path: the path to a JSON file.
    :type my_file_path: str
    :return: a dict containing the profiling results for the JSON file.
    :rtype: dict
    """
    profile_dict = {
        'analysis': {
            'title': 'Profiling Report',
            'date_start': '',
            'date_end': '',
            'duration': '',
            'filenames': [my_file_path]
        },
        'table': {
            'profiler_type': 'Hierarchical',
            'byte_size': 0,
            'num_records': 0,
            'depth_distribution': dict()
        },
        'variables': []
    }

    start = datetime.now()
    profile_dict['analysis']['date_start'] = start.strftime("%Y-%m-%d %H:%M:%S.%f")

    # File size
    profile_dict['table']['byte_size'] = os.path.getsize(my_file_path)

    # Profile the hierarchical data with DataProfiler; the structured
    # profiler flattens nested attributes into dot-delimited column names.
    data = Data(my_file_path)
    profile = Profiler(data, profiler_type='structured')
    readable_report = profile.report(report_options={'output_format': 'pretty'})

    profile_dict['table']['num_records'] = readable_report['global_stats']['column_count']

    # Per nesting level, collect the set of distinct attribute names that
    # appear at that level of the dot-delimited column path.
    depth = dict()
    variables = readable_report['data_stats']
    for var in variables:
        attr = {
            'name': var['column_name'],
            'type': var['data_type'],
            'uniqueness': var['statistics']['unique_ratio'],
            'nesting_level': 0
        }

        levels = var['column_name'].split('.')
        attr['nesting_level'] = len(levels) - 1

        for level in range(0, attr['nesting_level'] + 1):
            depth.setdefault(level, set()).add(levels[level])

        profile_dict['variables'].append(attr)

    # One entry per distinct attribute name at each level; the resulting
    # series describes how deeply the document's attributes are nested.
    unique_levels = [level for level, names in depth.items() for _ in names]

    s = pd.Series(unique_levels)
    stats = s.describe(percentiles=[.10, .25, .75, .90])

    profile_dict['table']['depth_distribution'] = {
        'count': stats['count'],
        'min': stats['min'],
        'max': stats['max'],
        'average': stats['mean'],
        'stddev': stats['std'],
        'median': stats['50%'],
        'kurtosis': s.kurtosis(),
        'skewness': s.skew(),
        'variance': s.var(),
        'percentile10': stats['10%'],
        'percentile25': stats['25%'],
        'percentile75': stats['75%'],
        'percentile90': stats['90%'],
    }

    end = datetime.now()
    profile_dict['analysis']['date_end'] = end.strftime("%Y-%m-%d %H:%M:%S.%f")
    profile_dict['analysis']['duration'] = str(end - start)

    return profile_dict
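
For direct use without a configuration dictionary, profile_hierarchical can be called on a JSON file path and the resulting dictionary serialized with write_to_json; the paths below are placeholders:

    # Hypothetical input/output paths.
    profile_dict = profile_hierarchical('./data/records.json')
    print(profile_dict['table']['depth_distribution'])
    write_to_json(profile_dict, './output/profile.json')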