From 5c1f201a70c2f6de188141a03a7ffa53a22c4010 Mon Sep 17 00:00:00 2001 From: Nicoletta De Maio Date: Tue, 24 Oct 2023 15:34:43 +1300 Subject: [PATCH] Moved another function from h3 to common --- raster2dggs/common.py | 72 +++++++++++++++++++++++++++++++++++++++- raster2dggs/h3.py | 76 +++++-------------------------------------- 2 files changed, 79 insertions(+), 69 deletions(-) diff --git a/raster2dggs/common.py b/raster2dggs/common.py index 792c62c..a4b3e5f 100644 --- a/raster2dggs/common.py +++ b/raster2dggs/common.py @@ -1,14 +1,17 @@ import os import errno +import tempfile import logging import click_log import pandas as pd -from typing import Union +from typing import Union, Callable from pathlib import Path from rasterio import crs from rasterio.enums import Resampling +from tqdm.dask import TqdmCallback +import dask.dataframe as dd from urllib.parse import urlparse @@ -118,3 +121,70 @@ def get_parent_res(dggs: str, parent_res: Union[None, int], resolution: int) -> raise RuntimeError( "Unknown dggs {dggs}) - must be one of [ 'h3', 'rhp' ]".format(dggs=dggs) ) + + +def address_boundary_issues( + dggs: str, + parent_groupby: Callable, + pq_input: tempfile.TemporaryDirectory, + output: Path, + resolution: int, + parent_res: int, + **kwargs, +) -> Path: + """ + After "stage 1" processing, there is a DGGS cell and band value/s for each pixel in the input image. Partitions are based + on raster windows. + + This function will re-partition based on parent cell IDs at a fixed offset from the target resolution. + + Once re-partitioned on this basis, values are aggregated at the target resolution, to account for multiple pixels mapping + to the same cell. + + This re-partitioning is necessary to address the issue of the same cell IDs being present in different partitions + of the original (i.e. window-based) partitioning. Using the nested structure of the DGGS is an useful property + to address this problem. + """ + parent_res = get_parent_res(dggs, parent_res, resolution) + + LOGGER.debug( + f"Reading Stage 1 output ({pq_input}) and setting index for parent-based partitioning" + ) + with TqdmCallback(desc="Reading window partitions"): + # Set index as parent cell + ddf = dd.read_parquet(pq_input).set_index(f"{dggs}_{parent_res:02}") + + with TqdmCallback(desc="Counting parents"): + # Count parents, to get target number of partitions + uniqueparents = sorted(list(ddf.index.unique().compute())) + + LOGGER.debug( + "Repartitioning into %d partitions, based on parent cells", + len(uniqueparents) + 1, + ) + LOGGER.debug("Aggregating cell values where conflicts exist") + + with TqdmCallback(desc="Repartioning/aggregating"): + ddf = ( + ddf.repartition( # See "notes" on why divisions expects repetition of the last item https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.repartition.html + divisions=(uniqueparents + [uniqueparents[-1]]) + ) + .map_partitions( + parent_groupby, resolution, kwargs["aggfunc"], kwargs["decimals"] + ) + .to_parquet( + output, + overwrite=kwargs["overwrite"], + engine="pyarrow", + write_index=True, + append=False, + name_function=lambda i: f"{uniqueparents[i]}.parquet", + compression=kwargs["compression"], + ) + ) + + LOGGER.debug( + "Stage 2 (parent cell repartioning) and Stage 3 (aggregation) complete" + ) + + return output diff --git a/raster2dggs/h3.py b/raster2dggs/h3.py index 4eaaa8e..45401db 100644 --- a/raster2dggs/h3.py +++ b/raster2dggs/h3.py @@ -9,7 +9,6 @@ import click import click_log import dask -import dask.dataframe as dd import h3pandas # Necessary import despite lack of explicit use import pandas as pd import pyarrow as pa @@ -20,7 +19,6 @@ from rasterio.warp import calculate_default_transform import rioxarray from tqdm import tqdm -from tqdm.dask import TqdmCallback import xarray as xr import raster2dggs.constants as const @@ -172,8 +170,14 @@ def process(window): pbar.update(1) common.LOGGER.debug("Stage 1 (primary indexing) complete") - return _address_boundary_issues( - tmpdir, output, resolution, parent_res, **kwargs + return common.address_boundary_issues( + "h3", + _h3_parent_groupby, + tmpdir, + output, + resolution, + parent_res, + **kwargs, ) @@ -196,70 +200,6 @@ def _h3_parent_groupby( ) -def _address_boundary_issues( - pq_input: tempfile.TemporaryDirectory, - output: Path, - resolution: int, - parent_res: int, - **kwargs, -) -> Path: - """ - After "stage 1" processing, there is an H3 cell and band value/s for each pixel in the input image. Partitions are based - on raster windows. - - This function will re-partition based on H3 parent cell IDs at a fixed offset from the target resolution. - - Once re-partitioned on this basis, values are aggregated at the target resolution, to account for multiple pixels mapping - to the same H3 cell. - - This re-partitioning is necessary to address the issue of the same H3 cell IDs being present in different partitions - of the original (i.e. window-based) partitioning. Using the nested structure of the DGGS is an useful property - to address this problem. - """ - parent_res = common.get_parent_res("h3", parent_res, resolution) - - common.LOGGER.debug( - f"Reading Stage 1 output ({pq_input}) and setting index for parent-based partitioning" - ) - with TqdmCallback(desc="Reading window partitions"): - # Set index as parent cell - ddf = dd.read_parquet(pq_input).set_index(f"h3_{parent_res:02}") - - with TqdmCallback(desc="Counting parents"): - # Count parents, to get target number of partitions - uniqueh3 = sorted(list(ddf.index.unique().compute())) - - common.LOGGER.debug( - "Repartitioning into %d partitions, based on parent cells", len(uniqueh3) + 1 - ) - common.LOGGER.debug("Aggregating cell values where conflicts exist") - - with TqdmCallback(desc="Repartioning/aggregating"): - ddf = ( - ddf.repartition( # See "notes" on why divisions expects repetition of the last item https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.repartition.html - divisions=(uniqueh3 + [uniqueh3[-1]]) - ) - .map_partitions( - _h3_parent_groupby, resolution, kwargs["aggfunc"], kwargs["decimals"] - ) - .to_parquet( - output, - overwrite=kwargs["overwrite"], - engine="pyarrow", - write_index=True, - append=False, - name_function=lambda i: f"{uniqueh3[i]}.parquet", - compression=kwargs["compression"], - ) - ) - - common.LOGGER.debug( - "Stage 2 (parent cell repartioning) and Stage 3 (aggregation) complete" - ) - - return output - - @click.command(context_settings={"show_default": True}) @click_log.simple_verbosity_option(common.LOGGER) @click.argument("raster_input", type=click.Path(), nargs=1)