Source code for Pynomic.io.get_plot_bands

# !/usr/bin/env python
# -*- coding: utf-8 -*-
# License: MIT
# Copyright (c) 2024, Fiore J.Manuel.
# All rights reserved.

"""Provides the functions to read and extract the info from .tiff files."""

# =============================================================================
# IMPORTS
# =============================================================================
import numpy as np
import pandas as pd
import rasterio
import cv2
import os
import json
from shapely.geometry import MultiPolygon, Polygon
from rasterio import mask
from Pynomic.core import core
import re
from PIL import Image
import zarr
import io
import pandas_geojson as pdg
import geopandas as gdp
import shapely

# =============================================================================
# FUNCTIONS
# =============================================================================


def _read_grid(gpath, col_id: str):
    """Reads a geojson file.

    Args:
        gpath: grid path to a geojson file.

    Returns
    -------
        coordinate system of the grid.
        dict-like object with id and coords of each plot.
    """
    dics = {}
    if gpath.split(".")[1] == "geojson":
        plotgrids = open(gpath)
        plotgrids = json.load(plotgrids)
        crs_coords = plotgrids["crs"]["properties"]["name"]

        for p in plotgrids["features"]:
            dics[str(p["properties"][col_id])] = p["geometry"]["coordinates"][
                0
            ]

        return crs_coords, dics

    else:
        raise ValueError("Grid is not a geojson file")


def _read_grid2(gpath, col_id: str):
    """Reads a geojson and shape files.

    Parameters
    ----------
        gpath:str
            grid path to a geojson or shape file.

    Returns
    -------
        geodataframe form the grid.
        dict-like object with id and coords of each plot.
    """
    df = gdp.read_file(gpath)
    df[col_id] = df[col_id].astype(str)
    geodf = df.copy()
    poligons_dict = df.copy().set_index(col_id).loc[:, "geometry"].to_dict()

    return geodf, poligons_dict


def _read_grids(gpath, col_id: str):
    """Reads a geojson file or shape file.

    Parameters
    ----------
        gpath:str
            grid path to a geojson file.

    Returns
    -------
        coordinate system of the grid.
        dict-like object with id and coords of each plot.
    """
    data = gdp.read_file(gpath)
    data_dic = data.loc[:, [col_id, "geometry"]].copy().to_dict(index=False)


def _get_dataframe_from_json(path_gjson):
    data = pdg.read_geojson(path_gjson)
    collist = data.get_properties()
    dfg = data.to_dataframe()
    keep = []
    for c in dfg.columns:
        for m in collist:
            if len(c.split("." + m)) > 1:
                keep.append(c)
    dfa = dfg.loc[:, keep].copy()
    dfa.columns = collist
    return dfa


def _get_tiff_files(fold_path):
    """Makes a list of tiff files of a folder path.

    Parameters
    ----------
        fold_path:str
            folder path with the tiff files.

    Returns
    -------
        list
    """
    tiff_list = []
    for file in os.listdir(fold_path):

        if os.path.basename(file).split(".", 1)[1] == "tif":
            tiff_list.append(file)

    return tiff_list


