Skip to content

Commit

Permalink
Moved another function from h3 to common
Browse files Browse the repository at this point in the history
  • Loading branch information
ndemaio committed Oct 24, 2023
1 parent 62dad90 commit 5c1f201
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 69 deletions.
72 changes: 71 additions & 1 deletion raster2dggs/common.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import os
import errno
import tempfile
import logging
import click_log

import pandas as pd

from typing import Union
from typing import Union, Callable
from pathlib import Path
from rasterio import crs
from rasterio.enums import Resampling
from tqdm.dask import TqdmCallback
import dask.dataframe as dd

from urllib.parse import urlparse

Expand Down Expand Up @@ -118,3 +121,70 @@ def get_parent_res(dggs: str, parent_res: Union[None, int], resolution: int) ->
raise RuntimeError(
"Unknown dggs {dggs}) - must be one of [ 'h3', 'rhp' ]".format(dggs=dggs)
)


def address_boundary_issues(
    dggs: str,
    parent_groupby: Callable,
    pq_input: tempfile.TemporaryDirectory,
    output: Path,
    resolution: int,
    parent_res: int,
    **kwargs,
) -> Path:
    """
    Re-partition Stage 1 output by parent cell and aggregate at the target resolution.

    After "stage 1" processing, there is a DGGS cell and band value/s for each pixel
    in the input image. Partitions are based on raster windows.
    This function re-partitions based on parent cell IDs at a fixed offset from the
    target resolution. Once re-partitioned on this basis, values are aggregated at
    the target resolution, to account for multiple pixels mapping to the same cell.
    This re-partitioning is necessary to address the issue of the same cell IDs being
    present in different partitions of the original (i.e. window-based) partitioning.
    The nested structure of the DGGS is a useful property to address this problem.

    Parameters:
        dggs: DGGS name; used as the prefix of the parent column
            (``f"{dggs}_{parent_res:02}"``) and passed to ``get_parent_res``.
        parent_groupby: callable applied to every re-partitioned partition via
            ``map_partitions``; it is given ``resolution``, ``kwargs["aggfunc"]``
            and ``kwargs["decimals"]`` as extra positional arguments.
        pq_input: location of the Stage 1 parquet output.
        output: destination of the final parquet dataset.
        resolution: target DGGS resolution.
        parent_res: requested parent resolution, resolved by ``get_parent_res``.
        **kwargs: must contain ``aggfunc``, ``decimals``, ``overwrite`` and
            ``compression``; a missing key raises ``KeyError``.

    Returns:
        ``output``, unchanged.
    """
    parent_res = get_parent_res(dggs, parent_res, resolution)
    parent_column = f"{dggs}_{parent_res:02}"

    LOGGER.debug(
        "Reading Stage 1 output (%s) and setting index for parent-based partitioning",
        pq_input,
    )
    with TqdmCallback(desc="Reading window partitions"):
        # Set index as parent cell
        ddf = dd.read_parquet(pq_input).set_index(parent_column)

    with TqdmCallback(desc="Counting parents"):
        # Count parents, to get target number of partitions
        uniqueparents = sorted(ddf.index.unique().compute())

    # One partition per unique parent: the divisions list below has
    # len(uniqueparents) + 1 boundaries, which delimits len(uniqueparents)
    # partitions (and name_function below names exactly that many files).
    LOGGER.debug(
        "Repartitioning into %d partitions, based on parent cells",
        len(uniqueparents),
    )
    LOGGER.debug("Aggregating cell values where conflicts exist")

    # See "notes" on why divisions expects repetition of the last item:
    # https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.repartition.html
    # NOTE(review): if Stage 1 yielded no rows, the [-1] lookup raises IndexError.
    divisions = uniqueparents + [uniqueparents[-1]]

    with TqdmCallback(desc="Repartioning/aggregating"):
        (
            ddf.repartition(divisions=divisions)
            .map_partitions(
                parent_groupby, resolution, kwargs["aggfunc"], kwargs["decimals"]
            )
            .to_parquet(
                output,
                overwrite=kwargs["overwrite"],
                engine="pyarrow",
                write_index=True,
                append=False,
                # Each output file is named after the parent cell of its partition
                name_function=lambda i: f"{uniqueparents[i]}.parquet",
                compression=kwargs["compression"],
            )
        )

    LOGGER.debug(
        "Stage 2 (parent cell repartioning) and Stage 3 (aggregation) complete"
    )

    return output
