Source code for stelardataprofiler.rdfGraph.profiler

import os
from datetime import datetime
import dateutil.parser
import pandas as pd
from rdflib import Graph, RDF, URIRef
from rdflib.extras.external_graph_libs import rdflib_to_networkx_multidigraph
import networkx as nx
from ..utils import write_to_json

def profile_rdfGraph_with_config(config: dict) -> None:
    """
    This method performs profiling on RDF graph data and writes the resulting
    profile dictionary based on a configuration dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None
    """
    # Input file path(s)
    input_file_path = config['input']['files']
    if isinstance(input_file_path, list):
        if len(input_file_path) == 1 and os.path.isfile(os.path.abspath(input_file_path[0])):
            my_file_path = os.path.abspath(input_file_path[0])
        else:
            raise ValueError(f"Invalid input: {input_file_path} must be a valid file path "
                             f"or a list with exactly one file path")
    elif isinstance(input_file_path, str) and os.path.isfile(os.path.abspath(input_file_path)):
        my_file_path = os.path.abspath(input_file_path)
    else:
        raise ValueError(f"Invalid input: {input_file_path} must be a valid file path "
                         f"or a list with exactly one file path")

    # Output file path
    output_json_path = os.path.abspath(config['output']['json'])

    # Determine the rdflib parse format
    if 'serialization' not in config['input']:
        print("No rdflib format is specified, so the default 'application/rdf+xml' is used.")
        parse_format: str = 'application/rdf+xml'
    else:
        parse_format: str = str(config['input']['serialization']).lower()

    # Run the rdfGraph profile
    profile_dict = profile_rdfGraph(my_file_path=my_file_path, parse_format=parse_format)

    # Write resulting profile dictionary
    write_to_json(profile_dict, output_json_path)
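# --- Usage sketch (illustrative, not part of the module) ---------------------
# A minimal example of driving the profiler through a configuration dictionary;
# the key layout mirrors the accesses made in profile_rdfGraph_with_config above,
# while the concrete paths and the 'turtle' serialization are hypothetical
# placeholders.
#
# config = {
#     'input': {
#         'files': ['./data/example.ttl'],  # a single path or a one-element list
#         'serialization': 'turtle'         # optional; defaults to 'application/rdf+xml'
#     },
#     'output': {
#         'json': './output/rdf_profile.json'
#     }
# }
# profile_rdfGraph_with_config(config)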
def profile_rdfGraph(my_file_path: str, parse_format: str = 'application/rdf+xml') -> dict:
    """
    This method performs profiling and generates a profiling dictionary for an RDF
    file that exists in the given path.

    :param my_file_path: the path to an RDF file.
    :type my_file_path: str
    :param parse_format: the format of the RDF file (see the rdflib package for the
        available formats, e.g. 'turtle', 'application/rdf+xml', 'n3', 'nt', etc.).
    :type parse_format: str, optional
    :return: A dict which contains the results of the profiler for the RDF graph.
    :rtype: dict
    """

    # Calculate the number of nodes
    def __calc_num_nodes(g: Graph):
        return len(g.all_nodes())

    # Calculate the number of edges (triples)
    def __calc_num_edges(g: Graph):
        return len(g)

    # Calculate the number of namespaces
    def __calc_num_namespaces(g: Graph):
        v = g.serialize(format="ttl")
        return v.count('@prefix')

    # Calculate the number of classes and a class frequency list
    def __calc_class_features(g: Graph):
        num_classes = set()
        classes_distribution = dict()
        for cl in g.objects(predicate=RDF.type):
            if str(cl) not in classes_distribution:
                classes_distribution[str(cl)] = 0
            classes_distribution[str(cl)] += 1
            num_classes.add(str(cl))

        # List of classes and their frequencies in the graph
        class_distribution_list = []
        for c, v in sorted(classes_distribution.items(), key=lambda x: x[1], reverse=True):
            class_distribution_list.append({
                'class_name': c,
                'count': v
            })

        return len(num_classes), class_distribution_list

    # Calculate the number of object type properties
    def __calc_num_object_properties(g: Graph):
        # Extract set from objects of triples
        object_list = {x for x in g.objects() if isinstance(x, URIRef)}
        # Append set extracted from subjects of triples
        object_list.update({x for x in g.subjects() if isinstance(x, URIRef)})
        return len(object_list)

    # Calculate the number of data type properties
    def __calc_num_datatype_properties(g: Graph):
        data_property_list = {x for x in g.objects() if not isinstance(x, URIRef)}
        return len(data_property_list)

    # Calculate the number of connected components and a list with each
    # connected component and its number of nodes
    def __calc_cc_features(nx_g: nx.MultiDiGraph):
        nx_g_undirected = nx_g.to_undirected()
        cc = list(nx.connected_components(nx_g_undirected))
        cc_list = []
        for i, c in enumerate(cc):
            cc_list.append({
                'component_name': i,
                'num_nodes': len(c)
            })
        return len(cc), cc_list

    # Calculate the density of the graph
    def __calc_density(nx_g: nx.MultiDiGraph):
        return nx.density(nx_g)

    # Build the descriptive-statistics dictionary shared by all degree-related
    # distributions below (label-based indexing into Series.describe())
    def __calc_stats(values: list) -> dict:
        s = pd.Series(values)
        stats = s.describe(percentiles=[.10, .25, .75, .90])
        return {
            'count': stats['count'],
            'min': stats['min'],
            'max': stats['max'],
            'average': stats['mean'],
            'stddev': stats['std'],
            'median': stats['50%'],
            'kurtosis': s.kurtosis(),
            'skewness': s.skew(),
            'variance': s.var(),
            'percentile10': stats['10%'],
            'percentile25': stats['25%'],
            'percentile75': stats['75%'],
            'percentile90': stats['90%'],
        }

    # Calculate the degree_centrality_distribution
    def __calc_degree_centrality(nx_g: nx.MultiDiGraph):
        dc = nx.degree_centrality(nx_g)
        return __calc_stats(list(dc.values()))

    # Calculate the degree_distribution
    def __calc_degree(nx_g: nx.MultiDiGraph):
        return __calc_stats([v for _, v in nx_g.degree])

    # Calculate the in_degree_distribution
    def __calc_in_degree(nx_g: nx.MultiDiGraph):
        return __calc_stats([v for _, v in nx_g.in_degree])

    # Calculate the out_degree_distribution
    def __calc_out_degree(nx_g: nx.MultiDiGraph):
        return __calc_stats([v for _, v in nx_g.out_degree])

    profile_dict = {
        'analysis': {
            'title': 'Profiling Report',
            'date_start': '',
            'date_end': '',
            'duration': '',
            'filenames': [my_file_path]
        },
        'table': {
            'profiler_type': 'RDFGraph',
            'byte_size': 0,
            'num_nodes': 0,
            'num_edges': 0,
            'num_namespaces': 0,
            'num_classes': 0,
            'num_object_properties': 0,
            'num_datatype_properties': 0,
            'density': 0,
            'num_connected_components': 0,
            'connected_components': [],
            'degree_centrality_distribution': dict(),
            'degree_distribution': dict(),
            'in_degree_distribution': dict(),
            'out_degree_distribution': dict(),
            'class_distribution': []
        },
        'variables': []
    }

    # Start time
    now = datetime.now()
    start_string = now.strftime("%Y-%m-%d %H:%M:%S.%f")
    profile_dict['analysis']['date_start'] = start_string

    # File size
    profile_dict['table']['byte_size'] = os.path.getsize(my_file_path)

    g = Graph()
    g.parse(my_file_path, format=parse_format)

    # Number of nodes
    profile_dict['table']['num_nodes'] = __calc_num_nodes(g)

    # Number of edges
    profile_dict['table']['num_edges'] = __calc_num_edges(g)

    # Number of namespaces
    profile_dict['table']['num_namespaces'] = __calc_num_namespaces(g)

    # Number of classes + class_distribution
    profile_dict['table']['num_classes'], profile_dict['table']['class_distribution'] = __calc_class_features(g)

    # Number of object type properties
    profile_dict['table']['num_object_properties'] = __calc_num_object_properties(g)

    # Number of data type properties
    profile_dict['table']['num_datatype_properties'] = __calc_num_datatype_properties(g)

    # Create networkx graph
    nx_g = rdflib_to_networkx_multidigraph(g)

    # Number of connected components + list of connected components
    profile_dict['table']['num_connected_components'], profile_dict['table']['connected_components'] = \
        __calc_cc_features(nx_g)

    # Density
    profile_dict['table']['density'] = __calc_density(nx_g)

    # Degree-based distributions
    profile_dict['table']['degree_centrality_distribution'] = __calc_degree_centrality(nx_g)
    profile_dict['table']['degree_distribution'] = __calc_degree(nx_g)
    profile_dict['table']['in_degree_distribution'] = __calc_in_degree(nx_g)
    profile_dict['table']['out_degree_distribution'] = __calc_out_degree(nx_g)

    # End time
    now = datetime.now()
    end_string = now.strftime("%Y-%m-%d %H:%M:%S.%f")
    profile_dict['analysis']['date_end'] = end_string

    # Time difference
    profile_dict['analysis']['duration'] = str(
        dateutil.parser.parse(profile_dict['analysis']['date_end']) -
        dateutil.parser.parse(profile_dict['analysis']['date_start']))

    return profile_dict
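# --- Usage sketch (illustrative, not part of the module) ---------------------
# A minimal sketch of calling profile_rdfGraph directly and inspecting a few of
# the computed graph-level metrics; the N-Triples path below is a hypothetical
# placeholder, assuming such a file exists locally.
#
# profile = profile_rdfGraph(my_file_path='./data/example.nt', parse_format='nt')
# print(profile['table']['num_nodes'],
#       profile['table']['num_edges'],
#       profile['table']['density'])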