def auto_fit_image(mtx, hbuffer=2, wbuffer=2):
    """Takes an array and returns the crooping and angle parameters.

    Parameters
    ----------
        mtx:np.array
        hbuffer:int
            height buffer
        wbuffer:int
            with buffer

    Returns
    -------
        tuple with croping parameters. Angle value.
    """
    gray = np.where(mtx <= 0, mtx, 255)

    gray = np.uint8(np.where(gray > 0, gray, 0))
    gray = np.array(gray)

    # edges = cv2.Canny(gray,150,150*2)

    contours, hierarchy = cv2.findContours(
        gray, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE
    )

    rect = cv2.minAreaRect(contours[-1])

    if rect[1][0] < rect[1][1]:
        angle = rect[2]
        gry = np.array(Image.fromarray(gray).rotate(angle, expand=True))
    else:
        angle = rect[2] + 90
        gry = np.array(Image.fromarray(gray).rotate(angle, expand=True))

    blur = cv2.GaussianBlur(gry, (3, 3), 0)
    th = cv2.threshold(blur, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
    coords = cv2.findNonZero(th)
    x, y, w, h = cv2.boundingRect(coords)

    h1 = y + hbuffer
    h2 = y + (h - hbuffer)
    w1 = x + wbuffer
    w2 = x + (w - wbuffer)
    return (h1, h2, w1, w2), angle


def _extract_bands_from_raster(raster_data, multiplot, alpha_idx=-1):
    """Separates the bands for the maks.

    Parameters
    ----------
        raster_data:np.array
            a masked area of intrest(aoi).
        multiplot:Multiplot
            a multiplot obj form sapely.

    Returns
    -------
        list with the bands array and an array from the mask.
    """
    masked_rast, aff = rasterio.mask.mask(
        raster_data, multiplot.geoms, crop=True
    )

    if alpha_idx == -1:
        true_bands = masked_rast.copy()
        masked_band = masked_rast[-1]
    else:
        true_bands = list(masked_rast)
        masked_band = true_bands.pop(alpha_idx)

    return true_bands, masked_band


def extract_raster_data(raster_path, grid_path, col_id: str, bands_n=None):
    """Extracts the values from the raster file segregating each band and plot.

    Parameters
    ----------
        raster_path:str
            path to raster file.
        grid_path:str
            path to gird.
        bands_n:list
            a list of the bands names and order.

    Returns
    -------
        dict with array of each band and plot.
        DataFrame with date, mean band for each plot.
        list bands name.
    """
    geodf, grids = _read_grid2(grid_path, col_id)
    bands_mean = []
    array_dict = {}
    bands_name = []
    with rasterio.open(raster_path) as src:
        for pos, g in enumerate(grids.keys()):
            if int(pos) == 0:
                coords = src.meta["crs"]
                print(f"Raster Coords system: {coords}")
                print(f"Grid Coords system: {geodf.crs}")

            # Check if contains alpha band
            contains_alpha_band = -1
            for idx, interp in enumerate(src.colorinterp, start=1):
                if interp == rasterio.enums.ColorInterp.alpha:
                    contains_alpha_band = idx - 1

            if grids[g].geom_type == "MultiPolygon":
                figure = grids[g]
            else:
                figure = MultiPolygon([grids[g]])

            if contains_alpha_band != -1:
                # Diferentiate the true bands form the mask band.
                true_bands, masked_band = _extract_bands_from_raster(
                    src, figure, contains_alpha_band
                )
            else:
                # Returns the las band for fiting.
                true_bands, masked_band = _extract_bands_from_raster(
                    src, figure
                )

            # Get the fitting parameters.
            cpv, rangle = auto_fit_image(masked_band)

            # Enumerate the bands and name them.
            if bands_n:
                bands_name = bands_n
            else:
                n_band = np.array(range(0, len(true_bands))) + 1
                bands_name = ["band" + "_" + str(x) for x in n_band]

            # Fit the bands array with the parameters.
            fitted_bands = [
                np.array(Image.fromarray(band).rotate(rangle, expand=True))[
                    cpv[0] : cpv[1], cpv[2] : cpv[3]
                ]
                for band in true_bands
            ]

            # Save the mean for each band
            plot_fitted_bands = [
                np.mean(band.astype(float)) for band in fitted_bands
            ]
            mp_bands = []
            # Numerical id for project.
            id_str = "A" + str(pos + 1)
            mp_bands.append(id_str)
            # Original id from the grid can be numerical or text or both.
            mp_bands.append(g)
            # gets the date
            mp_bands.append(os.path.basename(raster_path).split("_")[0])
            for band in plot_fitted_bands:
                mp_bands.append(band)
            bands_mean.append(mp_bands)

            # Save the values in a dictionary.
            array_dict[pos + 1] = dict(zip(bands_name, fitted_bands))

    df1 = pd.DataFrame(bands_mean, columns=["id", col_id, "date", *bands_name])
    geodat = geodf
    geodat[col_id] = geodat[col_id].astype(str)

    df = geodat.merge(df1, on=col_id)
    df = df.loc[
        :, [*df1.columns.values, *geodat.drop(columns=col_id).columns.values]
    ]
    return array_dict, bands_name, df


[docs] def process_stack_tiff(folder_path, grid_path, col_id: str, bands_n=None): """Process all the .tiff files in a folder. Parameters ---------- folder_path:str folder that contains the .tiff files. grid_path:str path of the geojson grid. col_id:str unique column name identifier from the grid. bands_n:str list like with the bands names ordered. Returns ------- PynomicsProject object. """ tif_list = _get_tiff_files(folder_path) raw_data = zarr.group() raw_data.create_group("dates") dates = [] ldata = [] date_key = "" for tiff_pos, tiff_file in enumerate(tif_list): print(f"{tiff_pos + 1}/{len(tif_list)} : {tiff_file}") if re.search(r"_", tiff_file): date_key = tiff_file.split("_")[0] dates.append(date_key) to_raw_data, bands_n, ldata_bands = extract_raster_data( folder_path + "/" + tiff_file, grid_path, col_id, bands_n ) raw_data["dates"].create_group(date_key) for plot_id in to_raw_data.keys(): c = "A" + str(plot_id) raw_data["dates"][date_key].create_group(c) for band in to_raw_data[plot_id].keys(): raw_data["dates"][date_key][c].create_group(band) raw_data["dates"][date_key][c][band] = to_raw_data[plot_id][ band ] # ldata_bands = ldata_bands.reset_index().drop(columns= 'index', axis = 1) ldata.append(ldata_bands) df_data = pd.concat(ldata, axis=0).reset_index().drop(columns="index") return core.Pynomicproject( raw_data=raw_data, ldata=df_data, dates=dates, n_dates=len(dates), bands_name=bands_n, n_bands=len(bands_n), )
[docs] def read_zarr(path): """Reads a zarr project previously saved from pynomics. Parameters ---------- path:str a path to the directory Returns ------- Pynomicproject object """ store = zarr.open_group(path + "/" + "raw_data", mode="a") data1 = gdp.read_file(path + "/" + "ldata.shp") with open(path + "/" + "obj_properties.json", "r") as file: prop = dict(json.load(file)) return core.Pynomicproject( raw_data=store, ldata=data1.copy(), n_dates=len(prop["dates"]), dates=prop["dates"], n_bands=len(prop["bands"]), bands_name=prop["bands"], )