diff --git a/.gitignore b/.gitignore index e2f19f1..57f1c4e 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,9 @@ build *.jsonl *.json +## Exclude schemas +!schemas/**/*.json + # Old Submodule Path # Could be used locally pyessv-archive/ diff --git a/Makefile b/Makefile index 1e31d6d..b8622fb 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,7 @@ docker-build: docker build "$(APP_ROOT)" -f "$(APP_ROOT)/docker/Dockerfile" -t "$(DOCKER_TAG)" del_docker_volume: stophost - docker volume rm stac-populator_stac-db + docker volume rm docker_stac-db resethost: del_docker_volume starthost diff --git a/STACpopulator/cli.py b/STACpopulator/cli.py index 11ed00f..5cf9f48 100644 --- a/STACpopulator/cli.py +++ b/STACpopulator/cli.py @@ -14,6 +14,7 @@ def add_parser_args(parser: argparse.ArgumentParser) -> dict[str, Callable]: + """Common CLI arguments for all implementations.""" parser.add_argument( "--version", "-V", diff --git a/STACpopulator/extensions/__init__.py b/STACpopulator/extensions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/STACpopulator/extensions/base.py b/STACpopulator/extensions/base.py new file mode 100644 index 0000000..44e47d8 --- /dev/null +++ b/STACpopulator/extensions/base.py @@ -0,0 +1,360 @@ +""" +# Base classes for STAC extensions + +What we have: + - `Loader`, which returns attributes. + - An external json schema describing a subset of the attributes returned by the Loader. This schema might preclude + additional properties, so it cannot be applied wholesale to the Loader's output. (maybe overkill since not a lot of schemas can be found in the wild...) + - `data model` describing the content we want included in the catalog. It includes a subset of the schema properties, + as well as additional attributes desired by the catalog admins. 
+ +Desiderata: + - Not having to replicate existing validation logic in the schema + - Not having to create a modified schema + - Being able to supplement the schema validation by pydantic validation logic + - Streamline the creation of new data models (reduce boilerplate, allow subclassing) + - Developer-friendly validation error messages + + +How-to: + - Instructions to create basic datamodel from schema (codegen) + + + +""" +from __future__ import annotations + +from datetime import datetime +import json +import jsonschema +import logging +from typing import Any, Dict, Generic, TypeVar, Union, cast, Optional +from pydantic import (BaseModel, create_model, Field, FilePath, field_validator, model_validator, HttpUrl, ConfigDict, + PrivateAttr) +import pystac +from pystac.extensions import item_assets +from pystac.extensions.base import ( + ExtensionManagementMixin, + PropertiesExtension, + SummariesExtension, +) +from pystac import STACValidationError +from pystac.extensions.base import S # generic pystac.STACObject +from STACpopulator.models import AnyGeometry, GeoJSONPolygon +from STACpopulator.stac_utils import ( + ServiceType, + ncattrs_to_bbox, + ncattrs_to_geometry, +) +import types +from STACpopulator.extensions.datacube import DataCubeHelper +from STACpopulator.extensions.thredds import THREDDSHelper + +T = TypeVar("T", pystac.Collection, pystac.Item, pystac.Asset, item_assets.AssetDefinition) + +LOGGER = logging.getLogger(__name__) + + +class ExtensionHelper(BaseModel): + """Base class for dataset properties going into the catalog. + + Subclass this with attributes. + + Attributes + ---------- + _prefix : str + If not None, this prefix is added to ingested data before the jsonschema validation. + _schema_uri : str + URI of the json schema to validate against. + _schema_exclude : list[str] + Properties not meant to be validated by json schema, but still included in the data model. 
+ """ + _prefix: str = PrivateAttr() + _schema_uri: FilePath = PrivateAttr(None) + _schema_exclude: list[str] = PrivateAttr([]) + + model_config = ConfigDict(populate_by_name=True, extra="ignore") + + @model_validator(mode="before") + @classmethod + def validate_jsonschema(cls, data): + """Validate the data model against the json schema, if given.""" + # Load schema + uri = cls._schema_uri.default + if uri is not None: + schema = json.load(open(uri)) + validator_cls = jsonschema.validators.validator_for(schema) + validator_cls.check_schema(schema) + validator = validator_cls(schema) + + attrs = {f"{cls._prefix.default}:{k}": v for (k,v) in data.items() if k not in cls._schema_exclude.default} + errors = list(validator.iter_errors(attrs)) + if errors: + raise ValueError(errors) + + return data + + def apply(self, item, add_if_missing=False): + """Add extension for the properties of the dataset to the STAC item. + The extension class is created dynamically from the properties. + """ + ExtSubCls = metacls_extension(self._prefix, schema_uri=str(self._schema_uri)) + item_ext = ExtSubCls.ext(item, add_if_missing=add_if_missing) + item_ext.apply(self.model_dump(mode="json", by_alias=True)) + return item + + +class BaseSTAC(BaseModel): + """Base class for STAC item data models. + + Attributes + ---------- + geometry : AnyGeometry + The geometry of the dataset. + bbox : list[float] + The bounding box of the dataset. + start_datetime : datetime + The start datetime of the dataset. + end_datetime : datetime + The end datetime of the dataset. + extensions : list[str] + Name of the class attributes that point to STAC extension helper classes. Those extension classes should have an `apply` method. 
+ """ + # STAC item properties + geometry: AnyGeometry | None + bbox: list[float] + start_datetime: datetime + end_datetime: datetime + + extensions: list = [] + + model_config = ConfigDict(populate_by_name=True, extra="ignore", arbitrary_types_allowed=True) + + @property + def uid(self) -> str: + """Return a unique ID. When subclassing, use a combination of properties uniquely identifying a dataset.""" + # TODO: Should this be an abstract method? + import uuid + return str(uuid.uuid4()) + + def stac_item(self) -> "pystac.Item": + """Create a STAC item and add extensions.""" + item = pystac.Item( + id=self.uid, + geometry=self.geometry.model_dump(), + bbox=self.bbox, + properties={ + "start_datetime": str(self.start_datetime), + "end_datetime": str(self.end_datetime), + }, + datetime=None, + ) + + # Add extensions + for ext in self.extensions: + getattr(self, ext).apply(item) + + try: + item.validate() + except STACValidationError as e: + raise Exception("Failed to validate STAC item") from e + + return json.loads(json.dumps(item.to_dict())) + + +class THREDDSCatalogDataModel(BaseSTAC): + """Base class ingesting attributes loaded by `THREDDSLoader` and creating a STAC item. + + This is meant to be subclassed for each extension. + + It includes two validation mechanisms: + - pydantic validation using type hints, and + - json schema validation. + """ + # Data from loader + data: dict + + # Extensions classes + properties: ExtensionHelper + datacube: DataCubeHelper + thredds: THREDDSHelper + + extensions: list = ["properties", "datacube", "thredds"] + + model_config = ConfigDict(populate_by_name=True, extra="ignore", arbitrary_types_allowed=True) + + @classmethod + def from_data(cls, data): + """Instantiate class from data provided by THREDDS Loader. + """ + # This is where we match the Loader's output to the STAC item and extensions inputs. If we had multiple + # loaders, that's probably the only thing that would be different between them. 
+ return cls(data=data, + start_datetime=data["groups"]["CFMetadata"]["attributes"]["time_coverage_start"], + end_datetime=data["groups"]["CFMetadata"]["attributes"]["time_coverage_end"], + geometry=ncattrs_to_geometry(data), + bbox=ncattrs_to_bbox(data), + ) + + @model_validator(mode="before") + @classmethod + def properties_helper(cls, data): + """Instantiate the properties helper.""" + data["properties"] = data['data']['attributes'] + return data + + @model_validator(mode="before") + @classmethod + def datacube_helper(cls, data): + """Instantiate the DataCubeHelper.""" + data["datacube"] = DataCubeHelper(data['data']) + return data + + @model_validator(mode="before") + @classmethod + def thredds_helper(cls, data): + """Instantiate the THREDDSHelper.""" + data["thredds"] = THREDDSHelper(data['data']["access_urls"]) + return data + + +def metacls_extension(name, schema_uri): + """Create an extension class dynamically from the properties.""" + cls_name = f"{name.upper()}Extension" + + bases = (MetaExtension, + Generic[T], + PropertiesExtension, + ExtensionManagementMixin[Union[pystac.Asset, pystac.Item, pystac.Collection]] + ) + + attrs = {"name": name, "schema_uri": schema_uri} + return types.new_class(name=cls_name, bases=bases, kwds=None, exec_body=lambda ns: ns.update(attrs)) + + +class MetaExtension: + name: str + schema_uri: str + + def apply(self, properties: dict[str, Any]) -> None: + """Apply the extension properties to the extended + :class:`~pystac.Item` or :class:`~pystac.Asset`. 
+ """ + for prop, val in properties.items(): + self._set_property(prop, val) + + @classmethod + def get_schema_uri(cls) -> str: + """We have already validated the JSON schema.""" + return cls.schema_uri + + @classmethod + def has_extension(cls, obj: S): + # FIXME: this override should be removed once an official and versioned schema is released + # ignore the original implementation logic for a version regex + # since in our case, the VERSION_REGEX is not fulfilled (ie: using 'main' branch, no tag available...) + ext_uri = cls.get_schema_uri() + return obj.stac_extensions is not None and any(uri == ext_uri for uri in obj.stac_extensions) + + @classmethod + def ext(cls, obj: T, add_if_missing: bool = False) -> "Extension[T]": + """Extends the given STAC Object with properties from the + :stac-ext:`Extension`. + + This extension can be applied to instances of :class:`~pystac.Item` or + :class:`~pystac.Asset`. + + Raises: + + pystac.ExtensionTypeError : If an invalid object type is passed. + """ + cls_map = {pystac.Item: MetaItemExtension} + + for key, meta in cls_map.items(): + if isinstance(obj, key): + # cls.ensure_has_extension(obj, add_if_missing) + kls = extend_type(key, meta, cls[key]) + return cast(cls[T], kls(obj)) + else: + raise pystac.ExtensionTypeError(cls._ext_error_message(obj)) + + +def extend_type(stac, cls, ext): + """Create an extension subclass for different STAC objects. + + Note: This is super confusing... we should come up with some better nomenclature. + + Parameters + ---------- + stac: pystac.Item, pystac.Asset, pystac.Collection + The STAC object. + cls: MetaItemExtension + The generic extension class for the STAC object. + ext: MetaExtension[T] + The meta extension class. 
+ """ + cls_name = f"{stac.__name__ }{ext.__name__}" + return types.new_class(cls_name, (cls, ext), {}, lambda ns: ns) + + +class MetaItemExtension: + """A concrete implementation of :class:`Extension` on an :class:`~pystac.Item` + that extends the properties of the Item to include properties defined in the + :stac-ext:`Extension`. + + This class should generally not be instantiated directly. Instead, call + :meth:`Extension.ext` on an :class:`~pystac.Item` to extend it. + """ + def __init__(self, item: pystac.Item): + self.item = item + self.properties = item.properties + + def get_assets( + self, + service_type: Optional[ServiceType] = None, + ) -> dict[str, pystac.Asset]: + """Get the item's assets where service types are defined. + + Args: + service_type: If set, filter the assets such that only those with a + matching :class:`~STACpopulator.stac_utils.ServiceType` are returned. + + Returns: + Dict[str, Asset]: A dictionary of assets that match ``service_type`` + if set or else all of this item's assets where service types are defined. 
+ """ + return { + key: asset + for key, asset in self.item.get_assets().items() + if (service_type is ServiceType and service_type.value in asset.extra_fields) + or any(ServiceType.from_value(field, default=None) is ServiceType for field in asset.extra_fields) + } + + def __repr__(self) -> str: + return f"<{self.__class__.__name__} Item id={self.item.id}>" + + +# TODO: Add the other STAC item meta extensions + +def schema_properties(schema: dict) -> list[str]: + """Return the list of properties described by schema.""" + out = [] + for key, val in schema["properties"].items(): + prefix, name = key.split(":") if ":" in key else (None, key) + out.append(name) + return out + + +def model_from_schema(model_name, schema: dict): + """Create pydantic BaseModel from JSON schema.""" + type_map = {"string": str, "number": float, "integer": int, "boolean": bool, "array": list, "object": dict, + None: Any} + + fields = {} + for key, val in schema["properties"].items(): + prefix, name = key.split(":") if ":" in key else (None, key) + typ = type_map[val.get("type")] + default = ... 
if key in schema["required"] else None + fields[name] = (typ, Field(default, alias=key)) + return create_model(model_name, **fields) + diff --git a/STACpopulator/extensions/cmip6.py b/STACpopulator/extensions/cmip6.py index 4f848c4..6953bcb 100644 --- a/STACpopulator/extensions/cmip6.py +++ b/STACpopulator/extensions/cmip6.py @@ -51,7 +51,7 @@ SchemaName = Literal["cmip6"] # FIXME: below reference (used as ID in the schema itself) should be updated once the extension is officially released -# SCHEMA_URI: str = "https://stac-extensions.github.io/cmip6/v1.0.0/schema.json" +# SCHEMA_URI: str = "https://raw.githubusercontent.com/stac-extensions/cmip6/refs/heads/main/json-schema/schema.json" # below is the temporary resolvable URI SCHEMA_URI: str = "https://raw.githubusercontent.com/dchandan/stac-extension-cmip6/main/json-schema/schema.json" PREFIX = f"{get_args(SchemaName)[0]}:" diff --git a/STACpopulator/extensions/cordex6.py b/STACpopulator/extensions/cordex6.py new file mode 100644 index 0000000..e16e14b --- /dev/null +++ b/STACpopulator/extensions/cordex6.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from pathlib import Path +from pydantic import BaseModel, Field, FilePath +from datetime import datetime + +from importlib import reload +import STACpopulator.extensions.base +reload(STACpopulator.extensions.base) +from STACpopulator.extensions.base import THREDDSCatalogDataModel, ExtensionHelper + + +# This is generated using datamodel-codegen + manual edits +class CordexCmip6(ExtensionHelper): + # Fields from schema + activity_id: str = Field(..., alias='cordex6:activity_id') + contact: str = Field(..., alias='cordex6:contact') + # Conventions: str = Field(..., alias='cordex6:Conventions') + creation_date: datetime = Field(..., alias='cordex6:creation_date') + domain_id: str = Field(..., alias='cordex6:domain_id') + domain: str = Field(..., alias='cordex6:domain') + driving_experiment_id: str = Field(..., alias='cordex6:driving_experiment_id') + 
driving_experiment: str = Field(..., alias='cordex6:driving_experiment') + driving_institution_id: str = Field(..., alias='cordex6:driving_institution_id') + driving_source_id: str = Field(..., alias='cordex6:driving_source_id') + driving_variant_label: str = Field(..., alias='cordex6:driving_variant_label') + frequency: str = Field(..., alias='cordex6:frequency') + grid: str = Field(..., alias='cordex6:grid') + institution: str = Field(..., alias='cordex6:institution') + institution_id: str = Field(..., alias='cordex6:institution_id') + license: str = Field(..., alias='cordex6:license') + mip_era: str = Field(..., alias='cordex6:mip_era') + product: str = Field(..., alias='cordex6:product') + project_id: str = Field(..., alias='cordex6:project_id') + source: str = Field(..., alias='cordex6:source') + source_id: str = Field(..., alias='cordex6:source_id') + source_type: str = Field(..., alias='cordex6:source_type') + tracking_id: str = Field(..., alias='cordex6:tracking_id') + variable_id: str = Field(..., alias='cordex6:variable_id') + version_realization: str = Field(..., alias='cordex6:version_realization') + + # Extra fields + external_variables: str | list[str] + + _prefix: str = "cordex6" + # Note that this is not a STAC item schema, but a schema for the global attributes of the CORDEX-CMIP6 data. 
+ _schema_uri: FilePath = Path(__file__).parent / "schemas" / "cordex6" / "cmip6-cordex-global-attrs-schema.json" + + +# Customize the THREDDSCatalogDataModel +class Cordex6DataModel(THREDDSCatalogDataModel): + properties: CordexCmip6 + + @property + def uid(self) -> str: + """Return a unique ID for CMIP6 data item.""" + keys = [ + "activity_id", + "driving_institution_id", + "driving_source_id", + "institution_id", + "source_id", + "driving_experiment_id", + "driving_variant_label", + "variable_id", + "domain_id", + ] + values = [getattr(self.properties, k) for k in keys] + values.append(self.start_datetime.strftime("%Y%m%d")) + values.append(self.end_datetime.strftime("%Y%m%d")) + return "_".join(values) + + + diff --git a/STACpopulator/extensions/datacube.py b/STACpopulator/extensions/datacube.py index b394416..35cac46 100644 --- a/STACpopulator/extensions/datacube.py +++ b/STACpopulator/extensions/datacube.py @@ -1,7 +1,7 @@ import functools from typing import Any, MutableMapping, MutableSequence -from pystac.extensions.datacube import Dimension, DimensionType, Variable, VariableType +from pystac.extensions.datacube import Dimension, DimensionType, Variable, VariableType, DatacubeExtension from STACpopulator.stac_utils import ncattrs_to_bbox @@ -248,3 +248,10 @@ def temporal_extent(self) -> MutableSequence[str]: start_datetime = cfmeta["time_coverage_start"] end_datetime = cfmeta["time_coverage_end"] return [start_datetime, end_datetime] + + def apply(self, item, add_if_missing:bool = True): + """Apply the Datacube extension to an item.""" + ext = DatacubeExtension.ext(item, add_if_missing=add_if_missing) + ext.apply(dimensions=self.dimensions, variables=self.variables) + return item + diff --git a/STACpopulator/extensions/schemas/cordex6/cmip6-cordex-global-attrs-schema.json b/STACpopulator/extensions/schemas/cordex6/cmip6-cordex-global-attrs-schema.json new file mode 100644 index 0000000..47ee1f5 --- /dev/null +++ 
b/STACpopulator/extensions/schemas/cordex6/cmip6-cordex-global-attrs-schema.json @@ -0,0 +1,415 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "cmip6-cordex-global-attrs-schema.json#", + "title": "CORDEX-CMIP6 metadata schema for global attributes", + "description": "JSON schema for global attributes of CORDEX-CMIP6 datasets. This schema is automatically generated from the CVs. Manual edits will be overwritten.", + "type": "object", + "properties": { + "cordex6:activity_id": { + "enum": [ + "DD", + "ESD" + ] + }, + "cordex6:contact": { + "type": "string" + }, + "cordex6:Conventions": { + "type": "string" + }, + "cordex6:creation_date": { + "type": "string", + "format": "date-time" + }, + "cordex6:domain_id": { + "enum": [ + "SAM-50", + "CAM-50", + "NAM-50", + "EUR-50", + "AFR-50", + "WAS-50", + "EAS-50", + "CAS-50", + "AUS-50", + "ANT-50", + "ARC-50", + "MED-50", + "MNA-50", + "MNA-25", + "SAM-12", + "CAM-12", + "NAM-12", + "EUR-12", + "AFR-12", + "WAS-12", + "EAS-12", + "CAS-12", + "AUS-12", + "ANT-12", + "ARC-12", + "MED-12", + "MNA-12", + "SAM-25", + "CAM-25", + "NAM-25", + "EUR-25", + "AFR-25", + "WAS-25", + "EAS-25", + "CAS-25", + "AUS-25", + "SEA-25", + "SAM-50i", + "CAM-50i", + "NAM-50i", + "EUR-50i", + "AFR-50i", + "WAS-50i", + "EAS-50i", + "CAS-50i", + "AUS-50i", + "ANT-50i", + "ARC-50i", + "MED-50i", + "MNA-50i", + "MNA-25i", + "EUR-12i", + "SEA-25i" + ] + }, + "cordex6:domain": { + "type": "string" + }, + "cordex6:driving_experiment_id": { + "enum": [ + "evaluation", + "historical", + "ssp119", + "ssp126", + "ssp245", + "ssp370", + "ssp585" + ] + }, + "cordex6:driving_experiment": { + "type": "string" + }, + "cordex6:driving_institution_id": { + "enum": [ + "AER", + "AS-RCEC", + "AWI", + "BCC", + "BNU", + "CAMS", + "CAS", + "CCCR-IITM", + "CCCma", + "CMCC", + "CNRM-CERFACS", + "CSIR-Wits-CSIRO", + "CSIRO", + "CSIRO-ARCCSS", + "CSIRO-COSIMA", + "DKRZ", + "DWD", + "E3SM-Project", + "EC-Earth-Consortium", + "ECMWF", + "FIO-QLNM", + 
"HAMMOZ-Consortium", + "INM", + "INPE", + "IPSL", + "KIOST", + "LLNL", + "MESSy-Consortium", + "MIROC", + "MOHC", + "MPI-M", + "MRI", + "NASA-GISS", + "NASA-GSFC", + "NCAR", + "NCC", + "NERC", + "NIMS-KMA", + "NIWA", + "NOAA-GFDL", + "NTU", + "NUIST", + "PCMDI", + "PNNL-WACCEM", + "RTE-RRTMGP-Consortium", + "RUBISCO", + "SNU", + "THU", + "UA", + "UCI", + "UHH", + "UTAS", + "UofT" + ] + }, + "cordex6:driving_source_id": { + "enum": [ + "4AOP-v1-5", + "ACCESS-CM2", + "ACCESS-ESM1-5", + "ACCESS-OM2", + "ACCESS-OM2-025", + "ARTS-2-3", + "AWI-CM-1-1-HR", + "AWI-CM-1-1-LR", + "AWI-CM-1-1-MR", + "AWI-ESM-1-1-LR", + "AWI-ESM-2-1-LR", + "BCC-CSM2-HR", + "BCC-CSM2-MR", + "BCC-ESM1", + "CAM-MPAS-HR", + "CAM-MPAS-LR", + "CAMS-CSM1-0", + "CAS-ESM2-0", + "CESM1-1-CAM5-CMIP5", + "CESM1-CAM5-SE-HR", + "CESM1-CAM5-SE-LR", + "CESM1-WACCM-SC", + "CESM2", + "CESM2-FV2", + "CESM2-WACCM", + "CESM2-WACCM-FV2", + "CIESM", + "CMCC-CM2-HR4", + "CMCC-CM2-SR5", + "CMCC-CM2-VHR4", + "CMCC-ESM2", + "CNRM-CM6-1", + "CNRM-CM6-1-HR", + "CNRM-ESM2-1", + "CanESM5", + "CanESM5-1", + "CanESM5-CanOE", + "E3SM-1-0", + "E3SM-1-1", + "E3SM-1-1-ECA", + "E3SM-2-0", + "E3SM-2-0-NARRM", + "EC-Earth3", + "EC-Earth3-AerChem", + "EC-Earth3-CC", + "EC-Earth3-GrIS", + "EC-Earth3-HR", + "EC-Earth3-LR", + "EC-Earth3-Veg", + "EC-Earth3-Veg-LR", + "EC-Earth3P", + "EC-Earth3P-HR", + "EC-Earth3P-VHR", + "ECMWF-IFS-HR", + "ECMWF-IFS-LR", + "ECMWF-IFS-MR", + "FGOALS-f3-H", + "FGOALS-f3-L", + "FGOALS-g3", + "FIO-ESM-2-0", + "GFDL-AM4", + "GFDL-CM4", + "GFDL-CM4C192", + "GFDL-ESM2M", + "GFDL-ESM4", + "GFDL-GRTCODE", + "GFDL-OM4p5B", + "GFDL-RFM-DISORT", + "GISS-E2-1-G", + "GISS-E2-1-G-CC", + "GISS-E2-1-H", + "GISS-E2-2-G", + "GISS-E2-2-H", + "GISS-E3-G", + "HadGEM3-GC31-HH", + "HadGEM3-GC31-HM", + "HadGEM3-GC31-LL", + "HadGEM3-GC31-LM", + "HadGEM3-GC31-MH", + "HadGEM3-GC31-MM", + "HiRAM-SIT-HR", + "HiRAM-SIT-LR", + "ICON-ESM-LR", + "IITM-ESM", + "INM-CM4-8", + "INM-CM5-0", + "INM-CM5-H", + "IPSL-CM5A2-INCA", + 
"IPSL-CM6A-ATM-HR", + "IPSL-CM6A-ATM-ICO-HR", + "IPSL-CM6A-ATM-ICO-LR", + "IPSL-CM6A-ATM-ICO-MR", + "IPSL-CM6A-ATM-ICO-VHR", + "IPSL-CM6A-ATM-LR-REPROBUS", + "IPSL-CM6A-LR", + "IPSL-CM6A-LR-INCA", + "IPSL-CM6A-MR1", + "KACE-1-0-G", + "KIOST-ESM", + "LBLRTM-12-8", + "MCM-UA-1-0", + "MIROC-ES2H", + "MIROC-ES2H-NB", + "MIROC-ES2L", + "MIROC6", + "MPI-ESM-1-2-HAM", + "MPI-ESM1-2-HR", + "MPI-ESM1-2-LR", + "MPI-ESM1-2-XR", + "MRI-AGCM3-2-H", + "MRI-AGCM3-2-S", + "MRI-ESM2-0", + "NESM3", + "NICAM16-7S", + "NICAM16-8S", + "NICAM16-9S", + "NorCPM1", + "NorESM1-F", + "NorESM2-LM", + "NorESM2-MM", + "PCMDI-test-1-0", + "RRTMG-LW-4-91", + "RRTMG-SW-4-02", + "RTE-RRTMGP-181204", + "SAM0-UNICON", + "TaiESM1", + "TaiESM1-TIMCOM", + "TaiESM1-TIMCOM2", + "UKESM1-0-LL", + "UKESM1-1-LL", + "UKESM1-ice-LL", + "ERA5" + ] + }, + "cordex6:driving_variant_label": { + "type": "string" + }, + "cordex6:frequency": { + "enum": [ + "1hr", + "3hr", + "6hr", + "day", + "fx", + "mon", + "yr" + ] + }, + "cordex6:grid": { + "type": "string" + }, + "cordex6:institution": { + "type": "string" + }, + "cordex6:institution_id": { + "enum": [ + "BCCR-UCAN", + "BOM", + "CCCma", + "CLMcom-CMCC", + "CLMcom-DWD", + "CLMcom-GERICS", + "CLMcom-KIT", + "CNRM-MF", + "GERICS", + "HCLIMcom-DMI", + "HCLIMcom-METNo", + "HCLIMcom-SMHI", + "ICTP", + "IRD-MF", + "KNMI", + "MOHC", + "OURANOS", + "UBA-CIMA-IFAECI", + "UQ-DEC" + ] + }, + "cordex6:license": { + "enum": [ + "https://cordex.org/data-access/cordex-cmip6-data/cordex-cmip6-terms-of-use" + ] + }, + "cordex6:mip_era": { + "type": "string" + }, + "cordex6:product": { + "type": "string" + }, + "cordex6:project_id": { + "enum": [ + "CORDEX" + ] + }, + "cordex6:source": { + "type": "string" + }, + "cordex6:source_id": { + "enum": [ + "CCAM-v2105", + "CCAM-v2112", + "CCAMoc-v2112", + "CNRM-ALADIN64E1", + "CRCM5-SN", + "CanRCM5-SN", + "HCLIM43-ALADIN", + "HadREM3-GA7-05", + "ICON-CLM-202407-1-1", + "RACMO23E", + "REMO2020", + "RegCM5-0", + "WRF451Q" + ] + }, + 
"cordex6:source_type": { + "enum": [ + "ARCM", + "AORCM", + "AGCM", + "AOGCM" + ] + }, + "cordex6:tracking_id": { + "type": "string" + }, + "cordex6:variable_id": { + "type": "string" + }, + "cordex6:version_realization": { + "type": "string" + } + }, + "required": [ + "cordex6:activity_id", + "cordex6:contact", + "cordex6:Conventions", + "cordex6:creation_date", + "cordex6:domain_id", + "cordex6:domain", + "cordex6:driving_experiment_id", + "cordex6:driving_experiment", + "cordex6:driving_institution_id", + "cordex6:driving_source_id", + "cordex6:driving_variant_label", + "cordex6:frequency", + "cordex6:grid", + "cordex6:institution", + "cordex6:institution_id", + "cordex6:license", + "cordex6:mip_era", + "cordex6:product", + "cordex6:project_id", + "cordex6:source", + "cordex6:source_id", + "cordex6:source_type", + "cordex6:tracking_id", + "cordex6:variable_id", + "cordex6:version_realization" + ] +} \ No newline at end of file diff --git a/STACpopulator/extensions/thredds.py b/STACpopulator/extensions/thredds.py index 19e2f44..2f51ce8 100644 --- a/STACpopulator/extensions/thredds.py +++ b/STACpopulator/extensions/thredds.py @@ -132,3 +132,18 @@ def links(self) -> list[pystac.Link]: url = self.access_urls[ServiceType.httpserver] link = magpie_resource_link(url) return [link] + + def apply(self, item, add_if_missing:bool = False): + """Apply the THREDDS extension to an item.""" + ext = THREDDSExtension.ext(item, add_if_missing=add_if_missing) + ext.apply(services=self.services, links=self.links) + return item + + +# TODO: Validate services links exist ? 
+# @field_validator("access_urls") +# @classmethod +# def validate_access_urls(cls, value): +# assert len(set(["HTTPServer", "OPENDAP"]).intersection(value.keys())) >= 1, ( +# "Access URLs must include HTTPServer or OPENDAP keys.") +# return value diff --git a/STACpopulator/implementations/Ouranos_CMIP6-CORDEX/__init__.py b/STACpopulator/implementations/Ouranos_CMIP6-CORDEX/__init__.py new file mode 100644 index 0000000..c58ee05 --- /dev/null +++ b/STACpopulator/implementations/Ouranos_CMIP6-CORDEX/__init__.py @@ -0,0 +1,4 @@ +from .add_CORDEX6 import add_parser_args, runner + +__all__ = ["add_parser_args", "runner"] + diff --git a/STACpopulator/implementations/Ouranos_CMIP6-CORDEX/add_CORDEX6.py b/STACpopulator/implementations/Ouranos_CMIP6-CORDEX/add_CORDEX6.py new file mode 100644 index 0000000..3793e60 --- /dev/null +++ b/STACpopulator/implementations/Ouranos_CMIP6-CORDEX/add_CORDEX6.py @@ -0,0 +1,62 @@ +import logging +from requests.sessions import Session + +from STACpopulator.requests import add_request_options, apply_request_options +from STACpopulator.input import ErrorLoader, THREDDSLoader + +LOGGER = logging.getLogger(__name__) +import argparse +from typing import Any +from STACpopulator.populator_base import STACpopulatorBase +from STACpopulator.extensions.cordex6 import Cordex6DataModel + + +class CORDEX_STAC_Populator(STACpopulatorBase): + data_model = Cordex6DataModel + item_geometry_model = None # Unnecessary, but kept for consistency + + def create_stac_item(self, item_name: str, item_data: dict[str, Any]) -> dict[str, Any]: + dm = self.data_model.from_data(item_data) + return dm.stac_item() + + + +# TODO: This probably doesn't need to be copied for every implementation, right ? +def add_parser_args(parser: argparse.ArgumentParser) -> None: + parser.description="CMIP6-CORDEX STAC populator from a THREDDS catalog or NCML XML." 
+ parser.add_argument("stac_host", help="STAC API URL") + parser.add_argument("href", help="URL to a THREDDS catalog or a NCML XML with CMIP6 metadata.") + parser.add_argument("--update", action="store_true", help="Update collection and its items") + parser.add_argument( + "--mode", + choices=["full", "single"], + default="full", + help="Operation mode, processing the full dataset or only the single reference.", + ) + parser.add_argument( + "--config", + type=str, + help=( + "Override configuration file for the populator. " + "By default, uses the adjacent configuration to the implementation class." + ), + ) + add_request_options(parser) + + +def runner(ns: argparse.Namespace) -> int: + LOGGER.info(f"Arguments to call: {vars(ns)}") + + with Session() as session: + apply_request_options(session, ns) + if ns.mode == "full": + data_loader = THREDDSLoader(ns.href, session=session) + else: + # To be implemented + data_loader = ErrorLoader() + + c = CORDEX_STAC_Populator( + ns.stac_host, data_loader, update=ns.update, session=session, config_file=ns.config, log_debug=ns.debug + ) + c.ingest() + return 0 diff --git a/STACpopulator/implementations/Ouranos_CMIP6-CORDEX/collection_config.yml b/STACpopulator/implementations/Ouranos_CMIP6-CORDEX/collection_config.yml new file mode 100644 index 0000000..bd9c42b --- /dev/null +++ b/STACpopulator/implementations/Ouranos_CMIP6-CORDEX/collection_config.yml @@ -0,0 +1,18 @@ +title: CMIP6-CORDEX +id: Ouranos_CMIP6-CORDEX +description: Coordinated Regional Downscaling Experiment phase 6 +keywords: ['CMIP', 'CMIP6', 'WCRP', 'Climate Change', 'CORDEX'] +license: "CC-BY-4.0" +spatialextent: [-180, -90, 180, 90] +temporalextent: ['1850-01-01', '2500-01-01'] + +links: + - rel: about + title : Project homepage + target : https://cordex.org/experiment-guidelines/cordex-cmip6/ + media_type: text/html + - rel: license + title : License + target : https://cordex.org/data-access/cordex-cmip6-data/cordex-cmip6-terms-of-use + media_type: 
text/plain + diff --git a/STACpopulator/implementations/__init__.py b/STACpopulator/implementations/__init__.py index 80c732b..00565f9 100644 --- a/STACpopulator/implementations/__init__.py +++ b/STACpopulator/implementations/__init__.py @@ -1,8 +1,8 @@ # By adding modules to __all__, they are discoverable by the cli.implementation_modules method and -# become available to be invoked through the CLI. -# All modules in this list must contain two functions: +# become available to be invoked through the CLI. +# All modules in this list must contain two functions: # - add_parser_args(parser: argparse.ArgumentParser) -> None # - adds additional arguments to the given parser needed to run this implementation # - def runner(ns: argparse.Namespace) -> int: # - runs the implementation given a namespace constructed from the parser arguments supplied -__all__ = ["CMIP6_UofT", "DirectoryLoader"] +__all__ = ["CMIP6_UofT", "DirectoryLoader", "Ouranos_CMIP6-CORDEX"] diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py index 44341f3..0b7fe2f 100644 --- a/STACpopulator/populator_base.py +++ b/STACpopulator/populator_base.py @@ -1,3 +1,4 @@ +import argparse import functools import inspect import json @@ -7,6 +8,7 @@ from datetime import datetime from typing import Any, Dict, List, MutableMapping, Optional, Type, Union + import pystac from requests.sessions import Session @@ -18,6 +20,9 @@ from STACpopulator.input import GenericLoader from STACpopulator.models import AnyGeometry from STACpopulator.stac_utils import load_config, url_validate +from STACpopulator.requests import add_request_options, apply_request_options +from STACpopulator.input import ErrorLoader, GenericLoader, THREDDSLoader + LOGGER = logging.getLogger(__name__) @@ -221,3 +226,5 @@ def ingest(self) -> None: counter += 1 LOGGER.info(f"Processed {counter} data items. 
{failures} failures") + + diff --git a/pyproject.toml b/pyproject.toml index e3f5b34..ce0447f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ exclude = [ ] [tool.setuptools.package-data] -STACpopulator = ["**/collection_config.yml"] +STACpopulator = ["**/collection_config.yml", "extensions/schemas/**/*.json"] [tool.pytest.ini_options] @@ -150,7 +150,8 @@ keywords = [ "SpatioTemporal Asset Catalog", "Data Ingestion", "THREDDS", - "CMIP6" + "CMIP6", + "CORDEX" ] [project.scripts] diff --git a/tests/test_cordex.py b/tests/test_cordex.py new file mode 100644 index 0000000..12b42d5 --- /dev/null +++ b/tests/test_cordex.py @@ -0,0 +1,24 @@ +from STACpopulator.extensions.cordex6 import Cordex6DataModel + + +def get_test_data(): + import requests + from siphon.catalog import TDSCatalog + import xncml + from STACpopulator.stac_utils import numpy_to_python_datatypes + + cat = TDSCatalog("https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/birdhouse/disk2/ouranos/CORDEX/CMIP6/DD/NAM-12/OURANOS/MPI-ESM1-2-LR/ssp370/r1i1p1f1/CRCM5/v1-r1/day/tas/v20231208/catalog.html") + + if cat.datasets.items(): + for item_name, ds in cat.datasets.items(): + url = ds.access_urls["NCML"] + r = requests.get(url) + attrs = xncml.Dataset.from_text(r.text).to_cf_dict() + attrs["attributes"] = numpy_to_python_datatypes(attrs["attributes"]) + attrs["access_urls"] = ds.access_urls + return attrs + +def test_item(): + attrs = get_test_data() + model = Cordex6DataModel.from_data(attrs) + model.stac_item()