76 changes: 8 additions & 68 deletions raster2dggs/h3.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import click
import click_log
import dask
import dask.dataframe as dd
import h3pandas # Necessary import despite lack of explicit use
import pandas as pd
import pyarrow as pa
Expand All @@ -20,7 +19,6 @@
from rasterio.warp import calculate_default_transform
import rioxarray
from tqdm import tqdm
from tqdm.dask import TqdmCallback
import xarray as xr

import raster2dggs.constants as const
Expand Down Expand Up @@ -172,8 +170,14 @@ def process(window):
pbar.update(1)

common.LOGGER.debug("Stage 1 (primary indexing) complete")
return _address_boundary_issues(
tmpdir, output, resolution, parent_res, **kwargs
return common.address_boundary_issues(
"h3",
_h3_parent_groupby,
tmpdir,
output,
resolution,
parent_res,
**kwargs,
)


Expand All @@ -196,70 +200,6 @@ def _h3_parent_groupby(
)


def _address_boundary_issues(
    pq_input: tempfile.TemporaryDirectory,
    output: Path,
    resolution: int,
    parent_res: int,
    **kwargs,
) -> Path:
    """
    Re-partition Stage 1 output by H3 parent cell and aggregate at the target resolution.

    After "stage 1" processing, there is an H3 cell and band value/s for each pixel
    in the input image. Partitions are based on raster windows.
    This function re-partitions based on H3 parent cell IDs at a fixed offset from
    the target resolution. Once re-partitioned on this basis, values are aggregated
    at the target resolution, to account for multiple pixels mapping to the same
    H3 cell.
    This re-partitioning is necessary to address the issue of the same H3 cell IDs
    being present in different partitions of the original (i.e. window-based)
    partitioning. The nested structure of the DGGS is a useful property to address
    this problem.

    Parameters:
        pq_input: location of the Stage 1 parquet output.
        output: destination of the final parquet dataset.
        resolution: target H3 resolution.
        parent_res: requested parent resolution, resolved by
            ``common.get_parent_res``.
        **kwargs: must contain ``aggfunc``, ``decimals``, ``overwrite`` and
            ``compression``; a missing key raises ``KeyError``.

    Returns:
        ``output``, unchanged.
    """
    parent_res = common.get_parent_res("h3", parent_res, resolution)
    parent_column = f"h3_{parent_res:02}"

    common.LOGGER.debug(
        "Reading Stage 1 output (%s) and setting index for parent-based partitioning",
        pq_input,
    )
    with TqdmCallback(desc="Reading window partitions"):
        # Set index as parent cell
        ddf = dd.read_parquet(pq_input).set_index(parent_column)

    with TqdmCallback(desc="Counting parents"):
        # Count parents, to get target number of partitions
        uniqueh3 = sorted(ddf.index.unique().compute())

    # One partition per unique parent: the divisions list below has
    # len(uniqueh3) + 1 boundaries, which delimits len(uniqueh3) partitions
    # (and name_function below names exactly that many files).
    common.LOGGER.debug(
        "Repartitioning into %d partitions, based on parent cells", len(uniqueh3)
    )
    common.LOGGER.debug("Aggregating cell values where conflicts exist")

    # See "notes" on why divisions expects repetition of the last item:
    # https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.repartition.html
    # NOTE(review): if Stage 1 yielded no rows, the [-1] lookup raises IndexError.
    divisions = uniqueh3 + [uniqueh3[-1]]

    with TqdmCallback(desc="Repartioning/aggregating"):
        (
            ddf.repartition(divisions=divisions)
            .map_partitions(
                _h3_parent_groupby, resolution, kwargs["aggfunc"], kwargs["decimals"]
            )
            .to_parquet(
                output,
                overwrite=kwargs["overwrite"],
                engine="pyarrow",
                write_index=True,
                append=False,
                # Each output file is named after the parent cell of its partition
                name_function=lambda i: f"{uniqueh3[i]}.parquet",
                compression=kwargs["compression"],
            )
        )

    common.LOGGER.debug(
        "Stage 2 (parent cell repartioning) and Stage 3 (aggregation) complete"
    )

    return output


@click.command(context_settings={"show_default": True})
@click_log.simple_verbosity_option(common.LOGGER)
@click.argument("raster_input", type=click.Path(), nargs=1)
Expand Down

0 comments on commit 5c1f201

Please sign in to comment.