Source code for stelardataprofiler.profiler

import os
import warnings
from numba.core.errors import NumbaDeprecationWarning, NumbaPendingDeprecationWarning

warnings.simplefilter('ignore', category=NumbaDeprecationWarning)
warnings.simplefilter('ignore', category=NumbaPendingDeprecationWarning)
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)
from pandas_profiling import ProfileReport
from pandas_profiling.model.typeset import ProfilingTypeSet
from pandas_profiling.config import Settings
from pandas_profiling.model.summarizer import PandasProfilingSummarizer
from pandas_profiling.report.presentation.core import Container
import geopandas as gp
from shapely.geometry import box
import pandas as pd
import numpy as np
from tsfresh.feature_extraction import extract_features
from pandas_profiling.utils.paths import get_config
from stelardataprofiler.report import (
    __get_report_structure,
    __get_html_report,
    __to_file,
    __to_json
)
from stelardataprofiler.profile_notebook import __get_notebook_iframe
import yaml
from sklearn.cluster import DBSCAN
from datetime import datetime
import rasterio as rio
from scipy import stats
import dateutil.parser
import json
import shutil
from pathlib import Path
from typing import Union, Any
from IPython.display import display
import nltk

nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)
nltk.download('vader_lexicon', quiet=True)
from nltk.corpus import stopwords
from spacy_language_detection import LanguageDetector, detect_langs, DetectorFactory
from ftlangdetect import detect
import fasttext

fasttext.FastText.eprint = lambda x: None
import spacy
from spacy.language import Language
import string
from nltk.stem import SnowballStemmer
from collections import Counter
import pycountry
from simplemma import lemmatize
import gensim
import math
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from dataprofiler import Data, Profiler
from rdflib import Graph
import networkx as nx
from rdflib import RDF, URIRef
from rdflib.extras.external_graph_libs import rdflib_to_networkx_multidigraph
from pyproj import CRS
import uuid
import re
from rasterio.warp import transform_bounds
from rasterio.transform import from_origin
from rasterio.io import MemoryFile

__all__ = ['run_profile', 'profile_timeseries', 'profile_timeseries_with_config',
           'profile_tabular', 'profile_tabular_with_config',
           'profile_raster', 'profile_raster_with_config',
           'profile_text', 'profile_text_with_config',
           'profile_hierarchical', 'profile_hierarchical_with_config',
           'profile_rdfGraph', 'profile_rdfGraph_with_config',
           'profile_vista_rasters', 'profile_vista_rasters_with_config',
           'prepare_mapping', 'profile_single_raster',
           'profile_multiple_rasters', 'profile_single_text',
           'profile_multiple_texts', 'write_to_json', 'read_config'
           ]

tsfresh_json_file = str(
    os.path.dirname(os.path.abspath(__file__))) + '/json_files/tsfresh_json.json'


# ------------------------------------#
# ------ PROFILER MAIN FUNCTION ------#
# ------------------------------------#
[docs]def run_profile(config: dict) -> None:
    """
    This method executes the specified profiler and writes the resulting profile
    dictionary, and HTML if specified, based on a configuration dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None

    """
    profile_type: str = config['profile']['type'].lower()

    if profile_type == 'timeseries':
        profile_timeseries_with_config(config)
    elif profile_type in ['tabular', 'vector']:
        profile_tabular_with_config(config)
    elif profile_type == 'raster':
        profile_raster_with_config(config)
    elif profile_type == 'textual':
        profile_text_with_config(config)
    elif profile_type == 'hierarchical':
        profile_hierarchical_with_config(config)
    elif profile_type == 'rdfgraph':
        profile_rdfGraph_with_config(config)
    elif profile_type == 'vista':
        profile_vista_rasters_with_config(config)
    else:
        print('The profile type is not available!\n'
              'Please use one of the following types:\n'
              "'timeseries', 'tabular', 'vector', 'raster', 'textual', "
              "'hierarchical', 'rdfGraph', 'vista'")
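# --- Illustrative usage sketch (not part of the original module) -------------
# Shows how run_profile is typically driven from a configuration dictionary.
# Only the keys read by the *_with_config functions below are assumed; the
# paths, file names and the 'time' column name are hypothetical placeholders.
def _example_run_profile() -> None:
    example_config = {
        'input': {
            'path': './data',
            'file': 'measurements.csv',          # hypothetical input file
            'header': 0,
            'separator': ',',
            'columns': {'time': 'timestamp'}     # hypothetical datetime column
        },
        'output': {
            'path': './out',
            'json': 'profile.json',
            'html': 'profile.html'
        },
        'profile': {'type': 'timeseries'}
    }
    run_profile(example_config)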
[docs]def prepare_mapping(config: dict) -> None:
    """
    This method prepares the suitable mapping for subsequent generation of the RDF graph,
    if "rdf" and "serialization" options are specified in config.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None

    """
    import sys

    # Get parameters required for conversion to RDF
    output_path = config['output']['path']
    json_file = config['output']['json']
    rdf_file = config['output']['rdf']
    profile_type = config['profile']['type'].lower()
    rdf_serialization = config['output']['serialization']

    # Handle special cases (timeseries, vector) of tabular profile
    if profile_type == 'vector' or profile_type == 'timeseries':
        profile_type = 'tabular'

    # Handle special cases (raster, vista) of raster profile
    if profile_type == 'raster' or profile_type == 'vista':
        profile_type = 'raster'

    # Concatenate path and file names
    in_file = os.path.join(output_path, json_file)
    map_template = os.path.join(os.path.dirname(os.path.abspath(__file__)) + '/mappings',
                                profile_type + '_mapping.ttl')
    map_file = os.path.join(output_path, 'mapping.ttl')
    out_file = os.path.join(output_path, rdf_file)

    # Copy mapping template to temporary 'mapping.ttl'
    if not os.path.isfile(map_template):
        print('ERROR: Mapping ', map_template, 'not found! Check whether such mapping exists in',
              os.path.abspath(map_template))
        sys.exit(1)
    else:
        shutil.copyfile(map_template, map_file)
        print('Mapping ', map_template, ' copied to', map_file)

    # Check if mapping file exists
    if not os.path.isfile(map_file):
        print('ERROR: Mapping for', profile_type, 'profiles not found! Check whether such mapping exists in',
              os.path.abspath(map_file))
        sys.exit(1)

    # Edit the mapping file
    with open(map_file, 'r') as file:
        filedata = file.read()

    # Replace the input with the path to actual JSON profile
    filedata = filedata.replace('./out/profile.json', in_file)

    # Write the file out again
    with open(map_file, 'w') as file:
        file.write(filedata)
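# --- Illustrative usage sketch (not part of the original module) -------------
# Shows the configuration keys prepare_mapping reads once a JSON profile has
# already been written. The output file names are hypothetical; the mapping
# template is the one shipped under <package>/mappings as used above.
def _example_prepare_mapping() -> None:
    rdf_config = {
        'profile': {'type': 'tabular'},
        'output': {
            'path': './out',
            'json': 'profile.json',        # existing JSON profile
            'rdf': 'profile.rdf',          # hypothetical RDF output file
            'serialization': 'turtle'
        }
    }
    # Copies tabular_mapping.ttl to ./out/mapping.ttl and points it at ./out/profile.json
    prepare_mapping(rdf_config)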
# ------------ TIMESERIES ------------#
[docs]def profile_timeseries_with_config(config: dict) -> None:
    """
    This method performs profiling on timeseries data and writes the resulting profile
    dictionary based on a configuration dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None

    """
    input_dir_path = config['input']['path']
    input_file_name = config['input']['file']
    output_dir_path = config['output']['path']
    output_json_name = config['output']['json']
    output_html_name = ''
    if 'html' in config['output']:
        output_html_name = config['output']['html']
    only_directory_path = False

    # Create input file path
    my_file_path = ''
    if input_file_name == '':
        print('No input file was found for timeseries profile!')
        return None
    else:
        my_file_path = os.path.abspath(os.path.join(input_dir_path, input_file_name))

    # Create output file paths
    output_dir_path = os.path.abspath(output_dir_path)
    output_json_path = os.path.abspath(os.path.join(output_dir_path, output_json_name))
    output_html_path = ''
    if output_html_name != '':
        output_html_path = os.path.abspath(os.path.join(output_dir_path, output_html_name))

    # Run timeseries profile
    if 'time' in config['input']['columns']:
        time_column = config['input']['columns']['time']
        header = config['input']['header']
        sep = config['input']['separator']

        profile_dict = profile_timeseries(my_file_path=my_file_path, time_column=time_column,
                                          header=header, sep=sep, html_path=output_html_path)

        # Write resulting profile dictionary
        write_to_json(profile_dict, output_json_path)
    else:
        print("Please add 'time' as key and the time column name of the input .csv "
              'as value in the JSON under input.columns')
[docs]def profile_timeseries(my_file_path: str, time_column: str, header: int = 0, sep: str = ',',
                       html_path: str = '', display_html: bool = False,
                       mode: str = 'verbose') -> dict:
    """
    This method performs profiling and generates a profiling dictionary for a given
    timeseries .csv file that exists in the given path.

    :param my_file_path: the path to a .csv file containing a datetime column and one/multiple timeseries columns.
    :type my_file_path: str
    :param time_column: the name of the datetime column.
    :type time_column: str
    :param header: row to use to parse column labels. Defaults to the first row. Prior rows will be discarded.
    :type header: int, optional
    :param sep: separator character to use for the csv.
    :type sep: str, optional
    :param html_path: the file path where the html file will be saved.
    :type html_path: str, optional
    :param display_html: a boolean that determines whether the html will be displayed in the output.
    :type display_html: bool, optional
    :param mode: 'default' -> calculate tsfresh features for the timeseries and use them as variables
        (useful if many timeseries columns), 'verbose' -> use the timeseries as variables.
    :type mode: str, optional
    :return: A dict which contains the results of the profiler for the timeseries data.
    :rtype: dict

    """
    profile_dict, config, html_dict, sample_timeseries = __profile_timeseries_main(my_file_path, time_column,
                                                                                   header, sep, mode=mode,
                                                                                   minimal=True)

    if html_path.strip() or display_html:
        html_report = __get_html_report(config, html_dict, sample_timeseries)

        if display_html:
            display(__get_notebook_iframe(config, html_report))

        if html_path.strip():
            if not isinstance(html_path, Path):
                html_path = Path(str(html_path))

            # create parent folders if they do not exist
            path = Path(str(html_path.parent))
            path.mkdir(parents=True, exist_ok=True)

            __to_file(config, html_report, html_path)

    return profile_dict
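# --- Illustrative usage sketch (not part of the original module) -------------
# Profiles a CSV with a datetime column and one or more numeric timeseries
# columns, writing both the JSON profile and an HTML report. File paths and
# the column name are hypothetical placeholders.
def _example_profile_timeseries() -> None:
    ts_profile = profile_timeseries(my_file_path='./data/measurements.csv',
                                    time_column='timestamp',
                                    header=0,
                                    sep=',',
                                    html_path='./out/timeseries_profile.html')
    write_to_json(ts_profile, './out/timeseries_profile.json')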
# -------------- TABULAR + VECTOR --------------#
[docs]def profile_tabular_with_config(config: dict) -> None:
    """
    This method performs profiling on tabular and/or vector data and writes the resulting
    profile dictionary based on a configuration dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None

    """
    input_dir_path = config['input']['path']
    input_file_name = config['input']['file']
    output_dir_path = config['output']['path']
    output_json_name = config['output']['json']
    output_html_name = ''
    if 'html' in config['output']:
        output_html_name = config['output']['html']
    only_directory_path = False

    # Create input file path
    my_file_path = ''
    if input_file_name == '':
        print('No input file was found for tabular and/or vector profiles!')
        return None
    else:
        my_file_path = os.path.abspath(os.path.join(input_dir_path, input_file_name))

    # Create output file paths
    output_dir_path = os.path.abspath(output_dir_path)
    output_json_path = os.path.abspath(os.path.join(output_dir_path, output_json_name))
    output_html_path = ''
    if output_html_name != '':
        output_html_path = os.path.abspath(os.path.join(output_dir_path, output_html_name))

    # Run tabular/vector profile
    header = 0
    sep = ','
    if 'header' in config['input']:
        header = config['input']['header']

    if 'separator' in config['input']:
        sep = config['input']['separator']

    columns_dict: dict = config['input']['columns']
    longitude_column: str = None
    latitude_column: str = None
    wkt_column: str = None
    if ('longitude' in columns_dict) and ('latitude' in columns_dict) and ('wkt' in columns_dict):
        longitude_column = columns_dict['longitude']
        latitude_column = columns_dict['latitude']
        wkt_column = columns_dict['wkt']
    elif ('longitude' in columns_dict) and ('latitude' in columns_dict):
        longitude_column = columns_dict['longitude']
        latitude_column = columns_dict['latitude']
    elif 'wkt' in columns_dict:
        wkt_column = columns_dict['wkt']

    profile_dict = profile_tabular(my_file_path=my_file_path, header=header, sep=sep,
                                   longitude_column=longitude_column,
                                   latitude_column=latitude_column,
                                   wkt_column=wkt_column,
                                   html_path=output_html_path)

    # Write resulting profile dictionary
    write_to_json(profile_dict, output_json_path)
[docs]def profile_tabular(my_file_path: str, header: int = 0, sep: str = ',', crs: str = "EPSG:4326",
                    longitude_column: str = None, latitude_column: str = None, wkt_column: str = None,
                    html_path: str = '', display_html: bool = False) -> dict:
    """
    This method performs profiling and generates a profiling dictionary for a given
    tabular .csv or .shp file that exists in the given path.

    :param my_file_path: the path to a .csv or .shp file containing different data types of columns.
    :type my_file_path: str
    :param header: row to use to parse column labels. Defaults to the first row. Prior rows will be discarded.
    :type header: int, optional
    :param sep: separator character to use for the csv.
    :type sep: str, optional
    :param crs: the Coordinate Reference System (CRS) represented as an authority string (e.g. "EPSG:4326").
    :type crs: str, optional
    :param longitude_column: the name of the longitude column.
    :type longitude_column: str, optional
    :param latitude_column: the name of the latitude column.
    :type latitude_column: str, optional
    :param wkt_column: the name of the column that has wkt geometries.
    :type wkt_column: str, optional
    :param html_path: the file path where the html file will be saved.
    :type html_path: str, optional
    :param display_html: a boolean that determines whether the html will be displayed in the output.
    :type display_html: bool, optional
    :return: A dict which contains the results of the profiler for the tabular data.
    :rtype: dict

    """
    profile_dict, config, html_dict = __profile_tabular_main(my_file_path=my_file_path, header=header, sep=sep,
                                                             longitude_column=longitude_column,
                                                             latitude_column=latitude_column,
                                                             wkt_column=wkt_column,
                                                             minimal=True)

    if html_path.strip() or display_html:
        html_report = __get_html_report(config, html_dict, None)

        if display_html:
            display(__get_notebook_iframe(config, html_report))

        if html_path.strip():
            if not isinstance(html_path, Path):
                html_path = Path(str(html_path))

            # create parent folders if they do not exist
            path = Path(str(html_path.parent))
            path.mkdir(parents=True, exist_ok=True)

            __to_file(config, html_report, html_path)

    return profile_dict
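# --- Illustrative usage sketch (not part of the original module) -------------
# Profiles a CSV that carries point geometries as longitude/latitude columns.
# The file path, separator and column names are hypothetical placeholders.
def _example_profile_tabular() -> None:
    tab_profile = profile_tabular(my_file_path='./data/pois.csv',
                                  header=0,
                                  sep=';',
                                  longitude_column='lon',
                                  latitude_column='lat',
                                  html_path='./out/tabular_profile.html')
    write_to_json(tab_profile, './out/tabular_profile.json')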
# -------------- RASTER --------------#
# ----------- SINGLE IMAGE -----------#
[docs]def profile_single_raster(my_file_path: str) -> dict:
    """
    This method performs profiling and generates a profiling dictionary
    for an image file that exists in the given path.

    :param my_file_path: the path to an image file.
    :type my_file_path: str
    :return: A dict which contains the results of the profiler for the image.
    :rtype: dict

    """
    if os.path.isdir(my_file_path):
        print('The input is not a file!')
        return dict()

    profile_dict = {
        'analysis': {
            'title': 'Profiling Report',
            'date_start': '',
            'date_end': '',
            'duration': '',
            'filenames': [my_file_path]
        },
        'table': {
            'profiler_type': 'Raster',
            'byte_size': 0,
            'n_of_imgs': 1,
            'avg_width': 0,
            'avg_height': 0,
        },
        'variables': [],
        'package': {
            'pandas_profiling_version': 'v3.5.0',
            'pandas_profiling_config': ''
        }
    }

    # Start time
    now = datetime.now()
    start_string = now.strftime("%Y-%m-%d %H:%M:%S.%f")
    profile_dict['analysis']['date_start'] = start_string

    # File size
    profile_dict['table']['byte_size'] = os.path.getsize(my_file_path)

    # Create image dictionary
    img_dict = {
        'name': '',
        'type': 'Raster',
        'crs': '',
        'spatial_coverage': '',
        'spatial_resolution': {
            'pixel_size_x': 0,
            'pixel_size_y': 0
        },
        'no_data_value': '',
        'format': ''
    }

    # Read image
    img = rio.open(my_file_path)

    # find image name
    pattern = r'[\w-]+?(?=\.)'

    # searching the pattern
    a = re.search(pattern, my_file_path)

    # keep the match as the image name
    img_dict['name'] = a.group()

    # find general image data
    img_dict.update(img.meta)

    # making transform JSON-serializable
    img_dict['transform'] = list(img_dict['transform'])

    profile_dict['table']['avg_width'] = img_dict['width']
    profile_dict['table']['avg_height'] = img_dict['height']

    # change nodata and driver keys
    img_dict['no_data_value'] = img_dict['nodata']
    del img_dict['nodata']
    img_dict['format'] = img_dict['driver']
    del img_dict['driver']

    # find tags
    img_dict['tags'] = []
    for k, v in img.tags().items():
        tag_dict = {
            'key': k,
            'value': v
        }
        img_dict['tags'].append(tag_dict)

    # change crs format
    if img.crs is not None:
        crs_list = CRS.from_string(str(img_dict['crs']))
        img_dict['crs'] = 'EPSG:' + str(crs_list.to_epsg())
    else:
        img_dict['crs'] = 'EPSG:4326'

    # calculate spatial resolution
    pixelSizeX, pixelSizeY = img.res
    img_dict['spatial_resolution']['pixel_size_x'] = pixelSizeX
    img_dict['spatial_resolution']['pixel_size_y'] = pixelSizeY

    # calculate spatial coverage
    # Bounding box (in the original CRS)
    bounds = img.bounds
    xmin, ymin, xmax, ymax = transform_bounds(CRS.from_string(img_dict['crs']), CRS.from_epsg(4326), *bounds)
    geom = box(xmin, ymin, xmax, ymax)
    img_dict['spatial_coverage'] = geom.wkt

    img_dict['bands'] = []

    # statistics for each band
    for band in range(1, img.count + 1):
        band_data = img.read(band).reshape(1, img.meta['width'] * img.meta['height'])[0].T

        # find band name
        if list(img.descriptions):
            band_name = img.descriptions[band - 1]
            if band_name is None:
                band_name = 'undefined'
        else:
            band_name = 'undefined'

        # find band statistics
        s = pd.Series(band_data)
        stats = s.describe(percentiles=[.10, .25, .75, .90])

        band_dict = {
            'uuid': str(uuid.uuid4()),
            'name': band_name,
            'count': stats[0],
            'min': stats[3],
            'max': stats[9],
            'average': stats[1],
            'stddev': stats[2],
            'median': stats[6],
            'kurtosis': s.kurtosis(),
            'skewness': s.skew(),
            'variance': s.var(),
            'percentile10': stats[4],
            'percentile25': stats[5],
            'percentile75': stats[7],
            'percentile90': stats[8],
        }

        img_dict['bands'].append(band_dict)

    profile_dict['variables'].append(img_dict)

    # End time
    now = datetime.now()
    end_string = now.strftime("%Y-%m-%d %H:%M:%S.%f")
    profile_dict['analysis']['date_end'] = end_string

    # Time Difference
    profile_dict['analysis']['duration'] = str(
        dateutil.parser.parse(profile_dict['analysis']['date_end']) - dateutil.parser.parse(
            profile_dict['analysis']['date_start']))

    return profile_dict
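# --- Illustrative usage sketch (not part of the original module) -------------
# Profiles one raster file and prints the per-band statistics collected above.
# The GeoTIFF path is a hypothetical placeholder.
def _example_profile_single_raster() -> None:
    raster_profile = profile_single_raster('./data/scene.tif')
    for band in raster_profile['variables'][0]['bands']:
        print(band['name'], band['average'], band['stddev'])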
# ----------- MULTIPLE IMAGES -----------#
[docs]def profile_multiple_rasters(my_folder_path: str, image_format: str = '.tif') -> dict:
    """
    This method performs profiling and generates a profiling dictionary
    for the image files that exist in the given folder path.

    :param my_folder_path: the path to a folder that has image files.
    :type my_folder_path: str
    :param image_format: the suffix of the images that exist in the given folder path.
    :type image_format: str, optional
    :return: A dict which contains the results of the profiler for the images.
    :rtype: dict

    """
    if os.path.isfile(my_folder_path):
        print('The input is not a folder!')
        return dict()

    profile_dict = {
        'analysis': {
            'title': 'Profiling Report',
            'date_start': '',
            'date_end': '',
            'duration': '',
            'filenames': []
        },
        'table': {
            'profiler_type': 'Raster',
            'byte_size': 0,
            'n_of_imgs': 0,
            'avg_width': 0,
            'avg_height': 0,
            'combined_band_stats': []
        },
        'variables': [],
        'package': {
            'pandas_profiling_version': 'v3.5.0',
            'pandas_profiling_config': ''
        }
    }

    # in dictionary if same band name in more than one images
    band_images = dict()

    # Start time
    now = datetime.now()
    start_string = now.strftime("%Y-%m-%d %H:%M:%S.%f")
    profile_dict['analysis']['date_start'] = start_string

    for image in os.listdir(my_folder_path):
        if image.lower().endswith(image_format.lower()):
            my_file_path = my_folder_path + '/' + image
            profile_dict['analysis']['filenames'].append(my_file_path)

            # Files size
            profile_dict['table']['byte_size'] += os.path.getsize(my_file_path)

            # Increase the number of images
            profile_dict['table']['n_of_imgs'] += 1

            # Create image dictionary
            img_dict = {
                'name': '',
                'type': 'Raster',
                'crs': '',
                'spatial_coverage': '',
                'spatial_resolution': {
                    'pixel_size_x': 0,
                    'pixel_size_y': 0
                },
                'no_data_value': '',
                'format': ''
            }

            # Read image
            img = rio.open(my_file_path)

            # find image name
            pattern = r'[\w-]+?(?=\.)'

            # searching the pattern
            a = re.search(pattern, my_file_path)

            # keep the match as the image name
            img_dict['name'] = a.group()

            # find general image data
            img_dict.update(img.meta)

            # making transform JSON-serializable
            img_dict['transform'] = list(img_dict['transform'])

            profile_dict['table']['avg_width'] += img_dict['width']
            profile_dict['table']['avg_height'] += img_dict['height']

            # change nodata and driver keys
            img_dict['no_data_value'] = img_dict['nodata']
            del img_dict['nodata']
            img_dict['format'] = img_dict['driver']
            del img_dict['driver']

            # find tags
            img_dict['tags'] = []
            for k, v in img.tags().items():
                tag_dict = {
                    'key': k,
                    'value': v
                }
                img_dict['tags'].append(tag_dict)

            # change crs format
            if img.crs is not None:
                crs_list = CRS.from_string(str(img_dict['crs']))
                img_dict['crs'] = 'EPSG:' + str(crs_list.to_epsg())
            else:
                img_dict['crs'] = 'EPSG:4326'

            # calculate spatial resolution
            pixelSizeX, pixelSizeY = img.res
            img_dict['spatial_resolution']['pixel_size_x'] = pixelSizeX
            img_dict['spatial_resolution']['pixel_size_y'] = pixelSizeY

            # calculate spatial coverage
            # Bounding box (in the original CRS)
            bounds = img.bounds
            xmin, ymin, xmax, ymax = transform_bounds(CRS.from_string(img_dict['crs']), CRS.from_epsg(4326),
                                                      *bounds)
            geom = box(xmin, ymin, xmax, ymax)
            img_dict['spatial_coverage'] = geom.wkt

            img_dict['bands'] = []

            # statistics for each band
            for band in range(1, img.count + 1):
                band_data = img.read(band).reshape(1, img.meta['width'] * img.meta['height'])[0].T

                # find band name
                band_name = 'undefined'
                if list(img.descriptions):
                    band_name = img.descriptions[band - 1]
                    if band_name is None:
                        band_name = 'undefined'
                else:
                    band_name = 'undefined'

                # find band statistics
                s = pd.Series(band_data)
                stats = s.describe(percentiles=[.10, .25, .75, .90])

                band_dict = {
                    'uuid': str(uuid.uuid4()),
                    'name': band_name,
                    'count': stats[0],
                    'min': stats[3],
                    'max': stats[9],
                    'average': stats[1],
                    'stddev': stats[2],
                    'median': stats[6],
                    'kurtosis': s.kurtosis(),
                    'skewness': s.skew(),
                    'variance': s.var(),
                    'percentile10': stats[4],
                    'percentile25': stats[5],
                    'percentile75': stats[7],
                    'percentile90': stats[8],
                }

                img_dict['bands'].append(band_dict)

                if band_name != 'undefined':
                    if band_name not in band_images:
                        band_images[band_name] = [img_dict['name']]
                    else:
                        band_images[band_name].append(img_dict['name'])

            profile_dict['variables'].append(img_dict)

    # calculate combined_band_stats
    for k, v in band_images.items():
        if len(v) > 1:
            combined_band_dict = {
                'name': k,
                'n_of_imgs': len(v),
                'img_names': v,
                'count': 0,
                'min': math.inf,
                'average': 0,
                'max': -math.inf,
                'variance': 0
            }

            for image in profile_dict['variables']:
                if image['name'] in v:
                    for band in image['bands']:
                        if band['name'] == k:
                            combined_band_dict['count'] += band['count']
                            combined_band_dict['average'] += band['average'] * band['count']
                            if band['min'] < combined_band_dict['min']:
                                combined_band_dict['min'] = band['min']
                            if band['max'] > combined_band_dict['max']:
                                combined_band_dict['max'] = band['max']
                            break

            combined_band_dict['average'] = combined_band_dict['average'] / combined_band_dict['count']

            # calculate combined_variance
            # comb_var = (n*std1 + n*d_sqrt1 + m*std2 + m*d_sqrt2 + k*std3 + k*d_sqrt3) / (n + m + k)
            for image in profile_dict['variables']:
                if image['name'] in v:
                    for band in image['bands']:
                        if band['name'] == k:
                            count = band['count']
                            std = band['stddev']
                            mean = band['average']
                            comb_mean = combined_band_dict['average']
                            d_sqrt = (mean - comb_mean) * (mean - comb_mean)
                            combined_band_dict['variance'] += count * std + count * d_sqrt
                            break

            combined_band_dict['variance'] = combined_band_dict['variance'] / combined_band_dict['count']

            profile_dict['table']['combined_band_stats'].append(combined_band_dict)

    # fill general image folder data
    profile_dict['table']['avg_width'] = profile_dict['table']['avg_width'] / profile_dict['table']['n_of_imgs']
    profile_dict['table']['avg_height'] = profile_dict['table']['avg_height'] / profile_dict['table']['n_of_imgs']

    # End time
    now = datetime.now()
    end_string = now.strftime("%Y-%m-%d %H:%M:%S.%f")
    profile_dict['analysis']['date_end'] = end_string

    # Time Difference
    profile_dict['analysis']['duration'] = str(
        dateutil.parser.parse(profile_dict['analysis']['date_end']) - dateutil.parser.parse(
            profile_dict['analysis']['date_start']))

    return profile_dict
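# --- Illustrative sketch (not part of the original module) -------------------
# The combined band statistics above are pooled from per-image summaries rather
# than from re-reading pixels. This self-contained sketch mirrors exactly the
# accumulation written in the loops above, using made-up per-image summaries.
def _example_combined_band_stats() -> dict:
    bands = [
        {'count': 1000, 'average': 0.42, 'stddev': 0.05, 'min': 0.0, 'max': 0.9},
        {'count': 2500, 'average': 0.47, 'stddev': 0.07, 'min': 0.1, 'max': 1.0},
    ]

    total_count = sum(b['count'] for b in bands)
    comb_mean = sum(b['average'] * b['count'] for b in bands) / total_count

    # each image contributes count * stddev + count * (mean - comb_mean)^2,
    # divided by the total pixel count, as in the combined_variance loop above
    comb_var = sum(b['count'] * b['stddev'] + b['count'] * (b['average'] - comb_mean) ** 2
                   for b in bands) / total_count

    return {'count': total_count, 'average': comb_mean,
            'min': min(b['min'] for b in bands), 'max': max(b['max'] for b in bands),
            'variance': comb_var}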
# ----------- MAIN FUNCTION ----------#
[docs]def profile_raster_with_config(config: dict) -> None:
    """
    This method performs profiling on raster data and writes the resulting profile
    dictionary based on a configuration dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None

    """
    input_dir_path = config['input']['path']
    input_file_name = ''
    if 'file' in config['input']:
        input_file_name = config['input']['file']
    output_dir_path = config['output']['path']
    output_json_name = config['output']['json']

    # Create input file path
    only_directory_path = False
    if input_file_name == '':
        my_path = os.path.abspath(input_dir_path)
        only_directory_path = True
    else:
        my_path = os.path.abspath(os.path.join(input_dir_path, input_file_name))

    # Create output file paths
    output_dir_path = os.path.abspath(output_dir_path)
    output_json_path = os.path.abspath(os.path.join(output_dir_path, output_json_name))

    # Run raster profile
    if only_directory_path:
        print('You are running raster profile for multiple image files!\n'
              'Please make sure you have the right format for the image files.')
        if 'format' not in config['input']:
            print("No format is specified so the default '.tif' is used.")
            image_format: str = '.tif'
        else:
            image_format: str = str(config['input']['format']).lower()

        profile_dict = profile_raster(my_path=my_path, image_format=image_format)
    else:
        profile_dict = profile_raster(my_path=my_path)

    # Write resulting profile dictionary
    write_to_json(profile_dict, output_json_path)
[docs]def profile_raster(my_path: str, image_format: str = '.tif') -> dict:
    """
    This method performs profiling and generates a profiling dictionary for either a single image or many images.

    :param my_path: the path to either an image file or a folder that has image files.
    :type my_path: str
    :param image_format: the suffix of the images that exist in the folder if the given path is a folder path.
    :type image_format: str, optional
    :return: A dict which contains the results of the profiler for the image or images.
    :rtype: dict

    """
    if os.path.isfile(my_path):
        profile_dict = profile_single_raster(my_path)
    elif os.path.isdir(my_path):
        profile_dict = profile_multiple_rasters(my_path, image_format)
    else:
        profile_dict = dict()

    return profile_dict
# -------------- TEXTUAL -------------#
# ----------- SINGLE TEXT -----------#
[docs]def profile_single_text(my_file_path: str) -> dict: """ This method performs profiling and generates a profiling dictionary for a text file that exists in the given path. :param my_file_path: the path to a text file. :type my_file_path: str :return: A dict which contains the results of the profiler for the text. :rtype: dict """ # Used in language detection def __get_lang_detector(nlp, name): return LanguageDetector(seed=2023) # Calculate TermFrequency and generate a matrix def __create_tf_matrix(freq_matrix): tf_matrix = {} for sent, f_table in freq_matrix.items(): tf_table = {} count_words_in_sentence = len(f_table) for word, count in f_table.items(): tf_table[word] = count / count_words_in_sentence tf_matrix[sent] = tf_table return tf_matrix # Create a table for documents per words def __create_documents_per_words(freq_matrix): word_per_doc_table = {} for sent, f_table in freq_matrix.items(): for word, count in f_table.items(): if word in word_per_doc_table: word_per_doc_table[word] += 1 else: word_per_doc_table[word] = 1 return word_per_doc_table # Calculate IDF and generate a matrix def __create_idf_matrix(freq_matrix, count_doc_per_words, total_documents): idf_matrix = {} for sent, f_table in freq_matrix.items(): idf_table = {} for word in f_table.keys(): idf_table[word] = math.log10(total_documents / float(count_doc_per_words[word])) idf_matrix[sent] = idf_table return idf_matrix # Calculate TF-IDF and generate a matrix def __create_tf_idf_matrix(tf_matrix, idf_matrix): tf_idf_matrix = {} for (sent1, f_table1), (sent2, f_table2) in zip(tf_matrix.items(), idf_matrix.items()): tf_idf_table = {} for (word1, value1), (word2, value2) in zip(f_table1.items(), f_table2.items()): # here, keys are the same in both the table tf_idf_table[word1] = float(value1 * value2) tf_idf_matrix[sent1] = tf_idf_table return tf_idf_matrix # Important Algorithm: score the sentences def __score_sentences(tf_idf_matrix) -> dict: """ score a sentence by its word's TF Basic algorithm: adding the TF frequency of every non-stop word in a sentence divided by total no of words in a sentence. 
:rtype: dict """ sentenceValue = {} for sent, f_table in tf_idf_matrix.items(): total_score_per_sentence = 0 count_words_in_sentence = len(f_table) for word, score in f_table.items(): total_score_per_sentence += score if count_words_in_sentence != 0: sentenceValue[sent] = total_score_per_sentence / count_words_in_sentence else: sentenceValue[sent] = 0 return sentenceValue # Find the threshold def __find_average_score(sentenceValue) -> int: """ Find the average score from the sentence value dictionary :rtype: int """ sumValues = 0 for entry in sentenceValue: sumValues += sentenceValue[entry] # Average value of a sentence from original summary_text average = (sumValues / len(sentenceValue)) return average # Important Algorithm: Generate the summary def __generate_summary(sentences, sentenceValue, threshold): sentence_count = 0 summary = '' for sentence in sentences: if sentence[:15] in sentenceValue and sentenceValue[sentence[:15]] >= threshold: summary += " " + sentence sentence_count += 1 return summary.strip() if os.path.isdir(my_file_path): print('The input is not a file!') return dict() profile_dict = { 'analysis': { 'title': 'Profiling Report', 'date_start': '', 'date_end': '', 'duration': '', 'filenames': [my_file_path] }, 'table': { 'profiler_type': 'Textual', 'num_texts': 1, 'num_words': 0, 'num_sentences': 0, 'num_distinct_words': 0, 'num_characters': 0, 'ratio_uppercase': 0, 'ratio_digits': 0, 'ratio_special_characters': 0, 'language': '', 'language_distribution': [], 'sentiment': 0, 'named_entities': [], 'term_frequency': [] }, 'variables': [], 'package': { 'pandas_profiling_version': 'v3.5.0', 'pandas_profiling_config': '' } } now = datetime.now() start_string = now.strftime("%Y-%m-%d %H:%M:%S.%f") profile_dict['analysis']['date_start'] = start_string with open(my_file_path, 'r+') as text: text_dict = { 'name': '', 'type': 'Text', 'num_words': 0, 'num_sentences': 0, 'num_distinct_words': 0, 'num_characters': 0, 'ratio_uppercase': 0, 'ratio_digits': 0, 'ratio_special_characters': 0, 'language': '', 'language_distribution': [], 'summary': '', 'topics': [], 'sentiment': 0, 'named_entities': [], 'term_frequency': [], 'special_characters_distribution': [], 'sentence_length_distribution': dict(), 'word_length_distribution': dict(), } # key is a special character and how many times is has been found in the text special_chars = {} # add the length of each word in the list to be used in the calculation of word_length_distribution word_length_list = [] # add the length of each sentence in the list to be used in the calculation of sentence_length_distribution sentence_length_list = [] # find text name pattern = '[\w-]+?(?=\.)' # searching the pattern a = re.search(pattern, my_file_path) text_dict['name'] = a.group() file_contents = text.read() file_contents = ' '.join(file_contents.split()) string_encode = file_contents.encode("ascii", "ignore") file_contents = string_encode.decode() # Find number of words words = nltk.word_tokenize(file_contents.lower()) words_count = 0 for word in words: words_count += 1 word_length_list.append(len(word)) profile_dict['table']['num_words'] = words_count text_dict['num_words'] = words_count # Find number of sentences sentences = nltk.sent_tokenize(file_contents) sentences_count = 0 for sentence in sentences: sentences_count += 1 sentence_length_list.append(len(sentence)) profile_dict['table']['num_sentences'] = sentences_count text_dict['num_sentences'] = sentences_count # Find Distinct/Unique words unique_words = sorted(set(words)) unique_words_count = 
len(unique_words) # set_of_unique_words.update(unique_words) profile_dict['table']['num_distinct_words'] = unique_words_count text_dict['num_distinct_words'] = unique_words_count # Find number of characters numCharacters = len(file_contents) text_dict['num_characters'] = numCharacters profile_dict['table']['num_characters'] = numCharacters # ratio_uppercase, ratio_digits, ratio_special_characters ratioUppercase = 0 ratioDigits = 0 ratioSpecialChars = 0 for c in file_contents: if c.isupper(): ratioUppercase += 1 if c.isdigit(): ratioDigits += 1 if not c.isalnum(): ratioSpecialChars += 1 if c not in special_chars: special_chars[c] = 1 else: special_chars[c] += 1 text_dict['ratio_uppercase'] = ratioUppercase / numCharacters text_dict['ratio_digits'] = ratioDigits / numCharacters text_dict['ratio_special_characters'] = ratioSpecialChars / numCharacters profile_dict['table']['ratio_uppercase'] = text_dict['ratio_uppercase'] profile_dict['table']['ratio_digits'] = text_dict['ratio_digits'] profile_dict['table']['ratio_special_characters'] = text_dict['ratio_special_characters'] # Find languages try: nlp = spacy.load('en_core_web_sm') except OSError: print('Downloading language model for the spaCy POS tagger\n' "(don't worry, this will only happen once)") from spacy.cli import download download('en') nlp = spacy.load('en_core_web_sm') if not Language.has_factory("language_detector"): Language.factory("language_detector", func=__get_lang_detector) nlp.add_pipe('language_detector', last=True) doc = nlp(file_contents) languages = {} cleaned_text = ' ' lemma_text = ' ' freq_matrix = Counter() for i, sent in enumerate(doc.sents): if sent.text: sentence = sent.text if pycountry.languages.get(alpha_2=sent._.language['language']) is not None: language = pycountry.languages.get(alpha_2=sent._.language['language']).name.lower() else: language = 'english' length_sent = len(sentence) if language not in languages: languages[language] = float(sent._.language[ 'score'] * length_sent / sentences_count * numCharacters) else: languages[language] += float(sent._.language[ 'score'] * length_sent / sentences_count * numCharacters) # Clean the sentence using the detecting language # Punctuation Removal cleaned_sentence = sentence.lower() for val in string.punctuation: if val not in "'": if val in "-": cleaned_sentence = cleaned_sentence.replace(val, " ") else: cleaned_sentence = cleaned_sentence.replace(val, "") cleaned_sentence = ' '.join(cleaned_sentence.split()).strip() words = cleaned_sentence.split() # Stopword Removal if language in stopwords.fileids(): stop_words = set(stopwords.words(language)) cleaned_words = [w for w in words if not w in stop_words] else: cleaned_words = words # Stemming stemmed_words = [] if language in list(SnowballStemmer.languages): stemmer = SnowballStemmer(language=language) for word in cleaned_words: word = stemmer.stem(word) stemmed_words.append(word) else: stemmed_words = cleaned_words # Lemma lemmatized_words = [] if pycountry.languages.get(name=language) is not None: for word in cleaned_words: word = lemmatize(word, pycountry.languages.get(name=language).alpha_2) lemmatized_words.append(word) else: lemmatized_words = cleaned_words # freq_matrix will be used in summary extraction freq_matrix[sentence[:15]] = dict(Counter(stemmed_words)) # add stemmed sentence to the cleaned_text cleaned_sentence = " ".join(stemmed_words) cleaned_text += cleaned_sentence.strip() cleaned_text += ' ' # lemmatized text will be used in topic extraction lemmatized_text = " ".join(lemmatized_words) 
lemma_text += lemmatized_text.strip() lemma_text += ' ' # Normalize language percentages total = sum(languages.values(), float(0)) n_languages = {k: v * 100 / total for k, v in languages.items()} languages = n_languages # Find language most used in the text text_dict['language'] = max(languages, key=languages.get) profile_dict['table']['language'] = text_dict['language'] # calculate language_distribution where all languages have percentages based on the sentences each language was detected total = sum(languages.values(), float(0)) unknown_language_perc = 100 for k, v in languages.items(): if total >= 100: new_v = v * 100 / total text_dict['language_distribution'].append( {'name': text_dict['name'], 'language': k, "percentage": new_v}) profile_dict['table']['language_distribution'].append({'language': k, "percentage": new_v}) else: text_dict['language_distribution'].append({'name': text_dict['name'], 'language': k, "percentage": v}) profile_dict['table']['language_distribution'].append({'language': k, "percentage": v}) unknown_language_perc -= v # Summary Extraction if len(file_contents.replace(" ", "")) > 300: ''' Term frequency (TF) is how often a word appears in a document, divided by how many words are there in a document. ''' # Calculate TermFrequency and generate a matrix tf_matrix = __create_tf_matrix(freq_matrix) # creating table for documents per words count_doc_per_words = __create_documents_per_words(freq_matrix) ''' Inverse document frequency (IDF) is how unique or rare a word is. ''' # Calculate IDF and generate a matrix idf_matrix = __create_idf_matrix(freq_matrix, count_doc_per_words, sentences_count) # Calculate TF-IDF and generate a matrix tf_idf_matrix = __create_tf_idf_matrix(tf_matrix, idf_matrix) # Important Algorithm: score the sentences sentence_scores = __score_sentences(tf_idf_matrix) # Find the threshold threshold = __find_average_score(sentence_scores) # Important Algorithm: Generate the summary summary = __generate_summary(sentences, sentence_scores, 1.8 * threshold) if not summary: summary = __generate_summary(sentences, sentence_scores, threshold) text_dict['summary'] = summary else: text_dict['summary'] = summary else: text_dict['summary'] = file_contents # Topic Extraction corpus = [lemma_text.split(' ')] dic = gensim.corpora.Dictionary(corpus) bow_corpus = [dic.doc2bow(doc) for doc in corpus] lda_model = gensim.models.LdaModel(bow_corpus, num_topics=1, id2word=dic, passes=100, iterations=100, random_state=2023, alpha='asymmetric') text_dict['topics'] = list( [token for token, score in lda_model.show_topic(i, topn=10)] for i in range(0, lda_model.num_topics))[0] # Sentiment Analysis sia = SentimentIntensityAnalyzer() compound_score = sia.polarity_scores(file_contents)['compound'] text_dict['sentiment'] = compound_score profile_dict['table']['sentiment'] = compound_score # Named Entity Extraction named_entities = {} for X in doc.ents: sentence = X.text for val in string.punctuation: if val not in "'": if val in "-": sentence = sentence.replace(val, " ") else: sentence = sentence.replace(val, "") sentence = ' '.join(sentence.split()).strip() named_entities[sentence] = X.label_ for ne, neType in named_entities.items(): text_dict['named_entities'].append({'named_entity': ne, "type": neType}) profile_dict['table']['named_entities'].append({'named_entity': ne, "type": neType}) # Term Frequency data_analysis = dict( sorted(nltk.FreqDist(nltk.word_tokenize(cleaned_text)).items(), key=lambda item: item[1], reverse=True)) for term, v in data_analysis.items(): 
text_dict['term_frequency'].append({'name': text_dict['name'], 'term': term, "count": v}) profile_dict['table']['term_frequency'].append({'term': term, "count": v}) # text_dict['term_frequency'] = data_analysis # profile_dict['table']['term_frequency'] = data_analysis # calculate special_characters_distribution (FrequencyDistr) for k, v in special_chars.items(): text_dict['special_characters_distribution'].append({'name': text_dict['name'], 'type': k, "count": v}) # calculate sentence_length_distribution s = pd.Series(sentence_length_list) stats = s.describe(percentiles=[.10, .25, .75, .90]) text_dict['sentence_length_distribution'] = { 'name': text_dict['name'], 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s.kurtosis(), 'skewness': s.skew(), 'variance': s.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } # calculate word_length_distribution s = pd.Series(word_length_list) stats = s.describe(percentiles=[.10, .25, .75, .90]) text_dict['word_length_distribution'] = { 'name': text_dict['name'], 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s.kurtosis(), 'skewness': s.skew(), 'variance': s.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } profile_dict['variables'].append(text_dict) now = datetime.now() end_string = now.strftime("%Y-%m-%d %H:%M:%S.%f") profile_dict['analysis']['date_end'] = end_string profile_dict['analysis']['duration'] = str( dateutil.parser.parse(profile_dict['analysis']['date_end']) - dateutil.parser.parse( profile_dict['analysis']['date_start'])) return profile_dict
# ----------- MULTIPLE TEXTS -----------#
[docs]def profile_multiple_texts(my_folder_path: str, text_format: str = 'txt') -> dict: """ This method performs profiling and generates a profiling dictionary for the text files that exist in the given folder path. :param my_folder_path: the path to a folder that has text files. :type my_folder_path: str :param text_format: the suffix of the texts that exist in the given folder path. :type text_format: str, optional :return: A dict which contains the results of the profiler for the texts. :rtype: dict """ # Used in language detection def __get_lang_detector(nlp, name): return LanguageDetector(seed=2023) # Calculate TermFrequency and generate a matrix def __create_tf_matrix(freq_matrix): tf_matrix = {} for sent, f_table in freq_matrix.items(): tf_table = {} count_words_in_sentence = len(f_table) for word, count in f_table.items(): tf_table[word] = count / count_words_in_sentence tf_matrix[sent] = tf_table return tf_matrix # Create a table for documents per words def __create_documents_per_words(freq_matrix): word_per_doc_table = {} for sent, f_table in freq_matrix.items(): for word, count in f_table.items(): if word in word_per_doc_table: word_per_doc_table[word] += 1 else: word_per_doc_table[word] = 1 return word_per_doc_table # Calculate IDF and generate a matrix def __create_idf_matrix(freq_matrix, count_doc_per_words, total_documents): idf_matrix = {} for sent, f_table in freq_matrix.items(): idf_table = {} for word in f_table.keys(): idf_table[word] = math.log10(total_documents / float(count_doc_per_words[word])) idf_matrix[sent] = idf_table return idf_matrix # Calculate TF-IDF and generate a matrix def __create_tf_idf_matrix(tf_matrix, idf_matrix): tf_idf_matrix = {} for (sent1, f_table1), (sent2, f_table2) in zip(tf_matrix.items(), idf_matrix.items()): tf_idf_table = {} for (word1, value1), (word2, value2) in zip(f_table1.items(), f_table2.items()): # here, keys are the same in both the table tf_idf_table[word1] = float(value1 * value2) tf_idf_matrix[sent1] = tf_idf_table return tf_idf_matrix # Important Algorithm: score the sentences def __score_sentences(tf_idf_matrix) -> dict: """ score a sentence by its word's TF Basic algorithm: adding the TF frequency of every non-stop word in a sentence divided by total no of words in a sentence. 
:rtype: dict """ sentenceValue = {} for sent, f_table in tf_idf_matrix.items(): total_score_per_sentence = 0 count_words_in_sentence = len(f_table) for word, score in f_table.items(): total_score_per_sentence += score if count_words_in_sentence != 0: sentenceValue[sent] = total_score_per_sentence / count_words_in_sentence else: sentenceValue[sent] = 0 return sentenceValue # Find the threshold def __find_average_score(sentenceValue) -> int: """ Find the average score from the sentence value dictionary :rtype: int """ sumValues = 0 for entry in sentenceValue: sumValues += sentenceValue[entry] # Average value of a sentence from original summary_text average = (sumValues / len(sentenceValue)) return average # Important Algorithm: Generate the summary def __generate_summary(sentences, sentenceValue, threshold): sentence_count = 0 summary = '' for sentence in sentences: if sentence[:15] in sentenceValue and sentenceValue[sentence[:15]] >= threshold: summary += " " + sentence sentence_count += 1 return summary.strip() if os.path.isfile(my_folder_path): print('The input is not a folder!') return dict() profile_dict = { 'analysis': { 'title': 'Profiling Report', 'date_start': '', 'date_end': '', 'duration': '', 'filenames': [] }, 'table': { 'profiler_type': 'Textual', 'num_texts': 0, 'num_words': 0, 'num_sentences': 0, 'num_distinct_words': 0, 'num_characters': 0, 'ratio_uppercase': 0, 'ratio_digits': 0, 'ratio_special_characters': 0, 'language': '', 'language_distribution': [], 'sentiment': 0, 'sentiment_analysis': { 'compound_mean': 0.0, 'compound_levels': { '(-1, -0.5)': 0, '(-0.5, 0)': 0, '(0, 0.5)': 0, '(0.5, 1)': 0 } }, 'term_frequency': [] }, 'variables': [], 'package': { 'pandas_profiling_version': 'v3.5.0', 'pandas_profiling_config': '' } } now = datetime.now() start_string = now.strftime("%Y-%m-%d %H:%M:%S.%f") profile_dict['analysis']['date_start'] = start_string corpus_languages = dict() set_of_unique_words = set() dict_term_freq = dict() compound_scores = { '(-1, -0.5)': 0, '(-0.5, 0)': 0, '(0, 0.5)': 0, '(0.5, 1)': 0 } for text_file in os.listdir(my_folder_path): if text_file.lower().endswith(text_format.lower()): filepath = my_folder_path + '/' + text_file profile_dict['analysis']['filenames'].append(filepath) with open(filepath, 'r+') as text: text_dict = { 'name': text_file.split('.')[0], 'type': 'Text', 'num_words': 0, 'num_sentences': 0, 'num_distinct_words': 0, 'num_characters': 0, 'ratio_uppercase': 0, 'ratio_digits': 0, 'ratio_special_characters': 0, 'language': '', 'language_distribution': [], 'summary': '', 'topics': [], 'sentiment': 0, 'named_entities': [], 'term_frequency': [], 'special_characters_distribution': [], 'sentence_length_distribution': dict(), 'word_length_distribution': dict(), } # key is a special character and how many times is has been found in the text special_chars = {} # add the length of each word in the list to be used in the calculation of word_length_distribution word_length_list = [] # add the length of each sentence in the list to be used in the calculation of sentence_length_distribution sentence_length_list = [] file_contents = text.read() file_contents = ' '.join(file_contents.split()) string_encode = file_contents.encode("ascii", "ignore") file_contents = string_encode.decode() if file_contents: profile_dict['table']['num_texts'] += 1 # Find number of words words = nltk.word_tokenize(file_contents.lower()) words_count = 0 for word in words: words_count += 1 word_length_list.append(len(word)) profile_dict['table']['num_words'] += words_count 
text_dict['num_words'] = words_count # Find number of sentences sentences = nltk.sent_tokenize(file_contents) sentences_count = 0 for sentence in sentences: sentences_count += 1 sentence_length_list.append(len(sentence)) profile_dict['table']['num_sentences'] += sentences_count text_dict['num_sentences'] = sentences_count # Find Distinct/Unique words unique_words = sorted(set(words)) unique_words_count = len(unique_words) set_of_unique_words.update(unique_words) text_dict['num_distinct_words'] = unique_words_count # Find number of characters numCharacters = len(file_contents) text_dict['num_characters'] = numCharacters profile_dict['table']['num_characters'] += numCharacters # ratio_uppercase, ratio_digits, ratio_special_characters ratioUppercase = 0 ratioDigits = 0 ratioSpecialChars = 0 for c in file_contents: if c.isupper(): ratioUppercase += 1 if c.isdigit(): ratioDigits += 1 if not c.isalnum(): ratioSpecialChars += 1 if c not in special_chars: special_chars[c] = 1 else: special_chars[c] += 1 text_dict['ratio_uppercase'] = ratioUppercase / numCharacters text_dict['ratio_digits'] = ratioDigits / numCharacters text_dict['ratio_special_characters'] = ratioSpecialChars / numCharacters profile_dict['table']['ratio_uppercase'] += ratioUppercase profile_dict['table']['ratio_digits'] += ratioDigits profile_dict['table']['ratio_special_characters'] += ratioSpecialChars # Find languages try: nlp = spacy.load('en_core_web_sm') except OSError: print('Downloading language model for the spaCy POS tagger\n' "(don't worry, this will only happen once)") from spacy.cli import download download('en') nlp = spacy.load('en_core_web_sm') if not Language.has_factory("language_detector"): Language.factory("language_detector", func=__get_lang_detector) nlp.add_pipe('language_detector', last=True) doc = nlp(file_contents) languages = {} cleaned_text = '' lemma_text = '' freq_matrix = Counter() for i, sent in enumerate(doc.sents): if sent.text: sentence = sent.text if pycountry.languages.get(alpha_2=sent._.language['language']) is not None: language = pycountry.languages.get(alpha_2=sent._.language['language']).name.lower() else: language = 'english' length_sent = len(sentence) if language not in languages: languages[language] = float(sent._.language[ 'score'] * length_sent / sentences_count * numCharacters) else: languages[language] += float(sent._.language[ 'score'] * length_sent / sentences_count * numCharacters) # Clean the sentence using the detecting language # Punctuation Removal cleaned_sentence = sentence.lower() for val in string.punctuation: if val not in "'": if val in "-": cleaned_sentence = cleaned_sentence.replace(val, " ") else: cleaned_sentence = cleaned_sentence.replace(val, "") cleaned_sentence = ' '.join(cleaned_sentence.split()).strip() words = cleaned_sentence.split() # Stopword Removal if language in stopwords.fileids(): stop_words = set(stopwords.words(language)) cleaned_words = [w for w in words if not w in stop_words] else: cleaned_words = words # Stemming stemmed_words = [] if language in list(SnowballStemmer.languages): stemmer = SnowballStemmer(language=language) for word in cleaned_words: word = stemmer.stem(word) stemmed_words.append(word) else: stemmed_words = cleaned_words # Lemma lemmatized_words = [] if pycountry.languages.get(name=language) is not None: for word in cleaned_words: word = lemmatize(word, pycountry.languages.get(name=language).alpha_2) lemmatized_words.append(word) else: lemmatized_words = cleaned_words # freq_matrix will be used in summary extraction 
freq_matrix[sentence[:15]] = dict(Counter(stemmed_words)) # add stemmed sentence to the cleaned_text cleaned_sentence = " ".join(stemmed_words) cleaned_text += cleaned_sentence.strip() cleaned_text += ' ' # lemmatized text will be used in topic extraction lemmatized_text = " ".join(lemmatized_words) lemma_text += lemmatized_text.strip() lemma_text += ' ' # Normalize language percentages total = sum(languages.values(), float(0)) n_languages = {k: v * 100 / total for k, v in languages.items()} languages = n_languages # Add languages dictionary to the corpus dictionary if corpus_languages is not {}: corpus_languages = dict(Counter(corpus_languages) + Counter(languages)) else: corpus_languages = languages # Find language most used in the text text_dict['language'] = max(languages, key=languages.get) # calculate language_distribution where all languages have percentages based on the sentences each language was detected total = sum(languages.values(), float(0)) unknown_language_perc = 100 for k, v in languages.items(): if total >= 100: new_v = v * 100 / total text_dict['language_distribution'].append( {'name': text_dict['name'], 'language': k, "percentage": new_v}) else: text_dict['language_distribution'].append( {'name': text_dict['name'], 'language': k, "percentage": v}) unknown_language_perc -= v # Summary Extraction if len(file_contents.replace(" ", "")) > 300: ''' Term frequency (TF) is how often a word appears in a document, divided by how many words are there in a document. ''' # Calculate TermFrequency and generate a matrix tf_matrix = __create_tf_matrix(freq_matrix) # creating table for documents per words count_doc_per_words = __create_documents_per_words(freq_matrix) ''' Inverse document frequency (IDF) is how unique or rare a word is. ''' # Calculate IDF and generate a matrix idf_matrix = __create_idf_matrix(freq_matrix, count_doc_per_words, sentences_count) # Calculate TF-IDF and generate a matrix tf_idf_matrix = __create_tf_idf_matrix(tf_matrix, idf_matrix) # Important Algorithm: score the sentences sentence_scores = __score_sentences(tf_idf_matrix) # Find the threshold threshold = __find_average_score(sentence_scores) # Important Algorithm: Generate the summary summary = __generate_summary(sentences, sentence_scores, 1.8 * threshold) if not summary: summary = __generate_summary(sentences, sentence_scores, threshold) text_dict['summary'] = summary else: text_dict['summary'] = summary else: text_dict['summary'] = file_contents # Topic Extraction corpus = [lemma_text.split(' ')] dic = gensim.corpora.Dictionary(corpus) bow_corpus = [dic.doc2bow(doc) for doc in corpus] lda_model = gensim.models.LdaModel(bow_corpus, num_topics=1, id2word=dic, passes=100, iterations=100, random_state=2023, alpha='asymmetric') text_dict['topics'] = list( [token for token, score in lda_model.show_topic(i, topn=10)] for i in range(0, lda_model.num_topics))[0] # Sentiment Analysis sia = SentimentIntensityAnalyzer() compound_score = sia.polarity_scores(file_contents)['compound'] text_dict['sentiment'] = compound_score profile_dict['table']['sentiment'] += compound_score if compound_score > 0: if compound_score >= 0.5: compound_scores['(0.5, 1)'] += 1 else: compound_scores['(0, 0.5)'] += 1 elif compound_score < 0: if compound_score <= -0.5: compound_scores['(-1, -0.5)'] += 1 else: compound_scores['(-0.5, 0)'] += 1 profile_dict['table']['sentiment_analysis']['compound_mean'] += compound_score # Named Entity Extraction named_entities = {} for X in doc.ents: sentence = X.text for val in string.punctuation: if val 
not in "'": if val in "-": sentence = sentence.replace(val, " ") else: sentence = sentence.replace(val, "") sentence = ' '.join(sentence.split()).strip() named_entities[sentence] = X.label_ for ne, neType in named_entities.items(): text_dict['named_entities'].append({'named_entity': ne, "type": neType}) # Term Frequency data_analysis = dict( sorted(nltk.FreqDist(nltk.word_tokenize(cleaned_text)).items(), key=lambda item: item[1], reverse=True)) dict_term_freq = dict(Counter(dict_term_freq) + Counter(data_analysis)) for term, v in data_analysis.items(): text_dict['term_frequency'].append({'term': term, "count": v}) # calculate special_characters_distribution (FrequencyDistr) for k, v in special_chars.items(): text_dict['special_characters_distribution'].append( {'name': text_dict['name'], 'type': k, "count": v}) # calculate sentence_length_distribution s = pd.Series(sentence_length_list) stats = s.describe(percentiles=[.10, .25, .75, .90]) text_dict['sentence_length_distribution'] = { 'name': text_dict['name'], 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s.kurtosis(), 'skewness': s.skew(), 'variance': s.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } # calculate word_length_distribution s = pd.Series(word_length_list) stats = s.describe(percentiles=[.10, .25, .75, .90]) text_dict['word_length_distribution'] = { 'name': text_dict['name'], 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s.kurtosis(), 'skewness': s.skew(), 'variance': s.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } profile_dict['variables'].append(text_dict) # Calculate number of distinct words in the corpus profile_dict['table']['num_distinct_words'] = len(set_of_unique_words) # Calculate ratio_uppercase, ratio_digits, ratio_special_characters in the corpus profile_dict['table']['ratio_uppercase'] /= profile_dict['table']['num_characters'] profile_dict['table']['ratio_digits'] /= profile_dict['table']['num_characters'] profile_dict['table']['ratio_special_characters'] /= profile_dict['table']['num_characters'] # Calculate language distribution in the corpus languages = {k: v / profile_dict['table']['num_texts'] for k, v in corpus_languages.items()} total = sum(languages.values(), float(0)) unknown_language_perc = 100 for k, v in languages.items(): if total >= 100: new_v = v * 100 / total profile_dict['table']['language_distribution'].append({'language': k, "percentage": new_v}) else: profile_dict['table']['language_distribution'].append({'language': k, "percentage": v}) unknown_language_perc -= v if total < 100: profile_dict['table']['language_distribution'].append( {'language': "unknown", "percentage": unknown_language_perc}) # Calculate Sentiment analysis for the corpus profile_dict['table']['sentiment'] /= profile_dict['table']['num_texts'] profile_dict['table']['sentiment_analysis']['compound_levels'] = compound_scores profile_dict['table']['sentiment_analysis']['compound_mean'] /= profile_dict['table']['num_texts'] # Calculate term frequency for the corpus data_analysis = dict(sorted(dict_term_freq.items(), key=lambda item: item[1], reverse=True)) for term, v in data_analysis.items(): profile_dict['table']['term_frequency'].append({'term': term, "count": v}) now = datetime.now() end_string = now.strftime("%Y-%m-%d %H:%M:%S.%f") 
profile_dict['analysis']['date_end'] = end_string profile_dict['analysis']['duration'] = str( dateutil.parser.parse(profile_dict['analysis']['date_end']) - dateutil.parser.parse( profile_dict['analysis']['date_start'])) return profile_dict
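The summary step above scores sentences with a TF-IDF scheme built from the per-sentence frequency matrix. The following is a minimal, self-contained sketch of that idea, independent of the private helpers (whose exact signatures are not shown here); the toy sentences are illustrative only, and the 1.8x-average cut-off mirrors the profiler's summary step, which falls back to the plain average when nothing passes.

import math
from collections import Counter

# Toy corpus: three "sentences", already lower-cased and tokenized (illustrative only).
sentences = {
    "s1": ["data", "profiling", "extracts", "statistics"],
    "s2": ["profiling", "finds", "patterns", "in", "data"],
    "s3": ["cats", "sleep", "a", "lot"],
}

# TF: term count divided by the number of words in the sentence.
tf = {s: {w: c / len(words) for w, c in Counter(words).items()}
      for s, words in sentences.items()}

# IDF: log10(total sentences / sentences containing the term).
n_sent = len(sentences)
doc_freq = Counter(w for words in sentences.values() for w in set(words))
idf = {w: math.log10(n_sent / df) for w, df in doc_freq.items()}

# Sentence score: average TF-IDF of its terms; keep sentences above 1.8 * the average score.
scores = {s: sum(tf[s][w] * idf[w] for w in tf[s]) / len(tf[s]) for s in sentences}
threshold = sum(scores.values()) / len(scores)
summary = [s for s, sc in scores.items() if sc >= 1.8 * threshold]
print(scores, summary)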
# ----------- MAIN FUNCTION ----------#
[docs]def profile_text_with_config(config: dict) -> None:
    """
    This method performs profiling on text data and writes the resulting profile dictionary based on a configuration dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None

    """
    input_dir_path = config['input']['path']
    input_file_name = config['input']['file']
    output_dir_path = config['output']['path']
    output_json_name = config['output']['json']

    # Create input file path
    only_directory_path = False
    if input_file_name == '':
        my_path = os.path.abspath(input_dir_path)
        only_directory_path = True
    else:
        my_path = os.path.abspath(os.path.join(input_dir_path, input_file_name))

    # Create output file paths
    output_dir_path = os.path.abspath(output_dir_path)
    output_json_path = os.path.abspath(os.path.join(output_dir_path, output_json_name))

    # Run text profile
    if only_directory_path:
        print('You are running text profile for multiple text files!\n'
              'Please make sure you have the right format for the text files.')
        if 'format' not in config['input']:
            print("No format is specified so the default '.txt' is used.")
            text_format: str = '.txt'
        else:
            text_format: str = str(config['input']['format']).lower()
        profile_dict = profile_text(my_path=my_path, text_format=text_format)
    else:
        profile_dict = profile_text(my_path=my_path)

    # Write resulting profile dictionary
    write_to_json(profile_dict, output_json_path)
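For reference, a configuration for the textual profiler might look like the sketch below. Only the keys actually read by run_profile and profile_text_with_config are shown; the paths and file names are hypothetical.

config = {
    'profile': {'type': 'textual'},
    'input': {
        'path': './datasets/texts',   # hypothetical input directory
        'file': '',                   # empty: profile every text file in the directory
        'format': '.txt'              # only consulted when 'file' is empty
    },
    'output': {
        'path': './output',
        'json': 'text_profile.json'
    }
}

profile_text_with_config(config)      # equivalently: run_profile(config)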
[docs]def profile_text(my_path: str, text_format: str = '.txt'):
    """
    This method performs profiling and generates a profiling dictionary for either a single text or many texts.

    :param my_path: the path to either a text file or a folder that has text files.
    :type my_path: str
    :param text_format: the suffix of the texts that exist in the folder if the given path is a folder path.
    :type text_format: str, optional
    :return: A dict which contains the results of the profiler for the text or texts.
    :rtype: dict

    """
    if os.path.isfile(my_path):
        profile_dict = profile_single_text(my_path)
    elif os.path.isdir(my_path):
        profile_dict = profile_multiple_texts(my_path, text_format)
    else:
        profile_dict = dict()

    return profile_dict
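profile_text dispatches to profile_single_text for a file path and to profile_multiple_texts for a directory, and returns a dictionary with the analysis, table and variables sections built above. A brief usage sketch with hypothetical paths:

corpus_profile = profile_text('./datasets/texts', text_format='.txt')   # hypothetical folder

print(corpus_profile['table']['num_texts'])
print(corpus_profile['table']['language_distribution'])
for text_info in corpus_profile['variables']:
    print(text_info['name'], text_info['language'], text_info['sentiment'])

write_to_json(corpus_profile, './output/text_profile.json')             # hypothetical output path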
# ---------- HIERARCHICAL ---------#
[docs]def profile_hierarchical_with_config(config: dict) -> None:
    """
    This method performs profiling on hierarchical data and writes the resulting profile dictionary based on a configuration dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None

    """
    input_dir_path = config['input']['path']
    input_file_name = config['input']['file']
    output_dir_path = config['output']['path']
    output_json_name = config['output']['json']

    # Create input file path
    my_file_path = ''
    if input_file_name == '':
        print('No input file was found for hierarchical profile!')
        return None
    else:
        my_file_path = os.path.abspath(os.path.join(input_dir_path, input_file_name))

    # Create output file paths
    output_dir_path = os.path.abspath(output_dir_path)
    output_json_path = os.path.abspath(os.path.join(output_dir_path, output_json_name))

    # Run hierarchical profile
    profile_dict = profile_hierarchical(my_file_path=my_file_path)

    # Write resulting profile dictionary
    write_to_json(profile_dict, output_json_path)
# TODO: Add num_attributes (number of distinct tags)
[docs]def profile_hierarchical(my_file_path: str) -> dict: """ This method performs profiling and generates a profiling dictionary for a given json file that exists in the given path. :param my_file_path: the path to a json file. :type my_file_path: str :return: A dict which contains the results of the profiler for the json. :rtype: dict """ profile_dict = { 'analysis': { 'title': 'Profiling Report', 'date_start': '', 'date_end': '', 'duration': '', 'filenames': [my_file_path] }, 'table': { 'profiler_type': 'Hierarchical', 'byte_size': 0, 'num_records': 0, 'depth_distribution': dict() }, 'variables': [], 'package': { 'pandas_profiling_version': 'v3.5.0', 'pandas_profiling_config': '' } } now = datetime.now() start_string = now.strftime("%Y-%m-%d %H:%M:%S.%f") profile_dict['analysis']['date_start'] = start_string # File size profile_dict['table']['byte_size'] = os.path.getsize(my_file_path) data = Data(my_file_path) profile = Profiler(data, profiler_type='structured') readable_report = profile.report(report_options={'output_format': 'pretty'}) profile_dict['table']['num_records'] = readable_report['global_stats']['column_count'] depth = dict() variables = readable_report['data_stats'] for var in variables: attr = { 'name': var['column_name'], 'type': var['data_type'], 'uniqueness': var['statistics']['unique_ratio'], 'nesting_level': 0 } levels = var['column_name'].split('.') attr['nesting_level'] = len(levels) - 1 for level in range(0, attr['nesting_level'] + 1): if level in depth.keys(): depth[level].add(levels[level]) else: depth[level] = {levels[level]} profile_dict['variables'].append(attr) unique_levels = [] for level, names in depth.items(): for name in names: unique_levels.append(level) s = pd.Series(unique_levels) stats = s.describe(percentiles=[.10, .25, .75, .90]) profile_dict['table']['depth_distribution'] = { 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s.kurtosis(), 'skewness': s.skew(), 'variance': s.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } now = datetime.now() end_string = now.strftime("%Y-%m-%d %H:%M:%S.%f") profile_dict['analysis']['date_end'] = end_string profile_dict['analysis']['duration'] = str( dateutil.parser.parse(profile_dict['analysis']['date_end']) - dateutil.parser.parse( profile_dict['analysis']['date_start'])) return profile_dict
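profile_hierarchical flattens the JSON structure with dataprofiler and derives the depth distribution from the dotted attribute names. A brief usage sketch with a hypothetical file path:

hier_profile = profile_hierarchical('./datasets/records.json')          # hypothetical path

print(hier_profile['table']['num_records'])
print(hier_profile['table']['depth_distribution']['max'])               # deepest nesting level observed
for attr in hier_profile['variables'][:5]:
    print(attr['name'], attr['type'], attr['nesting_level'])

write_to_json(hier_profile, './output/hierarchical_profile.json')       # hypothetical output path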
# ---------- RDF-GRAPH ---------#
[docs]def profile_rdfGraph_with_config(config: dict) -> None:
    """
    This method performs profiling on rdfGraph data and writes the resulting profile dictionary based on a configuration dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None

    """
    input_dir_path = config['input']['path']
    input_file_name = config['input']['file']
    output_dir_path = config['output']['path']
    output_json_name = config['output']['json']

    # Create input file path
    my_file_path = ''
    if input_file_name == '':
        print('No input file was found for rdfGraph profile!')
        return None
    else:
        my_file_path = os.path.abspath(os.path.join(input_dir_path, input_file_name))

    # Create output file paths
    output_dir_path = os.path.abspath(output_dir_path)
    output_json_path = os.path.abspath(os.path.join(output_dir_path, output_json_name))

    # Run rdfGraph profile
    if 'serialization' not in config['input']:
        print("No rdflib format is specified so the default 'application/rdf+xml' is used.")
        parse_format: str = 'application/rdf+xml'
    else:
        parse_format: str = str(config['input']['serialization']).lower()

    profile_dict = profile_rdfGraph(my_file_path=my_file_path, parse_format=parse_format)

    # Write resulting profile dictionary
    write_to_json(profile_dict, output_json_path)
[docs]def profile_rdfGraph(my_file_path: str, parse_format: str = 'application/rdf+xml'): """ This method performs profiling and generates a profiling dictionary for a given rdf file that exists in the given path. :param my_file_path: the path to a rdf file. :type my_file_path: str :param parse_format: the format of the rdf file. (see rdflib package to find the available formats e.g. 'turtle', 'application/rdf+xml', 'n3', 'nt', etc.) :type parse_format: str, optional :return: A dict which contains the results of the profiler for the rdf. :rtype: dict """ # Calculate the number of nodes def __calc_num_nodes(g: Graph): return len(g.all_nodes()) # Calculate the number of edges def __calc_num_edges(g: Graph): return len(g) # Calculate the number of namespaces def __calc_num_namespaces(g: Graph): v = g.serialize(format="ttl") return v.count('@prefix') # Calculate the number of classes and a class frequency list def __calc_class_features(g: Graph): num_classes = set() classes_distribution = dict() for cl in g.objects(predicate=RDF.type): if str(cl) not in classes_distribution: classes_distribution[str(cl)] = 0 classes_distribution[str(cl)] += 1 num_classes.add(str(cl)) # List of classes and their frequencies in the graph class_distribution_list = [] for c, v in sorted(classes_distribution.items(), key=lambda x: x[1], reverse=True): class_dict = dict({ 'class_name': c, 'count': v }) class_distribution_list.append(class_dict) return len(num_classes), class_distribution_list # Calculate the number of object type properties def __calc_num_object_properties(g: Graph): # Extract set from objects of triples object_list = {x for x in g.objects() if isinstance(x, URIRef)} # Append set extracted from subjects of triples object_list.update({x for x in g.subjects() if isinstance(x, URIRef)}) return len(object_list) # Calculate the number of data type properties def __calc_num_datatype_properties(g: Graph): data_property_list = {x for x in g.objects() if not isinstance(x, URIRef)} return len(data_property_list) # Calculate the number of connected components and a list with each connected component and its number of nodes def __calc_cc_features(nx_g: nx.MultiDiGraph): nx_g_undirected = nx_g.to_undirected() cc = list(nx.connected_components(nx_g_undirected)) cc_list = [] for i, c in enumerate(cc): cc_dict = dict({ 'component_name': i, 'num_nodes': len(c) }) cc_list.append(cc_dict) return len(cc), cc_list # Calculate the density of the graph def __calc_density(nx_g: nx.MultiDiGraph): nx_g_density = nx.density(nx_g) return nx_g_density # Calculate the degree_centrality_distribution def __calc_degree_centrality(nx_g: nx.MultiDiGraph): dc = nx.degree_centrality(nx_g) degrees_centrality = [] for _, v in dc.items(): degrees_centrality.append(v) s = pd.Series(degrees_centrality) stats = s.describe(percentiles=[.10, .25, .75, .90]) degree_centrality_distribution = { 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s.kurtosis(), 'skewness': s.skew(), 'variance': s.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } return degree_centrality_distribution # Calculate the degree_distribution def __calc_degree(nx_g: nx.MultiDiGraph): degrees = [] for _, v in nx_g.degree: degrees.append(v) s = pd.Series(degrees) stats = s.describe(percentiles=[.10, .25, .75, .90]) degree_distribution = { 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': 
stats[6], 'kurtosis': s.kurtosis(), 'skewness': s.skew(), 'variance': s.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } return degree_distribution # Calculate the in_degree_distribution def __calc_in_degree(nx_g: nx.MultiDiGraph): in_degrees = [] for _, v in nx_g.in_degree: in_degrees.append(v) s = pd.Series(in_degrees) stats = s.describe(percentiles=[.10, .25, .75, .90]) in_degrees_distribution = { 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s.kurtosis(), 'skewness': s.skew(), 'variance': s.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } return in_degrees_distribution # Calculate the out_degree_distribution def __calc_out_degree(nx_g: nx.MultiDiGraph): out_degrees = [] for _, v in nx_g.out_degree: out_degrees.append(v) s = pd.Series(out_degrees) stats = s.describe(percentiles=[.10, .25, .75, .90]) out_degrees_distribution = { 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s.kurtosis(), 'skewness': s.skew(), 'variance': s.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } return out_degrees_distribution profile_dict = { 'analysis': { 'title': 'Profiling Report', 'date_start': '', 'date_end': '', 'duration': '', 'filenames': [my_file_path] }, 'table': { 'profiler_type': 'RDFGraph', 'byte_size': 0, 'num_nodes': 0, 'num_edges': 0, 'num_namespaces': 0, 'num_classes': 0, 'num_object_properties': 0, 'num_datatype_properties': 0, 'density': 0, 'num_connected_components': 0, 'connected_components': [], 'degree_centrality_distribution': dict(), 'degree_distribution': dict(), 'in_degree_distribution': dict(), 'out_degree_distribution': dict(), 'class_distribution': [] }, 'variables': [], 'package': { 'pandas_profiling_version': 'v3.5.0', 'pandas_profiling_config': '' } } # Start time now = datetime.now() start_string = now.strftime("%Y-%m-%d %H:%M:%S.%f") profile_dict['analysis']['date_start'] = start_string # File size profile_dict['table']['byte_size'] = os.path.getsize(my_file_path) g = Graph() g.parse(my_file_path, format=parse_format) # Number of nodes profile_dict['table']['num_nodes'] = __calc_num_nodes(g) # Number of edges profile_dict['table']['num_edges'] = __calc_num_edges(g) # Number of namespaces profile_dict['table']['num_namespaces'] = __calc_num_namespaces(g) # Number of Classes + class_distribution profile_dict['table']['num_classes'], profile_dict['table']['class_distribution'] = __calc_class_features(g) # Number of Object type properties profile_dict['table']['num_object_properties'] = __calc_num_object_properties(g) # Number of Data type properties profile_dict['table']['num_datatype_properties'] = __calc_num_datatype_properties(g) # Create networkx graph nx_g = rdflib_to_networkx_multidigraph(g) # Number of connected components + List of connected components profile_dict['table']['num_connected_components'], profile_dict['table'][ 'connected_components'] = __calc_cc_features( nx_g) # Density profile_dict['table']['density'] = __calc_density(nx_g) # Calculate degree_centrality_distribution profile_dict['table']['degree_centrality_distribution'] = __calc_degree_centrality(nx_g) # Calculate degree_distribution profile_dict['table']['degree_distribution'] = __calc_degree(nx_g) # Calculate in_degree_distribution 
profile_dict['table']['in_degree_distribution'] = __calc_in_degree(nx_g) # Calculate out_degree_distribution profile_dict['table']['out_degree_distribution'] = __calc_out_degree(nx_g) # End time now = datetime.now() end_string = now.strftime("%Y-%m-%d %H:%M:%S.%f") profile_dict['analysis']['date_end'] = end_string # Time Difference profile_dict['analysis']['duration'] = str( dateutil.parser.parse(profile_dict['analysis']['date_end']) - dateutil.parser.parse( profile_dict['analysis']['date_start'])) return profile_dict
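profile_rdfGraph parses the file with rdflib, converts it to a networkx multidigraph, and reports both graph-level counts and degree distributions. A minimal call, with a hypothetical Turtle file, could look like this:

rdf_profile = profile_rdfGraph('./datasets/ontology.ttl', parse_format='turtle')   # hypothetical path

table = rdf_profile['table']
print(table['num_nodes'], table['num_edges'], table['num_classes'])
print(table['density'], table['num_connected_components'])
print(table['degree_distribution']['average'])

write_to_json(rdf_profile, './output/rdf_profile.json')                            # hypothetical output path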
# ------ VISTA (RHD, RAS FILES) ------#
[docs]def profile_vista_rasters_with_config(config: dict) -> None:
    """
    This method performs profiling on ras data and writes the resulting profile dictionary based on a configuration dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None

    """
    input_ras_path = config['input']['ras_path']
    input_ras_file = config['input']['ras_file']
    input_rhd_path = config['input']['rhd_path']
    input_rhd_file = config['input']['rhd_file']
    output_dir_path = config['output']['path']
    output_json_name = config['output']['json']

    # Create input ras and rhd file paths
    my_ras_file_path = ''
    if input_ras_file == '':
        print('No input ras file was found for vista profile!')
        return None
    else:
        my_ras_file_path = os.path.abspath(os.path.join(input_ras_path, input_ras_file))

    my_rhd_file_path = ''
    if input_rhd_file == '':
        print('No input rhd file was found for vista profile!')
        return None
    else:
        my_rhd_file_path = os.path.abspath(os.path.join(input_rhd_path, input_rhd_file))

    # Create output file paths
    output_dir_path = os.path.abspath(output_dir_path)
    output_json_path = os.path.abspath(os.path.join(output_dir_path, output_json_name))

    # Run vista raster profile
    profile_dict = profile_vista_rasters(rhd_datapath=my_rhd_file_path, ras_datapath=my_ras_file_path)

    # Write resulting profile dictionary
    write_to_json(profile_dict, output_json_path)
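The vista profiler expects separate path/file entries for the .ras and .rhd inputs. A sketch of such a configuration (file and directory names are hypothetical):

config = {
    'profile': {'type': 'vista'},
    'input': {
        'ras_path': './datasets/vista',   # hypothetical paths and file names
        'ras_file': 'LAI_2023.ras',
        'rhd_path': './datasets/vista',
        'rhd_file': 'LAI_2023.rhd'
    },
    'output': {
        'path': './output',
        'json': 'vista_profile.json'
    }
}

profile_vista_rasters_with_config(config)   # or run_profile(config)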
[docs]def profile_vista_rasters(rhd_datapath: str, ras_datapath: str): """ This method performs profiling and generates a profiling dictionary for a given ras file that exists in the given path using the contents of a rhd file that exists in the given path. :param rhd_datapath: the path to a rhd file. :type rhd_datapath: str :param ras_datapath: the path to a ras file. :type ras_datapath: str :return: A dict which contains the results of the profiler for the ras. :rtype: dict """ def __read_image_rhd(rhd_datapath: str): with open(rhd_datapath, 'r') as f: lines = f.readlines() vista_data_type = int(lines[0]) n_of_LAI = int(lines[1]) split_third_row = " ".join(lines[2].split()).split(' ') columns = int(split_third_row[0]) rows = int(split_third_row[1]) split_fourth_row = " ".join(lines[3].split()).split(' ') resolution = float(split_fourth_row[0]) upper_left_corner_x = float(split_fourth_row[1]) upper_left_corner_y = float(split_fourth_row[2]) UTM_x = float(split_fourth_row[3]) UTM_y = float(split_fourth_row[4]) UTM_zone = str(split_fourth_row[5]) LAI_images = {'vista_data_type': vista_data_type, 'resolution': resolution, 'upper_left_corner_x': upper_left_corner_x, 'upper_left_corner_y': upper_left_corner_y, 'rows': rows, 'columns': columns, 'UTM_x': UTM_x, 'UTM_y': UTM_y, 'UTM_zone': UTM_zone} count_LAI_images = 0 LAI_images['images'] = {} for value_LAI in range(5, n_of_LAI + 5): ras_file_name = rhd_datapath.split('/')[-1].split('.')[0] img_name = ras_file_name + '_' + str(count_LAI_images) prev_img_name = ras_file_name + '_' + str(count_LAI_images - 1) split_row = " ".join(lines[value_LAI].split()).split(' ') LAI_images['images'][img_name] = {} img_bytes = int(split_row[0]) LAI_images['images'][img_name]['bytes'] = img_bytes LAI_images['images'][img_name]['date'] = datetime.strptime( split_row[3] + ' ' + split_row[2] + ' ' + split_row[1], '%d %m %Y').date() record_length = img_bytes * columns LAI_images['images'][img_name]['record_length_bytes'] = record_length if count_LAI_images == 0: LAI_images['images'][img_name]['image_start_pos_bytes'] = 0 else: LAI_images['images'][img_name]['image_start_pos_bytes'] = LAI_images['images'][prev_img_name][ 'image_start_pos_bytes'] + (( record_length / img_bytes) * rows) count_LAI_images += 1 return LAI_images ras_dict = __read_image_rhd(rhd_datapath) profile_dict = { 'analysis': { 'title': 'Profiling Report', 'date_start': '', 'date_end': '', 'duration': '', 'filenames': [rhd_datapath, ras_datapath] }, 'table': { 'profiler_type': 'Vista_Raster', 'byte_size': 0, 'n_of_imgs': len(ras_dict['images']), 'avg_width': 0, 'avg_height': 0, 'combined_bands': [] }, 'variables': [], 'package': { 'pandas_profiling_version': 'v3.5.0', 'pandas_profiling_config': '' } } # initialize .ras NODATA value counts ras_missing_count = 0 ras_forest_count = 0 ras_urban_count = 0 ras_water_count = 0 ras_snow_count = 0 ras_cloud_shadow_buffer_count = 0 ras_cloud_shadow_count = 0 ras_cloud_buffer_count = 0 ras_cirrus_clouds_count = 0 ras_clouds_count = 0 __lai_f = lambda x: float(str(x)[:-4]) / 40.0 if (x > 99) else (x if (x < 0) else -999) # Start time now = datetime.now() start_string = now.strftime("%Y-%m-%d %H:%M:%S.%f") profile_dict['analysis']['date_start'] = start_string img_names = [] with open(ras_datapath, 'r+') as f: ras_file_name = ras_datapath.split('/')[-1].split('.')[0] if ras_dict['vista_data_type'] == 7: ras_file_array = np.fromfile(f, dtype=np.int16).astype(float) ras_file_array[np.where(ras_file_array > 0)] = list( map(__lai_f, 
ras_file_array[np.where(ras_file_array > 0)])) n_of_imgs = len(ras_dict['images']) for n_img in range(0, n_of_imgs): # Create image dictionary img_dict = { 'name': '', 'type': 'Raster', 'crs': '', 'date': '', 'spatial_coverage': '', 'spatial_resolution': { 'pixel_size_x': 0, 'pixel_size_y': 0 }, 'no_data_value': '', 'format': '' } img_name = ras_file_name + '_' + str(n_img) img_names.append(img_name) # image name img_dict['name'] = img_name next_img_name = ras_file_name + '_' + str(n_img + 1) if n_img == n_of_imgs - 1: start_pos = int(ras_dict['images'][img_name]['image_start_pos_bytes']) end_pos = len(ras_file_array) else: start_pos = int(ras_dict['images'][img_name]['image_start_pos_bytes']) end_pos = int(ras_dict['images'][next_img_name]['image_start_pos_bytes']) # data of the image img_data = ras_file_array[start_pos:end_pos] img_data = img_data.reshape((ras_dict['rows'], ras_dict['columns'])) # Find Image General Data upper_left_corner_x = ras_dict['upper_left_corner_x'] upper_left_corner_y = ras_dict['upper_left_corner_y'] UTM_x = ras_dict['UTM_x'] UTM_y = ras_dict['UTM_y'] transform = from_origin(upper_left_corner_x, upper_left_corner_y, UTM_x, UTM_y) # create in-memory rasterio image mem_file = MemoryFile() with mem_file.open(driver='GTiff', height=ras_dict['rows'], width=ras_dict['columns'], count=1, dtype=str(ras_file_array.dtype), crs='+proj=utm +zone=32', transform=transform) as img: img.update_tags(date=ras_dict['images'][img_name]['date']) # image general metadata img_dict.update(img.meta) # image size profile_dict['table']['byte_size'] += img_dict['width'] * img_dict['height'] * 4 # image date img_dict['date'] = ras_dict['images'][img_name]['date'].strftime("%Y-%m-%d %H:%M:%S.%f") # making transform JSON-serializable img_dict['transform'] = list(img_dict['transform']) profile_dict['table']['avg_width'] += img_dict['width'] profile_dict['table']['avg_height'] += img_dict['height'] # change nodata and driver keys img_dict['no_data_value'] = img_dict['nodata'] del img_dict['nodata'] img_dict['format'] = img_dict['driver'] del img_dict['driver'] # change crs format if img.crs is not None: crs_list = CRS.from_string(str(img_dict['crs'])) img_dict['crs'] = 'EPSG:' + str(crs_list.to_epsg()) else: img_dict['crs'] = 'EPSG:4326' # calculate spatial resolution pixelSizeX, pixelSizeY = img.res img_dict['spatial_resolution']['pixel_size_x'] = pixelSizeX img_dict['spatial_resolution']['pixel_size_y'] = pixelSizeY # calculate spatial coverage # Bounding box (in the original CRS) bounds = img.bounds xmin, ymin, xmax, ymax = transform_bounds(CRS.from_string(img_dict['crs']), CRS.from_epsg(4326), *bounds) geom = box(xmin, ymin, xmax, ymax) img_dict['spatial_coverage'] = geom.wkt img.close() # statistics for LAI band img_dict['bands'] = [] s = pd.Series(img_data[np.where(img_data > 0)]) stats = s.describe(percentiles=[.10, .25, .75, .90]) band_uuid = str(uuid.uuid4()) band_dict = { 'uuid': band_uuid, 'name': 'LAI', 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s.kurtosis(), 'skewness': s.skew(), 'variance': s.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], 'no_data_distribution': [] } # percentages of no_data values img_no_data = img_data[np.where(img_data < 0)] width = img_dict['width'] height = img_dict['height'] missing_count = np.count_nonzero(img_no_data == -999) forest_count = np.count_nonzero(img_no_data == -961) urban_count = 
np.count_nonzero(img_no_data == -950) water_count = np.count_nonzero(img_no_data == -940) snow_count = np.count_nonzero(img_no_data == -930) cloud_shadow_buffer_count = np.count_nonzero(img_no_data == -923) cloud_shadow_count = np.count_nonzero(img_no_data == -920) cloud_buffer_count = np.count_nonzero(img_no_data == -913) cirrus_clouds_count = np.count_nonzero(img_no_data == -911) clouds_count = np.count_nonzero(img_no_data == -910) # add NODATA value counts to the .ras NODATA value counts ras_missing_count += missing_count ras_forest_count += forest_count ras_urban_count += urban_count ras_water_count += water_count ras_snow_count += snow_count ras_cloud_shadow_buffer_count += cloud_shadow_buffer_count ras_cloud_shadow_count += cloud_shadow_count ras_cloud_buffer_count += cloud_buffer_count ras_cirrus_clouds_count += cirrus_clouds_count ras_clouds_count += clouds_count no_data_dict = { 'missing': (missing_count / (width * height)) * 100, 'forest': (forest_count / (width * height)) * 100, 'urban': (urban_count / (width * height)) * 100, 'water': (water_count / (width * height)) * 100, 'snow': (snow_count / (width * height)) * 100, 'cloud_shadow_buffer': (cloud_shadow_buffer_count / (width * height)) * 100, 'cloud_shadow': (cloud_shadow_count / (width * height)) * 100, 'cloud_buffer': (cloud_buffer_count / (width * height)) * 100, 'cirrus_clouds': (cirrus_clouds_count / (width * height)) * 100, 'clouds': (clouds_count / (width * height)) * 100 } for k, v in no_data_dict.items(): band_dict['no_data_distribution'].append( {'uuid': band_uuid, 'value': k, 'percentage': v} ) img_dict['bands'].append(band_dict) profile_dict['variables'].append(img_dict) # calculate combined stats combined_band_stats_dict = { 'name': 'LAI', 'n_of_imgs': profile_dict['table']['n_of_imgs'], 'img_names': img_names, 'count': 0, 'min': math.inf, 'average': 0, 'max': -math.inf, 'variance': 0, 'no_data_distribution': [] } for image in profile_dict['variables']: lai_band = image['bands'][0] combined_band_stats_dict['count'] += lai_band['count'] combined_band_stats_dict['average'] += lai_band['average'] * lai_band['count'] if lai_band['min'] < combined_band_stats_dict['min']: combined_band_stats_dict['min'] = lai_band['min'] if lai_band['max'] > combined_band_stats_dict['max']: combined_band_stats_dict['max'] = lai_band['max'] combined_band_stats_dict['average'] = combined_band_stats_dict['average'] / combined_band_stats_dict['count'] # calculate combined_variance # comb_var = (n*std1 + n*d_sqrt1 + m*std2 + m*d_sqrt2 + k*std3 + k*d_sqrt3)/ n + m + k for image in profile_dict['variables']: lai_band = image['bands'][0] count = lai_band['count'] std = lai_band['stddev'] mean = lai_band['average'] comb_mean = combined_band_stats_dict['average'] d_sqrt = (mean - comb_mean) * (mean - comb_mean) combined_band_stats_dict['variance'] += count * std + count * d_sqrt # calculate no_data_distribution for LAI of the .ras width_all = profile_dict['table']['avg_width'] height_all = profile_dict['table']['avg_height'] no_data_dict = { 'missing': ((ras_missing_count * n_of_imgs) / (width_all * height_all)) * 100, 'forest': ((ras_forest_count * n_of_imgs) / (width_all * height_all)) * 100, 'urban': ((ras_urban_count * n_of_imgs) / (width_all * height_all)) * 100, 'water': ((ras_water_count * n_of_imgs) / (width_all * height_all)) * 100, 'snow': ((ras_snow_count * n_of_imgs) / (width_all * height_all)) * 100, 'cloud_shadow_buffer': ((ras_cloud_shadow_buffer_count * n_of_imgs) / (width_all * height_all)) * 100, 'cloud_shadow': 
((ras_cloud_shadow_count * n_of_imgs) / (width_all * height_all)) * 100, 'cloud_buffer': ((ras_cloud_buffer_count * n_of_imgs) / (width_all * height_all)) * 100, 'cirrus_clouds': ((ras_cirrus_clouds_count * n_of_imgs) / (width_all * height_all)) * 100, 'clouds': ((ras_clouds_count * n_of_imgs) / (width_all * height_all)) * 100 } for k, v in no_data_dict.items(): combined_band_stats_dict['no_data_distribution'].append( {'name': 'LAI', 'value': k, 'percentage': v} ) profile_dict['table']['combined_bands'].append(combined_band_stats_dict) # calculate avg_width and avg_height of .ras file profile_dict['table']['avg_width'] = profile_dict['table']['avg_width'] / profile_dict['table']['n_of_imgs'] profile_dict['table']['avg_height'] = profile_dict['table']['avg_height'] / profile_dict['table']['n_of_imgs'] # End time now = datetime.now() end_string = now.strftime("%Y-%m-%d %H:%M:%S.%f") profile_dict['analysis']['date_end'] = end_string # Time Difference profile_dict['analysis']['duration'] = str( dateutil.parser.parse(profile_dict['analysis']['date_end']) - dateutil.parser.parse( profile_dict['analysis']['date_start'])) return profile_dict
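Calling profile_vista_rasters directly returns per-image variables (one entry per LAI image decoded from the .ras payload) plus combined LAI statistics for the whole file. A short sketch with hypothetical file names:

vista_profile = profile_vista_rasters(
    rhd_datapath='./datasets/vista/LAI_2023.rhd',   # hypothetical paths
    ras_datapath='./datasets/vista/LAI_2023.ras',
)

print(vista_profile['table']['n_of_imgs'], vista_profile['table']['avg_width'])
for image in vista_profile['variables']:
    lai_band = image['bands'][0]
    print(image['name'], image['date'], lai_band['average'])
print(vista_profile['table']['combined_bands'][0]['no_data_distribution'])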
# ---------- OTHER FUNCTIONS ---------#
[docs]def read_config(json_file: str) -> dict:
    """
    This method reads configuration settings from a json file. Configuration includes all parameters for input/output.

    :param json_file: path to a .json file (or a raw JSON string) that contains the configuration parameters.
    :type json_file: str
    :return: A dictionary with all configuration settings.
    :rtype: dict

    """
    try:
        config_dict: dict = json.loads(json_file)
    except ValueError as e:
        with open(json_file) as f:
            config_dict: dict = json.load(f)
            return config_dict

    return config_dict
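read_config first tries to parse its argument as a JSON string and falls back to opening it as a file path, so either form works. Combined with run_profile it gives the typical entry point (the file name below is hypothetical):

# From a configuration file on disk
config = read_config('./config_template.json')
run_profile(config)

# read_config also accepts an inline JSON string
inline = read_config('{"profile": {"type": "tabular"}}')
print(inline['profile']['type'])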
[docs]def write_to_json(output_dict: dict, output_file: Union[str, Path]) -> None:
    """
    Write the profile dictionary to a file.

    :param output_dict: the profile dictionary that will be written.
    :type output_dict: dict
    :param output_file: the name or the path of the file to generate, including the extension (.json).
    :type output_file: Union[str, Path]
    :return: None.
    :rtype: None

    """
    if not isinstance(output_file, Path):
        output_file = Path(str(output_file))

    # create the output folder if it doesn't exist
    path = Path(str(output_file.parent))
    path.mkdir(parents=True, exist_ok=True)

    if output_file.suffix == ".json":
        with open(output_file, "w") as outfile:
            def encode_it(o: Any) -> Any:
                if isinstance(o, dict):
                    return {encode_it(k): encode_it(v) for k, v in o.items()}
                else:
                    if isinstance(o, (bool, int, float, str)):
                        return o
                    elif isinstance(o, list):
                        return [encode_it(v) for v in o]
                    elif isinstance(o, set):
                        return {encode_it(v) for v in o}
                    elif isinstance(o, (pd.DataFrame, pd.Series)):
                        return encode_it(o.to_dict('records'))
                    elif isinstance(o, np.ndarray):
                        return encode_it(o.tolist())
                    elif isinstance(o, np.generic):
                        return o.item()
                    else:
                        return str(o)

            output_dict = encode_it(output_dict)
            json.dump(output_dict, outfile, indent=3)
    else:
        suffix = output_file.suffix
        warnings.warn(
            f"Extension {suffix} not supported. For now we assume .json was intended. "
            f"To remove this warning, please use .json."
        )
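write_to_json converts values that json.dump cannot handle natively (numpy scalars and arrays, pandas DataFrames) via the nested encode_it helper and falls back to str() for everything else. A quick sketch, reusing the module-level np, pd and datetime imports; the output path is hypothetical:

profile_like = {
    'count': np.int64(42),                                            # numpy scalar -> Python int
    'histogram_counts': np.array([3, 5, 2]),                          # ndarray -> list
    'samples': pd.DataFrame({'row': [0, 1], 'value': ['a', 'b']}),    # DataFrame -> list of records
    'date_start': datetime.now(),                                     # anything else falls back to str()
}
write_to_json(profile_like, './output/example.json')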
# --------------- READ ---------------# def __read_files(my_file, header=None, sep=',', encoding='UTF-8'): try: df = pd.read_csv(my_file, header=header, sep=sep, encoding=encoding) except: return pd.DataFrame() return df def __profile_timeseries_main(my_file_path: str, time_column: str, header: int = 0, sep: str = ',', mode: str = "default", minimal: bool = True): df = __read_files(my_file_path, header, sep) df[time_column] = pd.to_datetime(df[time_column]) if minimal: config_file = get_config("config_minimal.yaml") with open(config_file) as f: data = yaml.safe_load(f) config: Settings = Settings().parse_obj(data) else: config: Settings = Settings() config.progress_bar = False config.vars.num.quantiles.append(0.10) config.vars.num.quantiles.append(0.90) sample_timeseries: Container = None html_dict = None if mode == 'default' and len(df.columns) > 2: sample_time_series = __create_sample_df(df, time_column) config_file = get_config("config_minimal.yaml") with open(config_file) as f: data = yaml.safe_load(f) new_config: Settings = Settings().parse_obj(data) new_config.progress_bar = False new_config.vars.timeseries.active = True # if autocorrelation test passes then numeric timeseries else 'real' numeric new_config.vars.timeseries.autocorrelation = 0.3 typeset = ProfilingTypeSet(new_config) custom_summarizer = PandasProfilingSummarizer(typeset) custom_summarizer.mapping['TimeSeries'].append(__new_numeric_summary) profile = ProfileReport(sample_time_series, tsmode=True, title="Profiling Report", sortby=time_column, summarizer=custom_summarizer, config=new_config, progress_bar=False) html_dict = profile.description_set html_dict['table']['profiler_type'] = 'TimeSeries' html_dict['analysis']['title'] = 'Profiling Report' html_dict['analysis']['filenames'] = list(my_file_path) # Create a container of timeseries samples which will be used in the html report = __get_report_structure(new_config, html_dict) variables = report.content['body'].content['items'][1] item = variables.content['item'].content['items'] sample_timeseries = Container( item, sequence_type="accordion", name="Sample TimeSeries", anchor_id="sample-timeseries-variables", ) # Fill missing values as tsfresh cannot handle them time_series_stacked = df.melt(id_vars=[time_column], value_vars=df.columns[1:], value_name='value', var_name='id') time_series_stacked = time_series_stacked.reindex(columns=[time_column, 'value', 'id']) time_series_stacked.rename(columns={time_column: 'time'}, inplace=True) time_series_stacked['time'] = pd.to_datetime(time_series_stacked['time']).apply(lambda x: x.value) if __is_not_finite(time_series_stacked['value']).any(): time_series_stacked['value'] = __replace_missing_inf_values(time_series_stacked['value']) # Run tsfresh json_decoded = __read_json_file_tsfresh(tsfresh_json_file) ts_fresh_results = __ts_fresh_json(time_series_stacked, json_decoded, no_time=False) config.progress_bar = True profile = ProfileReport(ts_fresh_results, config=config, title="Profiling Report", minimal=minimal) html_dict = profile.description_set html_dict['table']['profiler_type'] = 'TimeSeries' html_dict['analysis']['title'] = 'Profiling Report' html_dict['analysis']['filenames'] = [my_file_path] # Files size html_dict['table']['byte_size'] = os.path.getsize(my_file_path) elif mode == 'verbose' or len(df.columns) == 2: config.vars.timeseries.active = True config.progress_bar = True # if autocorrelation test passes then numeric timeseries else 'real' numeric config.vars.timeseries.autocorrelation = 0.3 typeset = 
ProfilingTypeSet(config) custom_summarizer = PandasProfilingSummarizer(typeset) custom_summarizer.mapping['TimeSeries'].append(__new_numeric_summary) profile = ProfileReport(df, tsmode=True, title="Profiling Report", sortby=time_column, summarizer=custom_summarizer, config=config, progress_bar=True) html_dict = profile.description_set html_dict['table']['profiler_type'] = 'TimeSeries' html_dict['analysis']['title'] = 'Profiling Report' html_dict['analysis']['filenames'] = [my_file_path] # Files size html_dict['table']['byte_size'] = os.path.getsize(my_file_path) texts_column_names = [] for var_name, info in html_dict['variables'].items(): if info['type'] == 'Categorical' and info['p_unique'] > 0.6: texts_column_names.append(var_name) if len(texts_column_names) != 0: df = df[texts_column_names] profile_dict = __create_profile_dict(html_dict, df) html_dict = __extend_textual_html(profile_dict, html_dict) else: profile_dict = __create_profile_dict(html_dict) return profile_dict, config, html_dict, sample_timeseries def __create_sample_df(df, time_column): sample_time_series = df[[time_column]] temp_df = df.loc[:, df.columns != time_column] sample_count = 3 if len(temp_df.columns) < sample_count: sample_count = len(temp_df.columns) for i in range(0, sample_count): sample_time_series[temp_df.columns[i]] = temp_df[temp_df.columns[i]] return sample_time_series def __new_numeric_summary(config: Settings, series: pd.Series, summary: dict = None): if summary is None: summary = {} df = pd.DataFrame() dates_float = range(len(series)) df['time'] = dates_float df['id'] = series.name df['value'] = series.values json_decoded = __read_json_file_tsfresh(tsfresh_json_file) ts_fresh_results = __ts_fresh_json(df, json_decoded, no_time=False) summary['tsfresh_features'] = ts_fresh_results.to_dict(orient='records')[0] return config, series, summary # TODO: Add language distribution def __extend_textual_attributes(texts_list: list, var_name: str, info: dict): # Used in language detection DetectorFactory.seed = 2023 var_dict = { 'name': var_name, 'type': 'Textual', 'count': info['count'], 'num_missing': info['n_missing'], 'uniqueness': info['p_unique'], 'ratio_uppercase': 0, 'ratio_digits': 0, 'ratio_special_characters': 0, 'num_chars_distribution': {}, 'num_words_distribution': {}, 'language_distribution': [], 'n_distinct': info['n_distinct'], 'p_distinct': info['p_distinct'], 'p_missing': info['p_missing'], 'memory_size': info['memory_size'], 'n_unique': info['n_unique'] } num_chars = 0 ratio_uppercase = 0 ratio_digits = 0 ratio_special_characters = 0 num_chars_list = [] num_words_list = [] corpus_languages = dict() for text in texts_list: if not pd.isnull(text): text_num_chars = len(text) num_chars += text_num_chars num_chars_list.append(text_num_chars) for c in text: if c.isupper(): ratio_uppercase += 1 if c.isdigit(): ratio_digits += 1 if not c.isalnum(): ratio_special_characters += 1 words = nltk.word_tokenize(text.lower()) words_count = 0 for word in words: num_words_list.append(len(word)) # Find number of sentences sentences = nltk.sent_tokenize(text) sentences_count = 0 for sentence in sentences: sentences_count += 1 # Find languages try: languages = detect_langs(text) for language in languages: if pycountry.languages.get(alpha_2=language.lang) is not None: lang = pycountry.languages.get(alpha_2=language.lang).name.lower() else: lang = 'english' if lang not in corpus_languages: corpus_languages[lang] = language.prob else: corpus_languages[lang] += language.prob except: language = detect(text) if 
pycountry.languages.get(alpha_2=language['lang']) is not None: lang = pycountry.languages.get(alpha_2=language['lang']).name.lower() else: lang = 'english' if lang not in corpus_languages: corpus_languages[lang] = language['score'] else: corpus_languages[lang] += language['score'] # Calculate language distribution in the corpus corpus_languages = {k: v / var_dict['count'] for k, v in corpus_languages.items()} total = sum(corpus_languages.values(), float(0)) * 100 if total < 100: corpus_languages['unknown'] = (100 - total) / 100 corpus_languages = dict(sorted(corpus_languages.items(), key=lambda item: item[1], reverse=True)) for k, v in corpus_languages.items(): var_dict['language_distribution'].append({'language': k, "percentage": v * 100}) if num_chars != 0: var_dict['ratio_uppercase'] = ratio_uppercase / num_chars var_dict['ratio_digits'] = ratio_digits / num_chars var_dict['ratio_special_characters'] = ratio_special_characters / num_chars if len(num_chars_list) != 0: s = pd.Series(num_chars_list) stats = s.describe(percentiles=[.10, .25, .75, .90]) var_dict['num_chars_distribution'] = { 'name': var_name, 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s.kurtosis(), 'skewness': s.skew(), 'variance': s.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } if len(num_words_list) != 0: s = pd.Series(num_words_list) stats = s.describe(percentiles=[.10, .25, .75, .90]) var_dict['num_words_distribution'] = { 'name': var_name, 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s.kurtosis(), 'skewness': s.skew(), 'variance': s.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } return var_dict def __create_profile_dict(html_dict: dict, df: pd.DataFrame = pd.DataFrame()): profile_dict = { 'analysis': { 'title': 'Profiling Report', 'date_start': '', 'date_end': '', 'duration': '', 'filenames': '' }, 'table': { 'profiler_type': '', 'byte_size': 0, 'memory_size': 0, 'record_size': 0, 'num_rows': 0, 'num_attributes': 0, 'n_cells_missing': 0, 'p_cells_missing': 0.0, 'types': [] }, 'variables': [], 'package': html_dict['package'] } # Fill analysis section start_string = str(html_dict['analysis']['date_start']) profile_dict['analysis']['date_start'] = start_string end_string = str(html_dict['analysis']['date_end']) profile_dict['analysis']['date_end'] = end_string profile_dict['analysis']['duration'] = str(html_dict['analysis']['duration']) profile_dict['analysis']['filenames'] = html_dict['analysis']['filenames'] # Fill table section profile_dict['table']['profiler_type'] = html_dict['table']['profiler_type'] profile_dict['table']['byte_size'] = html_dict['table']['byte_size'] profile_dict['table']['num_rows'] = html_dict['table']['n'] profile_dict['table']['num_attributes'] = html_dict['table']['n_var'] profile_dict['table']['n_cells_missing'] = html_dict['table']['n_cells_missing'] profile_dict['table']['p_cells_missing'] = html_dict['table']['p_cells_missing'] profile_dict['table']['memory_size'] = html_dict['table']['memory_size'] profile_dict['table']['record_size'] = html_dict['table']['record_size'] profile_types = {} # Fill variables for var_name, info in html_dict['variables'].items(): if info['type'] == 'DateTime': if info['type'] in profile_types: profile_types[info['type']] += 1 else: profile_types[info['type']] = 1 var_dict = { 'name': var_name, 'type': 
'DateTime', 'count': info['count'], 'n_distinct': info['n_distinct'], 'p_distinct': info['p_distinct'], 'num_missing': info['n_missing'], 'uniqueness': info['p_unique'], 'p_missing': info['p_missing'], 'memory_size': info['memory_size'], 'start': str(info['min']), 'end': str(info['max']), 'date_range': str(info['range']), 'histogram_counts': info['histogram'][0], 'histogram_bins': info['histogram'][1] } profile_dict['variables'].append(var_dict) elif info['type'] == 'TimeSeries': if info['type'] in profile_types: profile_types[info['type']] += 1 else: profile_types[info['type']] = 1 var_dict = { 'name': var_name, 'type': 'TimeSeries', 'count': info['count'], 'num_missing': info['n_missing'], 'uniqueness': info['p_unique'], 'min': info['min'], 'max': info['max'], 'average': info['mean'], 'stddev': info['std'], 'median': info['50%'], 'kurtosis': info['kurtosis'], 'skewness': info['skewness'], 'variance': info['variance'], 'percentile5': info['5%'], 'percentile10': info['10%'], 'percentile25': info['25%'], 'percentile75': info['75%'], 'percentile90': info['90%'], 'percentile95': info['95%'], 'seasonal': info['seasonal'], 'stationary': info['stationary'], 'add_fuller': info['addfuller'], 'abs_energy': info['tsfresh_features']['abs energy'], 'abs_sum_changes': info['tsfresh_features']['absolute sum of changes'], 'len_above_mean': info['tsfresh_features']['count above mean'], 'len_below_mean': info['tsfresh_features']['count below mean'], 'num_peaks': info['tsfresh_features']['number cwt peaks n 10'], 'n_distinct': info['n_distinct'], 'p_distinct': info['p_distinct'], 'p_missing': info['p_missing'], 'memory_size': info['memory_size'], 'n_unique': info['n_unique'], 'n_infinite': info['n_infinite'], 'p_infinite': info['p_infinite'], 'n_zeros': info['n_zeros'], 'p_zeros': info['p_zeros'], 'n_negative': info['n_negative'], 'p_negative': info['p_negative'], 'monotonic': info['monotonic'], 'range': info['range'], 'iqr': info['iqr'], 'cv': info['cv'], 'mad': info['mad'], 'sum': info['sum'], 'histogram_counts': info['histogram'][0], 'histogram_bins': info['histogram'][1], 'value_counts_without_nan': [], 'value_counts_index_sorted': [], 'series': [] } for value, count in info['value_counts_without_nan'].items(): var_dict['value_counts_without_nan'].append({'value': value, "count": count}) for value, count in info['value_counts_index_sorted'].items(): var_dict['value_counts_index_sorted'].append({'value': value, "count": count}) for key, value in info['series'].items(): var_dict['series'].append({'key': key, "value": value}) profile_dict['variables'].append(var_dict) elif info['type'] == 'Numeric': if info['type'] in profile_types: profile_types[info['type']] += 1 else: profile_types[info['type']] = 1 var_dict = { 'name': var_name, 'type': 'Numeric', 'count': info['count'], 'num_missing': info['n_missing'], 'uniqueness': info['p_unique'], 'min': info['min'], 'max': info['max'], 'average': info['mean'], 'stddev': info['std'], 'median': info['50%'], 'kurtosis': info['kurtosis'], 'skewness': info['skewness'], 'variance': info['variance'], 'percentile5': info['5%'], 'percentile10': info['10%'], 'percentile25': info['25%'], 'percentile75': info['75%'], 'percentile90': info['90%'], 'percentile95': info['95%'], 'n_distinct': info['n_distinct'], 'p_distinct': info['p_distinct'], 'p_missing': info['p_missing'], 'memory_size': info['memory_size'], 'n_unique': info['n_unique'], 'n_infinite': info['n_infinite'], 'p_infinite': info['p_infinite'], 'n_zeros': info['n_zeros'], 'p_zeros': info['p_zeros'], 'n_negative': 
info['n_negative'], 'p_negative': info['p_negative'], 'monotonic': info['monotonic'], 'range': info['range'], 'iqr': info['iqr'], 'cv': info['cv'], 'mad': info['mad'], 'sum': info['sum'], 'histogram_counts': info['histogram'][0], 'histogram_bins': info['histogram'][1], 'value_counts_without_nan': [], 'value_counts_index_sorted': [] } for value, count in info['value_counts_without_nan'].items(): var_dict['value_counts_without_nan'].append({'value': value, "count": count}) for value, count in info['value_counts_index_sorted'].items(): var_dict['value_counts_index_sorted'].append({'value': value, "count": count}) profile_dict['variables'].append(var_dict) elif info['type'] == 'Categorical': if info['p_unique'] > 0.6: if 'Textual' in profile_types: profile_types['Textual'] += 1 else: profile_types['Textual'] = 1 texts_list = df[var_name].to_list() var_dict = __extend_textual_attributes(texts_list, var_name, info) else: if info['type'] in profile_types: profile_types[info['type']] += 1 else: profile_types[info['type']] = 1 var_dict = { 'name': var_name, 'type': 'Categorical', 'count': info['count'], 'num_missing': info['n_missing'], 'uniqueness': info['p_unique'], 'frequency_distribution': [], 'n_distinct': info['n_distinct'], 'p_distinct': info['p_distinct'], 'p_missing': info['p_missing'], 'memory_size': info['memory_size'], 'n_unique': info['n_unique'], 'samples': [] } for cat, count in info['first_rows'].items(): var_dict['samples'].append({'row': cat, "cat": count}) for cat, count in info['value_counts_without_nan'].items(): var_dict['frequency_distribution'].append({'name': var_name, 'type': cat, 'count': count}) profile_dict['variables'].append(var_dict) elif info['type'] == 'Geometry': if info['type'] in profile_types: profile_types[info['type']] += 1 else: profile_types[info['type']] = 1 var_dict = { 'name': var_name, 'type': 'Geometry', 'count': info['count'], 'num_missing': info['n_missing'], 'uniqueness': info['p_unique'], 'mbr': info['mbr'], 'centroid': info['centroid'], 'crs': info['crs'], 'union_convex_hull': info['union_convex_hull'], 'length_distribution': info['length_distribution'], 'area_distribution': info['area_distribution'], 'geom_type_distribution': [], 'value_counts_without_nan': [], 'n_distinct': info['n_distinct'], 'p_distinct': info['p_distinct'], 'p_missing': info['p_missing'], 'memory_size': info['memory_size'], 'n_unique': info['n_unique'], 'samples': [], 'heatmap': info['heatmap'] } for geom_type, frequency in info['geom_types'].items(): var_dict['geom_type_distribution'].append({'name': var_name, 'type': geom_type, 'count': frequency}) for value, count in info['value_counts_without_nan'].items(): var_dict['value_counts_without_nan'].append({'name': var_name, 'value': value, 'count': count}) for row, value in info['first_rows'].items(): var_dict['samples'].append({'row': row, "value": value}) profile_dict['variables'].append(var_dict) else: if info['type'] in profile_types: profile_types[info['type']] += 1 else: profile_types[info['type']] = 1 var_dict = { 'name': var_name, 'type': info['type'], 'count': info['count'], 'num_missing': info['n_missing'], 'uniqueness': info['p_unique'], 'p_missing': info['p_missing'], 'memory_size': info['memory_size'] } profile_dict['variables'].append(var_dict) for k, v in sorted(profile_types.items(), key=lambda x: x[1], reverse=True): profile_dict['table']['types'].append({'type': k, 'count': v}) return profile_dict def __extend_textual_html(profile_dict: dict, html_dict: dict): for variable in profile_dict['variables']: if 
variable['type'] == 'Textual': var_dict = { 'type': variable['type'], 'count': variable['count'], 'num_missing': variable['num_missing'], 'uniqueness': variable['uniqueness'], 'ratio_uppercase': variable['ratio_uppercase'], 'ratio_digits': variable['ratio_digits'], 'ratio_special_characters': variable['ratio_special_characters'], 'num_chars_distribution': variable['num_chars_distribution'], 'num_words_distribution': variable['num_words_distribution'], 'language_distribution': {language['language']: language['percentage'] for language in variable['language_distribution']} } html_dict['variables'][variable['name']].update(var_dict) if not html_dict['table']['types'].__contains__('Textual'): html_dict['table']['types']['Categorical'] -= 1 html_dict['table']['types']['Textual'] = 1 else: html_dict['table']['types']['Categorical'] -= 1 html_dict['table']['types']['Textual'] += 1 return html_dict def __profile_tabular_main(my_file_path: str, header: int = 0, sep: str = ',', crs: str = "EPSG:4326", longitude_column: str = None, latitude_column: str = None, wkt_column: str = None, minimal: bool = True): if my_file_path.__contains__('.shp'): pois = gp.read_file(my_file_path) crs = pois.crs df = pd.DataFrame(pois) df.geometry = df.geometry.astype(str) else: df = __read_files(my_file_path, header, sep) if minimal: config_file = get_config("config_minimal.yaml") with open(config_file) as f: data = yaml.safe_load(f) config: Settings = Settings().parse_obj(data) else: config: Settings = Settings() config.vars.num.quantiles.append(0.10) config.vars.num.quantiles.append(0.90) if longitude_column is not None and latitude_column is not None: geom_lon_lat = "geometry_" + longitude_column + "_" + latitude_column s = gp.GeoSeries.from_xy(df[longitude_column], df[latitude_column], crs=crs) s = s.to_crs("EPSG:4326") df[geom_lon_lat] = s.to_wkt() if wkt_column is not None: s = gp.GeoSeries.from_wkt(data=df[wkt_column], crs=crs) s = s.to_crs("EPSG:4326") df[wkt_column] = s.to_wkt() profile = ProfileReport(df, config=config, progress_bar=True) html_dict = profile.description_set html_dict['table']['profiler_type'] = 'Tabular' html_dict['analysis']['filenames'] = [my_file_path] html_dict['analysis']['title'] = 'Profiling Report' if wkt_column is not None: if not html_dict['table']['types'].__contains__('Geometry'): html_dict['table']['types']['Categorical'] -= 1 html_dict['table']['types']['Geometry'] = 1 else: html_dict['table']['types']['Categorical'] -= 1 html_dict['table']['types']['Geometry'] += 1 s = gp.GeoSeries.from_wkt(data=df[wkt_column], crs="EPSG:4326") html_dict['variables'][wkt_column]['type'] = 'Geometry' html_dict['variables'][wkt_column]['mbr'] = box(*s.total_bounds).wkt html_dict['variables'][wkt_column]['union_convex_hull'] = s.unary_union.convex_hull.wkt html_dict['variables'][wkt_column]['centroid'] = s.unary_union.centroid.wkt html_dict['variables'][wkt_column]['length'] = s.unary_union.length if len(s) > 1000: html_dict['variables'][wkt_column]['heatmap'] = __get_clusters_dict(s[:2000], wkt_column) else: html_dict['variables'][wkt_column]['heatmap'] = __get_clusters_dict(s, wkt_column) missing = s.isna().tolist() if any(missing): html_dict['variables'][wkt_column]['missing'] = True html_dict['variables'][wkt_column]['n_missing'] = sum(missing) html_dict['variables'][wkt_column]['p_missing'] = sum(missing) * 100 / len(missing) else: html_dict['variables'][wkt_column]['missing'] = False html_dict['variables'][wkt_column]['n_missing'] = 0 html_dict['variables'][wkt_column]['p_missing'] = 0.0 if 
crs is not None: crs_list = CRS.from_string(str(crs)) html_dict['variables'][wkt_column]['crs'] = 'EPSG:' + str(crs_list.to_epsg()) else: html_dict['variables'][wkt_column]['crs'] = 'EPSG:4326' count_geom_types = s.geom_type.value_counts() html_dict['variables'][wkt_column]['geom_types'] = count_geom_types # calculate area distribution s_area = s.area stats = s_area.describe(percentiles=[.10, .25, .75, .90]) html_dict['variables'][wkt_column]['area_distribution'] = { 'name': wkt_column, 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s_area.kurtosis(), 'skewness': s_area.skew(), 'variance': s_area.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } # calculate length distribution s_length = s.length stats = s_length.describe(percentiles=[.10, .25, .75, .90]) html_dict['variables'][wkt_column]['length_distribution'] = { 'name': wkt_column, 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s_length.kurtosis(), 'skewness': s_length.skew(), 'variance': s_length.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } if longitude_column is not None and latitude_column is not None: if not html_dict['table']['types'].__contains__('Geometry'): html_dict['table']['types']['Categorical'] -= 1 html_dict['table']['types']['Geometry'] = 1 else: html_dict['table']['types']['Categorical'] -= 1 html_dict['table']['types']['Geometry'] += 1 geom_lon_lat = "geometry_" + longitude_column + "_" + latitude_column html_dict['variables'][geom_lon_lat]['type'] = 'Geometry' s = gp.GeoSeries.from_wkt(df[geom_lon_lat], crs="EPSG:4326") html_dict['variables'][geom_lon_lat]['mbr'] = box(*s.total_bounds).wkt html_dict['variables'][geom_lon_lat]['union_convex_hull'] = s.unary_union.convex_hull.wkt html_dict['variables'][geom_lon_lat]['centroid'] = s.unary_union.centroid.wkt html_dict['variables'][geom_lon_lat]['length'] = s.unary_union.length if len(s) > 2000: html_dict['variables'][geom_lon_lat]['heatmap'] = __get_clusters_dict(s[:2000], geom_lon_lat) else: html_dict['variables'][geom_lon_lat]['heatmap'] = __get_clusters_dict(s, geom_lon_lat) missing = s.isna().tolist() if any(missing): html_dict['variables'][geom_lon_lat]['missing'] = True html_dict['variables'][geom_lon_lat]['n_missing'] = sum(missing) html_dict['variables'][geom_lon_lat]['p_missing'] = sum(missing) * 100 / len(missing) else: html_dict['variables'][geom_lon_lat]['missing'] = False html_dict['variables'][geom_lon_lat]['n_missing'] = 0 html_dict['variables'][geom_lon_lat]['p_missing'] = 0.0 if crs is not None: crs_list = CRS.from_string(str(crs)) html_dict['variables'][geom_lon_lat]['crs'] = 'EPSG:' + str(crs_list.to_epsg()) else: html_dict['variables'][geom_lon_lat]['crs'] = 'EPSG:4326' count_geom_types = s.geom_type.value_counts() html_dict['variables'][geom_lon_lat]['geom_types'] = count_geom_types # calculate area distribution s_area = s.area stats = s_area.describe(percentiles=[.10, .25, .75, .90]) html_dict['variables'][geom_lon_lat]['area_distribution'] = { 'name': geom_lon_lat, 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s_area.kurtosis(), 'skewness': s_area.skew(), 'variance': s_area.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } # 
calculate length distribution s_length = s.length stats = s_length.describe(percentiles=[.10, .25, .75, .90]) html_dict['variables'][geom_lon_lat]['length_distribution'] = { 'name': geom_lon_lat, 'count': stats[0], 'min': stats[3], 'max': stats[9], 'average': stats[1], 'stddev': stats[2], 'median': stats[6], 'kurtosis': s_length.kurtosis(), 'skewness': s_length.skew(), 'variance': s_length.var(), 'percentile10': stats[4], 'percentile25': stats[5], 'percentile75': stats[7], 'percentile90': stats[8], } # Files size html_dict['table']['byte_size'] = os.path.getsize(my_file_path) texts_column_names = [] for var_name, info in html_dict['variables'].items(): if info['type'] == 'Categorical' and info['p_unique'] > 0.6: texts_column_names.append(var_name) if len(texts_column_names) != 0: df = df[texts_column_names] profile_dict = __create_profile_dict(html_dict, df) html_dict = __extend_textual_html(profile_dict, html_dict) else: profile_dict = __create_profile_dict(html_dict) return profile_dict, config, html_dict # TODO: EPS_DISTANCE MUST BE DATA DRIVEN def __get_clusters_dict(geo_data: gp.GeoSeries, geometry_column: str = None): EPS_DISTANCE = 0.018 MIN_SAMPLE_POLYGONS = 5 wkt = gp.GeoDataFrame(geo_data) wkt.columns = [geometry_column, *wkt.columns[1:]] # preparation for dbscan wkt['x'] = wkt[geometry_column].centroid.x wkt['y'] = wkt[geometry_column].centroid.y coords = wkt[['x', 'y']].values # dbscan dbscan = DBSCAN(eps=EPS_DISTANCE, min_samples=MIN_SAMPLE_POLYGONS) clusters = dbscan.fit(coords) # add labels back to dataframe labels = pd.Series(clusters.labels_).rename('Clusters') wkt = pd.concat([wkt, labels], axis=1) data = wkt[['y', 'x', 'Clusters']] dict1 = data.to_dict() return dict1 def __replace_missing_inf_values(feature_array): """ This method is used to replace the NaN , infinity and -infinity values of an array of numbers. The NaN is replaced by the mean of the numbers in the array, the infinity with mean + 3*std (standard deviation) and the -infinity with mean - 3*std. :param feature_array: An array that contains the values of a feature. :type feature_array: numpy.array :return: -feature_array (numpy.array) - A numpy array with no NaN, infinity and -infinity values. """ feature_array_finite = feature_array[np.isfinite(feature_array)] mean_feature_array = np.nanmean(feature_array_finite) std_feature_array = np.nanstd(feature_array_finite) replace_pos_inf = mean_feature_array + 3 * std_feature_array replace_neg_inf = mean_feature_array - 3 * std_feature_array feature_array = np.nan_to_num(feature_array, copy=False, nan=mean_feature_array, posinf=replace_pos_inf, neginf=replace_neg_inf) return feature_array def __is_not_finite(arr): """ This method returns an array of booleans that have 'True' in the positions where we do not have finite numbers. :param arr: An array of numbers. :type arr: numpy.array :return: -res (numpy.array) - A numpy array where 'True' if we have non-finite (NaN, infinity and -infinity) values. """ res = np.isfinite(arr) np.bitwise_not(res, out=res) # in-place return res def __read_json_file_tsfresh(json_path: str): """ Read the json file from the given path that contains the features to be calculated by tsfresh package. :param json_path: The path containing the json file. :type json_path: string :return: -json_decoded (dict) - A dictionary containing the tsfresh features. 
""" with open(json_path, "r") as jf: json_decoded = json.load(jf) return json_decoded def __ts_fresh_json(df, json_decoded, no_time=False) -> pd.DataFrame: """ This method uses tsfresh to calculate a comprehensive number of features. :param df: A pandas Dataframe with 3 columns (time, value, id) or 2 columns (value, id) as required to extract features from tsfresh. :type df: pandas.DataFrame :param json_decoded: A dictionary containing the tsfresh features. :type json_decoded: dict :param no_time: A boolean that if 'True' means that the 'time' column doesn't exist on the pandas Dataframe. :type no_time: bool :return: -tf (pandas.DataFrame) - A pandas DataFrame containing the loaded time series as rows and the extracted features as columns. """ if no_time: tf = extract_features(df, column_id="id", column_value="value", default_fc_parameters=json_decoded, n_jobs=0, disable_progressbar=True) else: tf = extract_features(df, column_id="id", column_sort="time", column_value="value", default_fc_parameters=json_decoded, n_jobs=0, disable_progressbar=True) return tf.rename(columns=lambda x: x.split("value__")[1]).rename(columns=lambda x: x.replace("_", " "))