Source code for stelardataprofiler.raster.profiler

import os
import math
from datetime import datetime
from pathlib import Path
import numpy as np
import pandas as pd
import dateutil.parser
from typing import Union, List
import rasterio as rio
from rasterio.warp import transform_bounds
from rasterio.transform import from_origin
from rasterio.io import MemoryFile
from pyproj import CRS
from shapely.geometry import box
import uuid
from ..utils import write_to_json


# -------------- RASTER --------------#
# ----------- SINGLE IMAGE -----------#
def profile_single_raster(my_file_path: str) -> dict:
    """
    This method performs profiling and generates a profiling dictionary for an image file that exists in the given path.

    :param my_file_path: the path to an image file.
    :type my_file_path: str
    :return: A dict which contains the results of the profiler for the image.
    :rtype: dict

    """
    if os.path.isdir(my_file_path):
        print('The input is not a file!')
        return dict()

    filename = get_filename(my_file_path)
    profile_dict = {
        'analysis': {
            'title': 'Profiling Report',
            'date_start': '',
            'date_end': '',
            'duration': '',
            'filenames': [filename]
        },
        'table': {
            'profiler_type': 'Raster',
            'byte_size': 0,
            'n_of_imgs': 1,
            'avg_width': 0.0,
            'avg_height': 0.0,
        },
        'variables': []
    }

    # Start time
    now = datetime.now()
    start_string = now.strftime("%Y-%m-%d %H:%M:%S.%f")
    profile_dict['analysis']['date_start'] = start_string

    # File size
    profile_dict['table']['byte_size'] = os.path.getsize(my_file_path)

    # Create image dictionary
    img_dict = {
        'name': '',
        'type': 'Raster',
        'crs': '',
        'spatial_coverage': '',
        'spatial_resolution': {
            'pixel_size_x': 0,
            'pixel_size_y': 0
        },
        'no_data_value': '',
        'format': ''
    }

    # Read image
    img = rio.open(my_file_path)

    # find image name
    name = Path(my_file_path).stem
    img_dict['name'] = name

    # find general image data
    img_dict.update(img.meta)

    # making transform JSON-serializable
    img_dict['transform'] = list(img_dict['transform'])

    profile_dict['table']['avg_width'] = img_dict['width']
    profile_dict['table']['avg_height'] = img_dict['height']

    # change nodata and driver keys
    img_dict['no_data_value'] = img_dict['nodata']
    del img_dict['nodata']

    img_dict['format'] = img_dict['driver']
    del img_dict['driver']

    # find tags
    img_dict['tags'] = []

    for k, v in img.tags().items():
        tag_dict = {
            'key': k,
            'value': v
        }

        img_dict['tags'].append(tag_dict)

    # change crs format
    if img.crs is not None:
        crs_list = CRS.from_string(str(img_dict['crs']))
        img_dict['crs'] = 'EPSG:' + str(crs_list.to_epsg())
    else:
        img_dict['crs'] = 'EPSG:4326'

    # calculate spatial resolution
    pixelSizeX, pixelSizeY = img.res
    img_dict['spatial_resolution']['pixel_size_x'] = pixelSizeX
    img_dict['spatial_resolution']['pixel_size_y'] = pixelSizeY

    # calculate spatial coverage
    # Bounding box (in the original CRS)
    bounds = img.bounds

    xmin, ymin, xmax, ymax = transform_bounds(CRS.from_string(img_dict['crs']), CRS.from_epsg(4326), *bounds)

    geom = box(xmin, ymin, xmax, ymax)
    img_dict['spatial_coverage'] = geom.wkt

    img_dict['bands'] = []
    # statistics for each band
    for band in range(1, img.count + 1):
        band_data = img.read(band).reshape(1, img.meta['width'] * img.meta['height'])[0].T

        # find band name
        if list(img.descriptions):
            band_name = img.descriptions[band - 1]
            if band_name is None:
                band_name = 'undefined'
        else:
            band_name = 'undefined'

        # find band statistics
        s = pd.Series(band_data)
        stats = s.describe(percentiles=[.10, .25, .75, .90])

        band_dict = {
            'uuid': str(uuid.uuid4()),
            'name': band_name,
            'count': stats['count'],
            'min': stats['min'],
            'max': stats['max'],
            'average': stats['mean'],
            'stddev': stats['std'],
            'median': stats['50%'],
            'kurtosis': s.kurtosis(),
            'skewness': s.skew(),
            'variance': s.var(),
            'percentile10': stats['10%'],
            'percentile25': stats['25%'],
            'percentile75': stats['75%'],
            'percentile90': stats['90%'],
        }

        img_dict['bands'].append(band_dict)

    profile_dict['variables'].append(img_dict)

    # End time
    now = datetime.now()
    end_string = now.strftime("%Y-%m-%d %H:%M:%S.%f")
    profile_dict['analysis']['date_end'] = end_string

    # Time Difference
    profile_dict['analysis']['duration'] = str(
        dateutil.parser.parse(profile_dict['analysis']['date_end']) - dateutil.parser.parse(
            profile_dict['analysis']['date_start']))

    return profile_dict
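
# Example usage (a minimal sketch; the GeoTIFF path below is hypothetical):
#
#     profile = profile_single_raster('data/sentinel2_patch.tif')
#     print(profile['table']['byte_size'])                # size of the file in bytes
#     print(profile['variables'][0]['spatial_coverage'])  # WGS84 bounding box as WKT
#     write_to_json(profile, 'sentinel2_patch_profile.json')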


# ----------- MULTIPLE IMAGES -----------#
# noinspection PyTypedDict
def profile_multiple_rasters(my_file_paths: List[str]) -> dict:
    """
    This method performs profiling and generates a profiling dictionary for the image files that exist in the given folder path.

    :param my_folder_path: list of paths to image files.
    :type my_folder_path: List[str]
    :return: A dict which contains the results of the profiler for the images.
    :rtype: dict

    """

    profile_dict = {
        'analysis': {
            'title': 'Profiling Report',
            'date_start': '',
            'date_end': '',
            'duration': '',
            'filenames': []
        },
        'table': {
            'profiler_type': 'Raster',
            'byte_size': 0,
            'n_of_imgs': 0,
            'avg_width': 0,
            'avg_height': 0,
            'combined_band_stats': []
        },
        'variables': []
    }

    # maps a band name to the images that contain it (used when the same band appears in more than one image)
    band_images = dict()

    # Start time
    now = datetime.now()
    start_string = now.strftime("%Y-%m-%d %H:%M:%S.%f")
    profile_dict['analysis']['date_start'] = start_string

    for image in my_file_paths:
        filename = get_filename(image)

        profile_dict['analysis']['filenames'].append(filename)

        # Files size
        profile_dict['table']['byte_size'] += os.path.getsize(image)

        # Increase the number of images
        profile_dict['table']['n_of_imgs'] += 1

        # Create image dictionary
        img_dict = {
            'name': '',
            'type': 'Raster',
            'crs': '',
            'spatial_coverage': '',
            'spatial_resolution': {
                'pixel_size_x': 0,
                'pixel_size_y': 0
            },
            'no_data_value': '',
            'format': ''
        }

        # Read image
        img = rio.open(image)

        # find image name
        name = Path(image).stem
        img_dict['name'] = name

        # find general image data
        img_dict.update(img.meta)

        # making transform JSON-serializable
        img_dict['transform'] = list(img_dict['transform'])

        profile_dict['table']['avg_width'] += img_dict['width']
        profile_dict['table']['avg_height'] += img_dict['height']

        # change nodata and driver keys
        img_dict['no_data_value'] = img_dict['nodata']
        del img_dict['nodata']

        img_dict['format'] = img_dict['driver']
        del img_dict['driver']

        # find tags
        img_dict['tags'] = []

        for k, v in img.tags().items():
            tag_dict = {
                'key': k,
                'value': v
            }

            img_dict['tags'].append(tag_dict)

        # change crs format
        if img.crs is not None:
            crs_list = CRS.from_string(str(img_dict['crs']))
            img_dict['crs'] = 'EPSG:' + str(crs_list.to_epsg())
        else:
            img_dict['crs'] = 'EPSG:4326'

        # calculate spatial resolution
        pixelSizeX, pixelSizeY = img.res
        img_dict['spatial_resolution']['pixel_size_x'] = pixelSizeX
        img_dict['spatial_resolution']['pixel_size_y'] = pixelSizeY

        # calculate spatial coverage
        # Bounding box (in the original CRS)
        bounds = img.bounds

        xmin, ymin, xmax, ymax = transform_bounds(CRS.from_string(img_dict['crs']), CRS.from_epsg(4326), *bounds)

        geom = box(xmin, ymin, xmax, ymax)
        img_dict['spatial_coverage'] = geom.wkt

        img_dict['bands'] = []
        # statistics for each band
        for band in range(1, img.count + 1):
            band_data = img.read(band).reshape(1, img.meta['width'] * img.meta['height'])[0].T

            # find band name
            if list(img.descriptions):
                band_name = img.descriptions[band - 1]
                if band_name is None:
                    band_name = 'undefined'
            else:
                band_name = 'undefined'

            # find band statistics
            s = pd.Series(band_data)
            stats = s.describe(percentiles=[.10, .25, .75, .90])

            band_dict = {
                'uuid': str(uuid.uuid4()),
                'name': band_name,
                'count': stats['count'],
                'min': stats['min'],
                'max': stats['max'],
                'average': stats['mean'],
                'stddev': stats['std'],
                'median': stats['50%'],
                'kurtosis': s.kurtosis(),
                'skewness': s.skew(),
                'variance': s.var(),
                'percentile10': stats['10%'],
                'percentile25': stats['25%'],
                'percentile75': stats['75%'],
                'percentile90': stats['90%'],
            }

            img_dict['bands'].append(band_dict)

            if band_name != 'undefined':
                if band_name not in band_images:
                    band_images[band_name] = [img_dict['name']]
                else:
                    band_images[band_name].append(img_dict['name'])

        profile_dict['variables'].append(img_dict)

    # calculate combined_band_stats
    for k, v in band_images.items():
        if len(v) > 1:
            combined_band_dict = {
                'name': k,
                'n_of_imgs': len(v),
                'img_names': v,
                'count': 0,
                'min': math.inf,
                'average': 0,
                'max': -math.inf,
                'variance': 0
            }

            for image in profile_dict['variables']:
                if image['name'] in v:
                    for band in image['bands']:
                        if band['name'] == k:
                            combined_band_dict['count'] += band['count']
                            combined_band_dict['average'] += band['average'] * band['count']

                            if band['min'] < combined_band_dict['min']:
                                combined_band_dict['min'] = band['min']

                            if band['max'] > combined_band_dict['max']:
                                combined_band_dict['max'] = band['max']

                            break

            combined_band_dict['average'] = combined_band_dict['average'] / combined_band_dict['count']

            # calculate combined_variance (pooled over the images that share this band):
            # comb_var = (n*std1 + n*d_sqrt1 + m*std2 + m*d_sqrt2 + k*std3 + k*d_sqrt3) / (n + m + k)
            # where d_sqrt_i = (mean_i - comb_mean)^2
            for image in profile_dict['variables']:
                if image['name'] in v:
                    for band in image['bands']:
                        if band['name'] == k:
                            count = band['count']
                            std = band['stddev']
                            mean = band['average']
                            comb_mean = combined_band_dict['average']
                            d_sqrt = (mean - comb_mean) * (mean - comb_mean)

                            combined_band_dict['variance'] += count * std + count * d_sqrt

                            break

            combined_band_dict['variance'] = combined_band_dict['variance'] / combined_band_dict['count']

            profile_dict['table']['combined_band_stats'].append(combined_band_dict)

    # fill aggregate width/height data for the image collection
    profile_dict['table']['avg_width'] = float(profile_dict['table']['avg_width']) / float(
        profile_dict['table']['n_of_imgs'])
    profile_dict['table']['avg_height'] = float(profile_dict['table']['avg_height']) / float(
        profile_dict['table']['n_of_imgs'])

    # End time
    now = datetime.now()
    end_string = now.strftime("%Y-%m-%d %H:%M:%S.%f")
    profile_dict['analysis']['date_end'] = end_string

    # Time Difference
    profile_dict['analysis']['duration'] = str(
        dateutil.parser.parse(profile_dict['analysis']['date_end']) - dateutil.parser.parse(
            profile_dict['analysis']['date_start']))

    return profile_dict
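
# Example usage (a minimal sketch; the file paths are hypothetical):
#
#     paths = ['data/tile_1.tif', 'data/tile_2.tif', 'data/tile_3.tif']
#     profile = profile_multiple_rasters(paths)
#     print(profile['table']['avg_width'], profile['table']['avg_height'])
#     # Bands that share a name across images are pooled in
#     # profile['table']['combined_band_stats']: the combined mean is the
#     # count-weighted mean of the per-image means, and the combined variance
#     # follows the formula documented in the combined_variance loop above.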


# ----------- MAIN FUNCTION ----------#
def profile_raster_with_config(config: dict) -> None:
    """
    This method performs profiling on raster data and writes the resulting profile dictionary based on a configuration dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None

    """
    # input file path(s)
    input_file_paths = config['input']['files']
    if isinstance(input_file_paths, list):
        if len(input_file_paths) == 1:
            my_path = os.path.abspath(input_file_paths[0])
        else:
            my_path = []
            for path in input_file_paths:
                my_path.append(os.path.abspath(path))
    elif isinstance(input_file_paths, str) and os.path.isfile(os.path.abspath(input_file_paths)):
        my_path = os.path.abspath(input_file_paths)
    else:
        raise ValueError(f"Invalid input: {input_file_paths} must be a valid file path or list of file paths")

    # output file path
    output_json_path = os.path.abspath(config['output']['json'])

    # Run raster profile
    profile_dict = profile_raster(my_path=my_path)

    # Write resulting profile dictionary
    write_to_json(profile_dict, output_json_path)
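
# Example configuration (a minimal sketch; the keys mirror what this function reads, the paths are hypothetical):
#
#     config = {
#         'input': {'files': ['data/tile_1.tif', 'data/tile_2.tif']},
#         'output': {'json': 'results/raster_profile.json'}
#     }
#     profile_raster_with_config(config)
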
def profile_raster(my_path: Union[str, List[str]]) -> dict:
    """
    This method performs profiling and generates a profiling dictionary for either a single image or many images.

    :param my_path: either the path to an image file or a list of paths to image files.
    :type my_path: Union[str, List[str]]
    :return: A dict which contains the results of the profiler for the image or images.
    :rtype: dict

    """
    if isinstance(my_path, list):
        # Handle list of paths
        return profile_multiple_rasters(my_path)
    elif isinstance(my_path, str) and os.path.isfile(my_path):
        # Handle single file path
        return profile_single_raster(my_path)
    else:
        raise ValueError(f"Invalid input: {my_path} must be a valid file path or list of file paths")
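
# Example usage (a minimal sketch; the paths are hypothetical):
#
#     profile_raster('data/tile_1.tif')                       # dispatches to profile_single_raster
#     profile_raster(['data/tile_1.tif', 'data/tile_2.tif'])  # dispatches to profile_multiple_rasters
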
# ------ VISTA (RHD, RAS FILES) ------#
def profile_vista_rasters_with_config(config: dict) -> None:
    """
    This method performs profiling on ras data and writes the resulting profile dictionary based on a configuration dictionary.

    :param config: a dictionary with all configuration settings.
    :type config: dict
    :return: None.
    :rtype: None

    """
    # 2 input files (ras, rhd)
    my_rhd_file_path = os.path.abspath(config['input']['rhd_file'])
    my_ras_file_path = os.path.abspath(config['input']['ras_file'])

    # output file path
    output_json_path = os.path.abspath(config['output']['json'])

    # Run raster profile
    profile_dict = profile_vista_rasters(rhd_datapath=my_rhd_file_path, ras_datapath=my_ras_file_path)

    # Write resulting profile dictionary
    write_to_json(profile_dict, output_json_path)
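
# Example configuration (a minimal sketch; the keys mirror what this function reads, the paths are hypothetical):
#
#     config = {
#         'input': {'rhd_file': 'data/lai_2022.rhd', 'ras_file': 'data/lai_2022.ras'},
#         'output': {'json': 'results/lai_2022_profile.json'}
#     }
#     profile_vista_rasters_with_config(config)
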
def profile_vista_rasters(rhd_datapath: str, ras_datapath: str) -> dict:
    """
    This method performs profiling and generates a profiling dictionary for a given ras file using the contents of the rhd file that accompanies it.

    :param rhd_datapath: the path to a rhd file.
    :type rhd_datapath: str
    :param ras_datapath: the path to a ras file.
    :type ras_datapath: str
    :return: A dict which contains the results of the profiler for the ras.
    :rtype: dict

    """

    def __read_image_rhd(rhd_datapath: str):
        with open(rhd_datapath, 'r') as f:
            lines = f.readlines()

        vista_data_type = int(lines[0])
        n_of_LAI = int(lines[1])

        split_third_row = " ".join(lines[2].split()).split(' ')
        columns = int(split_third_row[0])
        rows = int(split_third_row[1])

        split_fourth_row = " ".join(lines[3].split()).split(' ')
        resolution = float(split_fourth_row[0])
        upper_left_corner_x = float(split_fourth_row[1])
        upper_left_corner_y = float(split_fourth_row[2])
        UTM_x = float(split_fourth_row[3])
        UTM_y = float(split_fourth_row[4])
        UTM_zone = str(split_fourth_row[5])

        LAI_images = {'vista_data_type': vista_data_type,
                      'resolution': resolution,
                      'upper_left_corner_x': upper_left_corner_x,
                      'upper_left_corner_y': upper_left_corner_y,
                      'rows': rows,
                      'columns': columns,
                      'UTM_x': UTM_x,
                      'UTM_y': UTM_y,
                      'UTM_zone': UTM_zone}

        count_LAI_images = 0
        LAI_images['images'] = {}
        for value_LAI in range(5, n_of_LAI + 5):
            ras_file_name = rhd_datapath.split('/')[-1].split('.')[0]
            img_name = ras_file_name + '_' + str(count_LAI_images)
            prev_img_name = ras_file_name + '_' + str(count_LAI_images - 1)

            split_row = " ".join(lines[value_LAI].split()).split(' ')
            LAI_images['images'][img_name] = {}

            img_bytes = int(split_row[0])
            LAI_images['images'][img_name]['bytes'] = img_bytes
            LAI_images['images'][img_name]['date'] = datetime.strptime(
                split_row[3] + ' ' + split_row[2] + ' ' + split_row[1], '%d %m %Y').date()

            record_length = img_bytes * columns
            LAI_images['images'][img_name]['record_length_bytes'] = record_length

            if count_LAI_images == 0:
                LAI_images['images'][img_name]['image_start_pos_bytes'] = 0
            else:
                LAI_images['images'][img_name]['image_start_pos_bytes'] = \
                    LAI_images['images'][prev_img_name]['image_start_pos_bytes'] + \
                    ((record_length / img_bytes) * rows)

            count_LAI_images += 1

        return LAI_images

    ras_dict = __read_image_rhd(rhd_datapath)

    profile_dict = {
        'analysis': {
            'title': 'Profiling Report',
            'date_start': '',
            'date_end': '',
            'duration': '',
            'filenames': [rhd_datapath, ras_datapath]
        },
        'table': {
            'profiler_type': 'Vista_Raster',
            'byte_size': 0,
            'n_of_imgs': len(ras_dict['images']),
            'avg_width': 0.0,
            'avg_height': 0.0,
            'combined_bands': []
        },
        'variables': []
    }

    # initialize .ras NODATA value counts
    ras_zero_count = 0
    ras_missing_count = 0
    ras_forest_count = 0
    ras_urban_count = 0
    ras_water_count = 0
    ras_snow_count = 0
    ras_cloud_shadow_buffer_count = 0
    ras_cloud_shadow_count = 0
    ras_cloud_buffer_count = 0
    ras_cirrus_clouds_count = 0
    ras_clouds_count = 0

    __lai_f = lambda x: float(str(x)) / 1000 if (x > 0) else x
    # __lai_f = lambda x: float(str(x)[:-4])/40.0 if(x > 99) else ( x if(x < 0) else -999)

    # Start time
    now = datetime.now()
    start_string = now.strftime("%Y-%m-%d %H:%M:%S.%f")
    profile_dict['analysis']['date_start'] = start_string

    img_names = []
    imgs = []
    lai_in_imgs = []
    with open(ras_datapath, 'rb') as f:
        ras_file_name = ras_datapath.split('/')[-1].split('.')[0]
        if ras_dict['vista_data_type'] == 7:
            ras_file_array = np.fromfile(f, dtype=np.int16).astype(float)
            ras_file_array[np.where(ras_file_array > 0)] = list(
                map(__lai_f, ras_file_array[np.where(ras_file_array > 0)]))

            n_of_imgs = len(ras_dict['images'])
            for n_img in range(0, n_of_imgs):
                # Create image dictionary
                img_dict = {
                    'name': '',
                    'type': 'Raster',
                    'crs': '',
                    'date': '',
                    'spatial_coverage': '',
                    'spatial_resolution': {
                        'pixel_size_x': 0,
                        'pixel_size_y': 0
                    },
                    'no_data_value': '',
                    'format': ''
                }

                img_name = ras_file_name + '_' + str(n_img)
                img_names.append(img_name)

                # image name
                img_dict['name'] = img_name

                next_img_name = ras_file_name + '_' + str(n_img + 1)
                if n_img == n_of_imgs - 1:
                    start_pos = int(ras_dict['images'][img_name]['image_start_pos_bytes'])
                    end_pos = len(ras_file_array)
                else:
                    start_pos = int(ras_dict['images'][img_name]['image_start_pos_bytes'])
                    end_pos = int(ras_dict['images'][next_img_name]['image_start_pos_bytes'])

                # data of the image
                img_data = ras_file_array[start_pos:end_pos]
                img_data = img_data.reshape((ras_dict['rows'], ras_dict['columns']))

                # Find Image General Data
                upper_left_corner_x = ras_dict['upper_left_corner_x']
                upper_left_corner_y = ras_dict['upper_left_corner_y']
                x_res = ras_dict['resolution']
                y_res = ras_dict['resolution']

                transform = from_origin(upper_left_corner_x, upper_left_corner_y, x_res, y_res)

                # create in-memory rasterio image
                mem_file = MemoryFile()
                with mem_file.open(driver='GTiff',
                                   height=ras_dict['rows'],
                                   width=ras_dict['columns'],
                                   count=1,
                                   dtype=str(ras_file_array.dtype),
                                   crs='+proj=utm +zone=' + str(ras_dict['UTM_zone']),
                                   transform=transform) as img:
                    img.update_tags(date=ras_dict['images'][img_name]['date'])

                    # image general metadata
                    img_dict.update(img.meta)

                    # image size
                    profile_dict['table']['byte_size'] += img_dict['width'] * img_dict['height'] * 4

                    # image date
                    img_dict['date'] = ras_dict['images'][img_name]['date'].strftime("%d.%m.%Y")

                    # making transform JSON-serializable
                    img_dict['transform'] = list(img_dict['transform'])

                    profile_dict['table']['avg_width'] += img_dict['width']
                    profile_dict['table']['avg_height'] += img_dict['height']

                    # change nodata and driver keys
                    img_dict['no_data_value'] = img_dict['nodata']
                    del img_dict['nodata']

                    img_dict['format'] = img_dict['driver']
                    del img_dict['driver']

                    # change crs format
                    if img.crs is not None:
                        crs_list = CRS.from_string(str(img_dict['crs']))
                        img_dict['crs'] = 'EPSG:' + str(crs_list.to_epsg())
                    else:
                        img_dict['crs'] = 'EPSG:4326'

                    # calculate spatial resolution
                    pixelSizeX, pixelSizeY = img.res
                    img_dict['spatial_resolution']['pixel_size_x'] = pixelSizeX
                    img_dict['spatial_resolution']['pixel_size_y'] = pixelSizeY

                    # calculate spatial coverage
                    # Bounding box (in the original CRS)
                    bounds = img.bounds

                    xmin, ymin, xmax, ymax = transform_bounds(CRS.from_string(img_dict['crs']),
                                                              CRS.from_epsg(4326), *bounds)

                    geom = box(xmin, ymin, xmax, ymax)
                    img_dict['spatial_coverage'] = geom.wkt

                    img.close()

                # statistics for LAI band
                img_dict['bands'] = []

                s = pd.Series(img_data[np.where(img_data > 0)])
                stats = s.describe(percentiles=[.10, .25, .75, .90])

                band_uuid = str(uuid.uuid4())
                band_dict = {
                    'uuid': band_uuid,
                    'name': 'LAI',
                    'count': stats['count'],
                    'min': stats['min'],
                    'max': stats['max'],
                    'average': stats['mean'],
                    'stddev': stats['std'],
                    'median': stats['50%'],
                    'kurtosis': s.kurtosis(),
                    'skewness': s.skew(),
                    'variance': s.var(),
                    'percentile10': stats['10%'],
                    'percentile25': stats['25%'],
                    'percentile75': stats['75%'],
                    'percentile90': stats['90%'],
                    'no_data_distribution': []
                }

                # percentages of no_data values
                img_no_data = img_data[np.where(img_data < 0)]

                width = img_dict['width']
                height = img_dict['height']

                missing_count = np.count_nonzero(img_no_data == -999)
                forest_count = np.count_nonzero(img_no_data == -961)
                urban_count = np.count_nonzero(img_no_data == -950)
                water_count = np.count_nonzero(img_no_data == -940)
                snow_count = np.count_nonzero(img_no_data == -930)
                cloud_shadow_buffer_count = np.count_nonzero(img_no_data == -923)
                cloud_shadow_count = np.count_nonzero(img_no_data == -920)
                cloud_buffer_count = np.count_nonzero(img_no_data == -913)
                cirrus_clouds_count = np.count_nonzero(img_no_data == -911)
                clouds_count = np.count_nonzero(img_no_data == -910)

                img_zeros = img_data[np.where(img_data == 0)]
                zero_count = img_zeros.size

                # add NODATA value counts to the .ras NODATA value counts
                ras_missing_count += missing_count
                ras_forest_count += forest_count
                ras_urban_count += urban_count
                ras_water_count += water_count
                ras_snow_count += snow_count
                ras_cloud_shadow_buffer_count += cloud_shadow_buffer_count
                ras_cloud_shadow_count += cloud_shadow_count
                ras_cloud_buffer_count += cloud_buffer_count
                ras_cirrus_clouds_count += cirrus_clouds_count
                ras_clouds_count += clouds_count

                # add zero value counts to the .ras zero value counts
                ras_zero_count += zero_count

                no_data_dict = {
                    'LAI': (band_dict['count'] / (width * height)) * 100,
                    'missing': (missing_count / (width * height)) * 100,
                    'forest': (forest_count / (width * height)) * 100,
                    'urban': (urban_count / (width * height)) * 100,
                    'water': (water_count / (width * height)) * 100,
                    'snow': (snow_count / (width * height)) * 100,
                    'cloud_shadow_buffer': (cloud_shadow_buffer_count / (width * height)) * 100,
                    'cloud_shadow': (cloud_shadow_count / (width * height)) * 100,
                    'cloud_buffer': (cloud_buffer_count / (width * height)) * 100,
                    'cirrus_clouds': (cirrus_clouds_count / (width * height)) * 100,
                    'clouds': (clouds_count / (width * height)) * 100,
                    'zeros': (zero_count / (width * height)) * 100
                }

                for k, v in no_data_dict.items():
                    band_dict['no_data_distribution'].append(
                        {'uuid': band_uuid, 'value': k, 'percentage': v}
                    )

                    if k == 'LAI':
                        imgs.append({'raster': img_dict['name'],
                                     'date': img_dict['date'],
                                     'percentage': no_data_dict['LAI']})
                        lai_in_imgs.append(no_data_dict['LAI'])

                img_dict['bands'].append(band_dict)

                profile_dict['variables'].append(img_dict)

            # calculate combined stats
            combined_band_stats_dict = {
                'name': 'LAI',
                'n_of_imgs': profile_dict['table']['n_of_imgs'],
                'img_names': img_names,
                'imgs': imgs,
                'count': 0,
                'min': math.inf,
                'average': 0,
                'max': -math.inf,
                'variance': 0,
                'no_data_distribution': [],
                'lai_distribution': {}
            }

            # calculate LAI numeric distribution for the images of the .ras
            s = pd.Series(lai_in_imgs)
            stats = s.describe(percentiles=[.10, .25, .75, .90])

            lai_dict = {
                'name': 'LAI',
                'count': stats['count'],
                'min': stats['min'],
                'max': stats['max'],
                'average': stats['mean'],
                'stddev': stats['std'],
                'median': stats['50%'],
                'kurtosis': s.kurtosis(),
                'skewness': s.skew(),
                'variance': s.var(),
                'percentile10': stats['10%'],
                'percentile25': stats['25%'],
                'percentile75': stats['75%'],
                'percentile90': stats['90%']
            }

            combined_band_stats_dict['lai_distribution'] = lai_dict

            for image in profile_dict['variables']:
                lai_band = image['bands'][0]
                if lai_band['count'] != 0:
                    combined_band_stats_dict['count'] += lai_band['count']
                    combined_band_stats_dict['average'] += lai_band['average'] * lai_band['count']

                    if lai_band['min'] < combined_band_stats_dict['min']:
                        combined_band_stats_dict['min'] = lai_band['min']

                    if lai_band['max'] > combined_band_stats_dict['max']:
                        combined_band_stats_dict['max'] = lai_band['max']

            combined_band_stats_dict['average'] = \
                combined_band_stats_dict['average'] / combined_band_stats_dict['count']

            # calculate combined_variance (pooled over the LAI images):
            # comb_var = (n*std1 + n*d_sqrt1 + m*std2 + m*d_sqrt2 + k*std3 + k*d_sqrt3) / (n + m + k)
            # where d_sqrt_i = (mean_i - comb_mean)^2
            for image in profile_dict['variables']:
                lai_band = image['bands'][0]
                if lai_band['count'] != 0:
                    count = lai_band['count']
                    std = lai_band['stddev']
                    mean = lai_band['average']
                    comb_mean = combined_band_stats_dict['average']
                    d_sqrt = (mean - comb_mean) * (mean - comb_mean)

                    combined_band_stats_dict['variance'] += count * std + count * d_sqrt

            combined_band_stats_dict['variance'] = \
                combined_band_stats_dict['variance'] / combined_band_stats_dict['count']

            # calculate no_data_distribution for LAI of the .ras
            width_all = profile_dict['table']['avg_width']
            height_all = profile_dict['table']['avg_height']

            no_data_dict = {
                'LAI': ((combined_band_stats_dict['count'] * n_of_imgs) / (width_all * height_all)) * 100,
                'missing': ((ras_missing_count * n_of_imgs) / (width_all * height_all)) * 100,
                'forest': ((ras_forest_count * n_of_imgs) / (width_all * height_all)) * 100,
                'urban': ((ras_urban_count * n_of_imgs) / (width_all * height_all)) * 100,
                'water': ((ras_water_count * n_of_imgs) / (width_all * height_all)) * 100,
                'snow': ((ras_snow_count * n_of_imgs) / (width_all * height_all)) * 100,
                'cloud_shadow_buffer': ((ras_cloud_shadow_buffer_count * n_of_imgs) / (width_all * height_all)) * 100,
                'cloud_shadow': ((ras_cloud_shadow_count * n_of_imgs) / (width_all * height_all)) * 100,
                'cloud_buffer': ((ras_cloud_buffer_count * n_of_imgs) / (width_all * height_all)) * 100,
                'cirrus_clouds': ((ras_cirrus_clouds_count * n_of_imgs) / (width_all * height_all)) * 100,
                'clouds': ((ras_clouds_count * n_of_imgs) / (width_all * height_all)) * 100,
                'zeros': ((ras_zero_count * n_of_imgs) / (width_all * height_all)) * 100
            }

            for k, v in no_data_dict.items():
                combined_band_stats_dict['no_data_distribution'].append(
                    {'name': 'LAI', 'value': k, 'percentage': v}
                )

            profile_dict['table']['combined_bands'].append(combined_band_stats_dict)

    # calculate avg_width and avg_height of .ras file
    profile_dict['table']['avg_width'] = profile_dict['table']['avg_width'] / profile_dict['table']['n_of_imgs']
    profile_dict['table']['avg_height'] = profile_dict['table']['avg_height'] / profile_dict['table']['n_of_imgs']

    # End time
    now = datetime.now()
    end_string = now.strftime("%Y-%m-%d %H:%M:%S.%f")
    profile_dict['analysis']['date_end'] = end_string

    # Time Difference
    profile_dict['analysis']['duration'] = str(
        dateutil.parser.parse(profile_dict['analysis']['date_end']) - dateutil.parser.parse(
            profile_dict['analysis']['date_start']))

    return profile_dict
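
# Example usage (a minimal sketch; the RHD/RAS paths are hypothetical):
#
#     profile = profile_vista_rasters('data/lai_2022.rhd', 'data/lai_2022.ras')
#     print(profile['table']['n_of_imgs'])                      # number of LAI images in the .ras
#     print(profile['table']['combined_bands'][0]['average'])   # pooled LAI mean over all images
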
def get_filename(path: str) -> str:
    """Helper that returns the filename (with extension) from a path."""
    filename = os.path.basename(path)
    return filename