Skip to content

Commit

Permalink
use default multiprocessing in gdal.py and remove all logging from mo…
Browse files Browse the repository at this point in the history
…dule
  • Loading branch information
JeroenVerstraelen committed Jun 10, 2024
1 parent 5739cb6 commit c1b8676
Showing 1 changed file with 7 additions and 50 deletions.
57 changes: 7 additions & 50 deletions openeogeotrellis/integrations/gdal.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import logging
import multiprocessing
from dataclasses import dataclass
from pathlib import Path
Expand All @@ -11,8 +10,6 @@

from openeogeotrellis.utils import get_s3_binary_file_contents, _make_set_for_key

logger = logging.getLogger(__name__)


"""Output from GDAL.Info.
Expand Down Expand Up @@ -111,9 +108,10 @@ def _extract_gdal_asset_raster_metadata(


def error_handler(e):
logger.warning(f"Error while looking up result metadata, may be incomplete. {str(e)}")
#logger.warning(f"Error while looking up result metadata, may be incomplete. {str(e)}")
pass

pool = multiprocessing.get_context("spawn").Pool(10)
pool = multiprocessing.Pool(10)
job = [pool.apply_async(_get_metadata_callback, (asset_path, asset_md,job_dir,), error_callback=error_handler) for asset_path, asset_md in asset_metadata.items()]
pool.close()
pool.join()
Expand All @@ -139,7 +137,6 @@ def error_handler(e):
is_some_raster_md_missing = True

except Exception as e:
logger.warning("Could not retrieve raster metadata: " + str(e))
is_some_raster_md_missing = True

#
Expand All @@ -149,39 +146,25 @@ def error_handler(e):
def _get_metadata_callback(asset_path: str, asset_md: Dict[str, str], job_dir: Path):

mime_type: str = asset_md.get("type", "")
logger.debug(
f"_export_result_metadata: {asset_path=}, "
+ f"file's MIME type: {mime_type}, "
+ f"job dir (based on output file): {job_dir=}"
)

# Skip assets that are clearly not images.
if asset_path.endswith(".json"):
logger.info(f"_export_result_metadata: Asset file is not an image but JSON, {asset_path=}")
return None

# The asset path should be relative to the job directory.
abs_asset_path: Path = get_abs_path_of_asset(asset_path, job_dir)
logger.debug(f"{asset_path=} maps to absolute path: {abs_asset_path=} , " + f"{abs_asset_path.exists()=}")

asset_href: str = asset_md.get("href", "")
if not abs_asset_path.exists() and asset_href.startswith("s3://"):
try:
abs_asset_path.write_bytes(get_s3_binary_file_contents(asset_href))
except Exception as exc:
logger.error(
"Could not download asset from object storage: "
+ f"asset={asset_path}, href={asset_href!r}, exception: {exc!r}"
)
pass

asset_gdal_metadata: AssetRasterMetadata = read_gdal_raster_metadata(abs_asset_path)
# If gdal could not extract the projection metadata from the file
# (The file is corrupt perhaps?).
if asset_gdal_metadata.could_not_read_file:
logger.warning(
"Could not get projection extension metadata for following asset:"
+ f" '{asset_path}', {abs_asset_path=}"
)
return None
else:
return (asset_path, asset_gdal_metadata.to_dict())
Expand Down Expand Up @@ -219,7 +202,6 @@ def read_gdal_raster_metadata(asset_path: Union[str, Path]) -> AssetRasterMetada
and in that version the gdal.Info function include these properties directly
in the key "stac" of the dictionary it returns.
"""
logger.debug(f"{__name__}.read_projection_extension_metadata: {asset_path=}")
return parse_gdal_raster_metadata(read_gdal_info(str(asset_path)))


Expand Down Expand Up @@ -301,7 +283,6 @@ def _process_gdalinfo_for_netcdf_subdatasets(
sub_datasets_proj[sub_ds_uri] = sub_ds_md

stats_info = _get_raster_statistics(sub_ds_gdal_info, band_name)
logger.info(f"_process_gdalinfo_for_netcdf_subdatasets:: {stats_info=}")
sub_datasets_stats[sub_ds_uri] = stats_info

proj_info = {}
Expand All @@ -318,27 +299,18 @@ def _process_gdalinfo_for_netcdf_subdatasets(
proj_info["proj:epsg"] = epsg_codes.pop()

ds_band_names = [band for bands in sub_datasets_stats.values() for band in bands.keys()]
logger.debug(f"{ds_band_names=}")

all_raster_stats = {}

# We can only copy each band's stats if there are no duplicate bands across
# the subdatasets. If we find duplicate bands there is likely a bug.
# Besides it is not obvious how we would need to merge statistics across
# subdatasets, if the bands occur multiple times.
if sorted(set(ds_band_names)) != sorted(ds_band_names):
logger.warning(f"There are duplicate bands in {ds_band_names=}, Can not merge the bands' statistics.")
else:
logger.info(f"There are no duplicate bands in {ds_band_names=}, Will use all bands' statistics in result.")
for bands in sub_datasets_stats.values():
for band_name, stats in bands.items():
all_raster_stats[band_name] = stats

logger.debug(f"{all_raster_stats=}")
for bands in sub_datasets_stats.values():
for band_name, stats in bands.items():
all_raster_stats[band_name] = stats

result = AssetRasterMetadata(gdal_info=gdal_info, projection=proj_info, statistics=all_raster_stats)
logger.debug(f"_process_gdalinfo_for_netcdf_subdatasets:: returning {result=}")

return AssetRasterMetadata(gdal_info=gdal_info, projection=proj_info, statistics=all_raster_stats)


Expand Down Expand Up @@ -417,12 +389,6 @@ def get_abs_path_of_asset(asset_filename: str, job_dir: Union[str, Path]) -> Pat
:return: the absolute path to the asset file, inside job_dir.
"""
logger.debug(
f"{__name__}.get_abs_path_of_asset: {asset_filename=}, {job_dir=}, {Path.cwd()=}, "
+ f"{Path(job_dir).is_absolute()=}, {Path(job_dir).exists()=}, "
+ f"{Path(asset_filename).exists()=}"
)

abs_asset_path = Path(asset_filename)
if not abs_asset_path.is_absolute():
abs_asset_path = Path(job_dir).resolve() / asset_filename
Expand All @@ -449,8 +415,6 @@ def read_gdal_info(asset_uri: str) -> GDALInfo:
:return:
GDALInfo: which is a dictionary that contains the output from `gdal.Info()`.
"""
logger.debug(f"{__name__}.read_gdal_info: {asset_uri=}")

# By default, gdal does not raise exceptions but returns error codes and prints
# error info on stdout. We don't want that. At the least it should go to the logs.
# See https://gdal.org/api/python_gotchas.html
Expand All @@ -464,15 +428,8 @@ def read_gdal_info(asset_uri: str) -> GDALInfo:
except Exception as exc:
# TODO: Specific exception type(s) would be better but Wasn't able to find what
# specific exceptions gdal.Info might raise.
logger.warning(
"Could not get projection extension metadata, "
+ f"gdal.Info failed for following asset: '{asset_uri}' . "
+ "Either file does not exist or else it is probably not a raster. "
+ f"Exception from GDAL: {exc}"
)
return {}
else:
logger.debug(f"{asset_uri=}, {data_gdalinfo=}")
return data_gdalinfo


Expand Down

0 comments on commit c1b8676

Please sign in to comment.