diff --git a/STACpopulator/extensions/cmip6.py b/STACpopulator/extensions/cmip6.py new file mode 100644 index 0000000..1957571 --- /dev/null +++ b/STACpopulator/extensions/cmip6.py @@ -0,0 +1,368 @@ +import json +from datetime import datetime +from typing import ( + Any, + Generic, + Iterable, + List, + Literal, + MutableMapping, + Type, + TypeVar, + Union, + Optional, + cast, + get_args +) + +import pystac +import pyessv +from pydantic import AnyHttpUrl, BaseModel, ConfigDict, Field, FieldValidationInfo, field_validator +from pydantic.fields import FieldInfo +from pystac.extensions import item_assets +from pystac.extensions.base import ( + ExtensionManagementMixin, + PropertiesExtension, + SummariesExtension, + S, # generic pystac.STACObject +) + +from STACpopulator.models import AnyGeometry +from STACpopulator.stac_utils import ServiceType, collection2literal, ncattrs_to_bbox, ncattrs_to_geometry + +T = TypeVar("T", pystac.Collection, pystac.Item, pystac.Asset, item_assets.AssetDefinition) + +SchemaName = Literal["cmip6"] +# FIXME: below reference (used as ID in the schema itself) should be updated once the extension is officially released +# SCHEMA_URI: str = "https://stac-extensions.github.io/cmip6/v1.0.0/schema.json" +# below is the temporary resolvable URI +SCHEMA_URI: str = "https://raw.githubusercontent.com/TomAugspurger/cmip6/main/json-schema/schema.json" +PREFIX = f"{get_args(SchemaName)[0]}:" + +# CMIP6 controlled vocabulary (CV) +CV = pyessv.WCRP.CMIP6 # noqa + +# Enum classes built from the pyessv' CV +ActivityID = collection2literal(CV.activity_id) +ExperimentID = collection2literal(CV.experiment_id) +Frequency = collection2literal(CV.frequency) +GridLabel = collection2literal(CV.grid_label) +InstitutionID = collection2literal(CV.institution_id) +NominalResolution = collection2literal(CV.nominal_resolution) +Realm = collection2literal(CV.realm) +SourceID = collection2literal(CV.source_id, "source_id") +SourceType = collection2literal(CV.source_type) +SubExperimentID = collection2literal(CV.sub_experiment_id) +TableID = collection2literal(CV.table_id) + + +def add_cmip6_prefix(name: str) -> str: + return PREFIX + name if "datetime" not in name else name + + +class CMIP6Properties(BaseModel, validate_assignment=True): + """Data model for CMIP6 Controlled Vocabulary.""" + + Conventions: str + activity_id: ActivityID + creation_date: datetime + data_specs_version: str + experiment: str + experiment_id: ExperimentID + frequency: Frequency + further_info_url: AnyHttpUrl + grid_label: GridLabel + institution: str + institution_id: InstitutionID + nominal_resolution: NominalResolution + realm: List[Realm] + source: str + source_id: SourceID + source_type: List[SourceType] + sub_experiment: Union[str, Literal["none"]] + sub_experiment_id: SubExperimentID | Literal["none"] + table_id: TableID + variable_id: str + variant_label: str + initialization_index: int + physics_index: int + realization_index: int + forcing_index: int + tracking_id: str = Field("") + version: str = Field("") + product: str + license: str + grid: str + mip_era: str + + model_config = ConfigDict(alias_generator=add_cmip6_prefix, populate_by_name=True, extra="ignore") + + @field_validator("initialization_index", "physics_index", "realization_index", "forcing_index", mode="before") + @classmethod + def only_item(cls, v: list[int], info: FieldValidationInfo): + """Pick single item from list.""" + assert len(v) == 1, f"{info.field_name} must have one item only." + return v[0] + + @field_validator("realm", "source_type", mode="before") + @classmethod + def split(cls, v: str, __info: FieldValidationInfo): + """Split string into list.""" + return v.split(" ") + + @field_validator("version") + @classmethod + def validate_version(cls, v: str, __info: FieldValidationInfo): + assert v[0] == "v", "Version string should begin with a lower case 'v'" + assert v[1:].isdigit(), "All characters in version string, except first, should be digits" + return v + + +class CMIP6Helper: + def __init__(self, attrs: MutableMapping[str, Any], geometry_model: Type[AnyGeometry]): + self.attrs = attrs + self.cmip6_attrs = attrs["attributes"] + self.cfmeta = attrs["groups"]["CFMetadata"]["attributes"] + self.geometry_model = geometry_model + + @property + def uid(self) -> str: + """Return a unique ID for CMIP6 data item.""" + keys = [ + "activity_id", + "institution_id", + "source_id", + "experiment_id", + "variant_label", + "table_id", + "variable_id", + "grid_label", + ] + name = "_".join(self.cmip6_attrs[k] for k in keys) + return name + + @property + def geometry(self) -> AnyGeometry: + return self.geometry_model(**ncattrs_to_geometry(self.attrs)) + + @property + def bbox(self) -> list[float]: + return ncattrs_to_bbox(self.attrs) + + @property + def start_datetime(self) -> datetime: + return self.cfmeta["time_coverage_start"] + + @property + def end_datetime(self) -> datetime: + return self.cfmeta["time_coverage_end"] + + @property + def properties(self) -> CMIP6Properties: + props = CMIP6Properties(**self.cmip6_attrs) + return props + + def stac_item(self) -> "pystac.Item": + item = pystac.Item( + id=self.uid, + geometry=self.geometry.model_dump(), + bbox=self.bbox, + properties={ + "start_datetime": self.start_datetime, + "end_datetime": self.end_datetime, + }, + datetime=None, + ) + item_cmip6 = CMIP6Extension.ext(item, add_if_missing=True) + item_cmip6.apply(self.properties) + return item + + +class CMIP6Extension( + Generic[T], + PropertiesExtension, + ExtensionManagementMixin[Union[pystac.Asset, pystac.Item, pystac.Collection]], +): + @property + def name(self) -> SchemaName: + return get_args(SchemaName)[0] + + def apply( + self, + properties: Union[CMIP6Properties, dict[str, Any]], + ) -> None: + """Applies CMIP6 Extension properties to the extended + :class:`~pystac.Item` or :class:`~pystac.Asset`. + """ + if isinstance(properties, dict): + properties = CMIP6Properties(**properties) + data_json = json.loads(properties.model_dump_json(by_alias=True)) + for prop, val in data_json.items(): + self._set_property(prop, val) + + @classmethod + def get_schema_uri(cls) -> str: + return SCHEMA_URI + + @classmethod + def has_extension(cls, obj: S): + # FIXME: this override should be removed once an official and versioned schema is released + # ignore the original implementation logic for a version regex + # since in our case, the VERSION_REGEX is not fulfilled (ie: using 'main' branch, no tag available...) + ext_uri = cls.get_schema_uri() + return obj.stac_extensions is not None and any(uri == ext_uri for uri in obj.stac_extensions) + + @classmethod + def ext(cls, obj: T, add_if_missing: bool = False) -> "CMIP6Extension[T]": + """Extends the given STAC Object with properties from the + :stac-ext:`CMIP6 Extension `. + + This extension can be applied to instances of :class:`~pystac.Item` or + :class:`~pystac.Asset`. + + Raises: + + pystac.ExtensionTypeError : If an invalid object type is passed. + """ + if isinstance(obj, pystac.Collection): + cls.ensure_has_extension(obj, add_if_missing) + return cast(CMIP6Extension[T], CollectionCMIP6Extension(obj)) + elif isinstance(obj, pystac.Item): + cls.ensure_has_extension(obj, add_if_missing) + return cast(CMIP6Extension[T], ItemCMIP6Extension(obj)) + elif isinstance(obj, pystac.Asset): + cls.ensure_owner_has_extension(obj, add_if_missing) + return cast(CMIP6Extension[T], AssetCMIP6Extension(obj)) + elif isinstance(obj, item_assets.AssetDefinition): + cls.ensure_owner_has_extension(obj, add_if_missing) + return cast(CMIP6Extension[T], ItemAssetsCMIP6Extension(obj)) + else: + raise pystac.ExtensionTypeError(cls._ext_error_message(obj)) + + @classmethod + def summaries( + cls, obj: pystac.Collection, add_if_missing: bool = False + ) -> "SummariesCMIP6Extension": + """Returns the extended summaries object for the given collection.""" + cls.ensure_has_extension(obj, add_if_missing) + return SummariesCMIP6Extension(obj) + + +class ItemCMIP6Extension(CMIP6Extension[pystac.Item]): + """A concrete implementation of :class:`CMIP6Extension` on an :class:`~pystac.Item` + that extends the properties of the Item to include properties defined in the + :stac-ext:`CMIP6 Extension `. + + This class should generally not be instantiated directly. Instead, call + :meth:`CMIP6Extension.ext` on an :class:`~pystac.Item` to extend it. + """ + + def __init__(self, item: pystac.Item): + self.item = item + self.properties = item.properties + + def get_assets( + self, + service_type: Optional[ServiceType] = None, + ) -> dict[str, pystac.Asset]: + """Get the item's assets where eo:bands are defined. + + Args: + service_type: If set, filter the assets such that only those with a + matching :class:`~STACpopulator.stac_utils.ServiceType` are returned. + + Returns: + Dict[str, Asset]: A dictionary of assets that match ``service_type`` + if set or else all of this item's assets were service types are defined. + """ + return { + key: asset + for key, asset in self.item.get_assets().items() + if ( + service_type is ServiceType and service_type.value in asset.extra_fields + ) + or any( + ServiceType.from_value(field, default=None) is ServiceType + for field in asset.extra_fields + ) + } + + def __repr__(self) -> str: + return f"" + + +class ItemAssetsCMIP6Extension(CMIP6Extension[item_assets.AssetDefinition]): + properties: dict[str, Any] + asset_defn: item_assets.AssetDefinition + + def __init__(self, item_asset: item_assets.AssetDefinition): + self.asset_defn = item_asset + self.properties = item_asset.properties + + +class AssetCMIP6Extension(CMIP6Extension[pystac.Asset]): + """A concrete implementation of :class:`CMIP6Extension` on an :class:`~pystac.Asset` + that extends the Asset fields to include properties defined in the + :stac-ext:`CMIP6 Extension `. + + This class should generally not be instantiated directly. Instead, call + :meth:`CMIP6Extension.ext` on an :class:`~pystac.Asset` to extend it. + """ + + asset_href: str + """The ``href`` value of the :class:`~pystac.Asset` being extended.""" + + properties: dict[str, Any] + """The :class:`~pystac.Asset` fields, including extension properties.""" + + additional_read_properties: Optional[Iterable[dict[str, Any]]] = None + """If present, this will be a list containing 1 dictionary representing the + properties of the owning :class:`~pystac.Item`.""" + + def __init__(self, asset: pystac.Asset): + self.asset_href = asset.href + self.properties = asset.extra_fields + if asset.owner and isinstance(asset.owner, pystac.Item): + self.additional_read_properties = [asset.owner.properties] + + def __repr__(self) -> str: + return f"" + + +class SummariesCMIP6Extension(SummariesExtension): + """A concrete implementation of :class:`~SummariesExtension` that extends + the ``summaries`` field of a :class:`~pystac.Collection` to include properties + defined in the :stac-ext:`CMIP6 `. + """ + def _check_cmip6_property(self, prop: str) -> FieldInfo: + try: + return CMIP6Properties.model_fields[prop] + except KeyError: + raise AttributeError(f"Name '{prop}' is not a valid CMIP6 property.") + + def _validate_cmip6_property(self, prop: str, summaries: list[Any]) -> None: + model = CMIP6Properties.model_construct() + validator = CMIP6Properties.__pydantic_validator__ + for value in summaries: + validator.validate_assignment(model, prop, value) + + def get_cmip6_property(self, prop: str) -> list[Any]: + self._check_cmip6_property(prop) + return self.summaries.get_list(prop) + + def set_cmip6_property(self, prop: str, summaries: list[Any]) -> None: + self._check_cmip6_property(prop) + self._validate_cmip6_property(prop, summaries) + self._set_summary(prop, summaries) + + def __getattr__(self, prop): + return self.get_cmip6_property(prop) + + def __setattr__(self, prop, value): + self.set_cmip6_property(prop, value) + + +class CollectionCMIP6Extension(CMIP6Extension[pystac.Collection]): + + def __init__(self, collection: pystac.Collection): + self.collection = collection diff --git a/STACpopulator/implementations/CMIP6_UofT/extensions.py b/STACpopulator/extensions/datacube.py similarity index 92% rename from STACpopulator/implementations/CMIP6_UofT/extensions.py rename to STACpopulator/extensions/datacube.py index 8027cf9..17fd112 100644 --- a/STACpopulator/implementations/CMIP6_UofT/extensions.py +++ b/STACpopulator/extensions/datacube.py @@ -1,4 +1,5 @@ import functools +from typing import Any, MutableMapping, MutableSequence from pystac.extensions.datacube import Dimension, DimensionType, Variable, VariableType @@ -10,18 +11,14 @@ class DataCubeHelper: axis = {"X": "x", "Y": "y", "Z": "z", "T": None, "longitude": "x", "latitude": "y", "vertical": "z", "time": "t"} - def __init__(self, attrs: dict): + def __init__(self, attrs: MutableMapping[str, Any]): """ Create STAC Item from CF JSON metadata. Parameters ---------- - iid : str - Unique item ID. attrs: dict CF JSON metadata returned by `xncml.Dataset.to_cf_dict`. - datamodel : pydantic.BaseModel, optional - Data model for validating global attributes. """ self.attrs = attrs @@ -142,10 +139,10 @@ def __init__(self, attrs: dict): @property @functools.cache - def dimensions(self) -> dict: - """Return Dimension objects required for Datacube extension.""" - - + def dimensions(self) -> dict[str, Dimension]: + """ + Return Dimension objects required for Datacube extension. + """ dims = {} for name, length in self.attrs["dimensions"].items(): v = self.attrs["variables"].get(name) @@ -172,7 +169,10 @@ def dimensions(self) -> dict: properties = dict( type=type_.value, extent=extent, - description=v.get("description", v.get("long_name", criteria["standard_name"][0])) or "", + description=v.get( + "description", + v.get("long_name", criteria["standard_name"][0]) + ) or "", ) if type_ == DimensionType.SPATIAL: properties["axis"] = axis @@ -183,7 +183,7 @@ def dimensions(self) -> dict: @property @functools.cache - def variables(self) -> dict: + def variables(self) -> dict[str, Variable]: """Return Variable objects required for Datacube extension.""" variables = {} @@ -202,9 +202,7 @@ def variables(self) -> dict: ) return variables - # @property - # @functools.cache - def is_coordinate(self, attrs: dict) -> bool: + def is_coordinate(self, attrs: MutableMapping[str, Any]) -> bool: """Return whether variable is a coordinate.""" for key, criteria in self.coordinate_criteria.items(): for criterion, expected in criteria.items(): @@ -212,7 +210,7 @@ def is_coordinate(self, attrs: dict) -> bool: return True return False - def temporal_extent(self): + def temporal_extent(self) -> MutableSequence[str]: cfmeta = self.attrs["groups"]["CFMetadata"]["attributes"] start_datetime = cfmeta["time_coverage_start"] end_datetime = cfmeta["time_coverage_end"] diff --git a/STACpopulator/extensions/thredds.py b/STACpopulator/extensions/thredds.py new file mode 100644 index 0000000..19e2f44 --- /dev/null +++ b/STACpopulator/extensions/thredds.py @@ -0,0 +1,134 @@ +from typing import Generic, TypeVar, Union, cast + +import pystac +from pystac.extensions.base import ( + ExtensionManagementMixin, + PropertiesExtension, +) + +from STACpopulator.stac_utils import ServiceType, magpie_resource_link + +T = TypeVar("T", pystac.Collection, pystac.Item) + + +class THREDDSMetadata: + media_types = { + ServiceType.httpserver: "application/x-netcdf", + ServiceType.opendap: pystac.MediaType.HTML, + ServiceType.wcs: pystac.MediaType.XML, + ServiceType.wms: pystac.MediaType.XML, + ServiceType.netcdfsubset: "application/x-netcdf", + } + + asset_roles = { + ServiceType.httpserver: ["data"], + ServiceType.opendap: ["data"], + ServiceType.wcs: ["data"], + ServiceType.wms: ["visual"], + ServiceType.netcdfsubset: ["data"], + } + service_type: ServiceType + + +class THREDDSExtension( + Generic[T], + PropertiesExtension, + ExtensionManagementMixin[Union[pystac.Item, pystac.Collection]], + THREDDSMetadata, +): + def __init__(self, obj: Union[pystac.Item, pystac.Collection]): + self.obj = obj + + def apply( + self, + services: list["THREDDSService"], + links: list[pystac.Link], + ): + for svc in services: + key = svc.service_type.value + self.obj.add_asset(key, svc.get_asset()) + for link in links: + self.obj.add_link(link) + + @classmethod + def get_schema_uri(cls) -> str: + return "" + + @classmethod + def ext(cls, obj: T, add_if_missing: bool = False) -> "THREDDSExtension[T]": + """Extends the given STAC Object with properties from the + :stac-ext:`THREDDS Extension `. + + This extension can be applied to instances of :class:`~pystac.Item` or + :class:`~pystac.Asset`. + + Raises: + + pystac.ExtensionTypeError : If an invalid object type is passed. + """ + if isinstance(obj, pystac.Collection): + return cast(THREDDSExtension[T], CollectionTHREDDSExtension(obj)) + elif isinstance(obj, pystac.Item): + return cast(THREDDSExtension[T], ItemTHREDDSExtension(obj)) + else: + raise pystac.ExtensionTypeError(cls._ext_error_message(obj)) + + +class THREDDSService(THREDDSMetadata): + def __init__(self, service_type: ServiceType, href: str): + self.service_type = service_type + self.href = href + + def get_asset(self) -> pystac.Asset: + asset = pystac.Asset( + href=self.href, + media_type=str(self.media_types.get(self.service_type) or ""), + roles=self.asset_roles.get(self.service_type) or [], + ) + return asset + + +class ItemTHREDDSExtension(THREDDSExtension[pystac.Item]): + item: pystac.Item + """The :class:`~pystac.Item` being extended.""" + + def __init__(self, item: pystac.Item): + self.item = item + self.properties = item.properties + super().__init__(self.item) + + def __repr__(self) -> str: + return f"" + + +class CollectionTHREDDSExtension(THREDDSExtension[pystac.Item]): + + def __init__(self, collection: pystac.Collection): + super().__init__(collection) + + def __repr__(self) -> str: + return f"" + + +class THREDDSHelper: + def __init__(self, access_urls: dict[str, str]): + self.access_urls = { + ServiceType.from_value(svc): url + for svc, url in access_urls.items() + } + + @property + def services(self) -> list[THREDDSService]: + return [ + THREDDSService( + service_type=svc_type, + href=href, + ) + for svc_type, href in self.access_urls.items() + ] + + @property + def links(self) -> list[pystac.Link]: + url = self.access_urls[ServiceType.httpserver] + link = magpie_resource_link(url) + return [link] diff --git a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py index c321a3a..2a3e3cc 100644 --- a/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py +++ b/STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py @@ -1,105 +1,25 @@ import argparse import json import os -from datetime import datetime -from typing import Any, List, Literal, MutableMapping, NoReturn, Optional, Union +from typing import Any, MutableMapping, NoReturn, Optional, Union -import pydantic_core -import pyessv from requests.sessions import Session -from pydantic import AnyHttpUrl, ConfigDict, Field, FieldValidationInfo, field_validator from pystac.extensions.datacube import DatacubeExtension from STACpopulator.cli import add_request_options, apply_request_options -from STACpopulator.implementations.CMIP6_UofT.extensions import DataCubeHelper +from STACpopulator.extensions.cmip6 import CMIP6Properties, CMIP6Helper +from STACpopulator.extensions.datacube import DataCubeHelper +from STACpopulator.extensions.thredds import THREDDSHelper, THREDDSExtension from STACpopulator.input import GenericLoader, ErrorLoader, THREDDSLoader -from STACpopulator.models import GeoJSONPolygon, STACItemProperties +from STACpopulator.models import GeoJSONPolygon from STACpopulator.populator_base import STACpopulatorBase -from STACpopulator.stac_utils import get_logger, STAC_item_from_metadata, collection2literal +from STACpopulator.stac_utils import get_logger LOGGER = get_logger(__name__) -# CMIP6 controlled vocabulary (CV) -CV = pyessv.WCRP.CMIP6 - -# Enum classes built from the pyessv' CV -ActivityID = collection2literal(CV.activity_id) -ExperimentID = collection2literal(CV.experiment_id) -Frequency = collection2literal(CV.frequency) -GridLabel = collection2literal(CV.grid_label) -InstitutionID = collection2literal(CV.institution_id) -NominalResolution = collection2literal(CV.nominal_resolution) -Realm = collection2literal(CV.realm) -SourceID = collection2literal(CV.source_id, "source_id") -SourceType = collection2literal(CV.source_type) -SubExperimentID = collection2literal(CV.sub_experiment_id) -TableID = collection2literal(CV.table_id) - - -def add_cmip6_prefix(name: str) -> str: - return "cmip6:" + name if "datetime" not in name else name - - -class CMIP6ItemProperties(STACItemProperties, validate_assignment=True): - """Data model for CMIP6 Controlled Vocabulary.""" - - Conventions: str - activity_id: ActivityID - creation_date: datetime - data_specs_version: str - experiment: str - experiment_id: ExperimentID - frequency: Frequency - further_info_url: AnyHttpUrl - grid_label: GridLabel - institution: str - institution_id: InstitutionID - nominal_resolution: NominalResolution - realm: List[Realm] - source: str - source_id: SourceID - source_type: List[SourceType] - sub_experiment: str | Literal["none"] - sub_experiment_id: SubExperimentID | Literal["none"] - table_id: TableID - variable_id: str - variant_label: str - initialization_index: int - physics_index: int - realization_index: int - forcing_index: int - tracking_id: str = Field("") - version: str = Field("") - product: str - license: str - grid: str - mip_era: str - - model_config = ConfigDict(alias_generator=add_cmip6_prefix, populate_by_name=True) - - @field_validator("initialization_index", "physics_index", "realization_index", "forcing_index", mode="before") - @classmethod - def only_item(cls, v: list[int], info: FieldValidationInfo): - """Pick single item from list.""" - assert len(v) == 1, f"{info.field_name} must have one item only." - return v[0] - - @field_validator("realm", "source_type", mode="before") - @classmethod - def split(cls, v: str, info: FieldValidationInfo): - """Split string into list.""" - return v.split(" ") - - @field_validator("version") - @classmethod - def validate_version(cls, v: str, info: FieldValidationInfo): - assert v[0] == "v", "Version string should begin with a lower case 'v'" - assert v[1:].isdigit(), "All characters in version string, except first, should be digits" - return v - class CMIP6populator(STACpopulatorBase): - item_properties_model = CMIP6ItemProperties + item_properties_model = CMIP6Properties item_geometry_model = GeoJSONPolygon def __init__( @@ -113,6 +33,7 @@ def __init__( """Constructor :param stac_host: URL to the STAC API + :type stac_host: str :param data_loader: loader to iterate over ingestion data. """ super().__init__( @@ -123,22 +44,6 @@ def __init__( config_file=config_file, ) - @staticmethod - def make_cmip6_item_id(attrs: MutableMapping[str, Any]) -> str: - """Return a unique ID for CMIP6 data item.""" - keys = [ - "activity_id", - "institution_id", - "source_id", - "experiment_id", - "variant_label", - "table_id", - "variable_id", - "grid_label", - ] - name = "_".join(attrs[k] for k in keys) - return name - def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) -> MutableMapping[str, Any]: """Creates the STAC item. @@ -149,18 +54,13 @@ def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) :return: _description_ :rtype: MutableMapping[str, Any] """ - iid = self.make_cmip6_item_id(item_data["attributes"]) - + # Add CMIP6 extension try: - item = STAC_item_from_metadata(iid, item_data, self.item_properties_model, self.item_geometry_model) - except pydantic_core._pydantic_core.ValidationError: - print(f"ERROR: ValidationError for {iid}") - return -1 - - # Add the CMIP6 STAC extension - item.stac_extensions.append( - "https://raw.githubusercontent.com/TomAugspurger/cmip6/main/json-schema/schema.json" - ) + cmip_helper = CMIP6Helper(item_data, self.item_geometry_model) + item = cmip_helper.stac_item() + except Exception: + LOGGER.error("Failed to add CMIP6 extension to item %s", item_name) + raise # Add datacube extension try: @@ -168,16 +68,25 @@ def create_stac_item(self, item_name: str, item_data: MutableMapping[str, Any]) dc_ext = DatacubeExtension.ext(item, add_if_missing=True) dc_ext.apply(dimensions=dc_helper.dimensions, variables=dc_helper.variables) except Exception: - LOGGER.warning(f"Failed to add Datacube extension to item {item_name}") + LOGGER.error("Failed to add Datacube extension to item %s", item_name) + raise + + try: + thredds_helper = THREDDSHelper(item_data["access_urls"]) + thredds_ext = THREDDSExtension.ext(item) + thredds_ext.apply(thredds_helper.services, thredds_helper.links) + except Exception: + LOGGER.error("Failed to add THREDDS references to item %s", item_name) + raise # print(json.dumps(item.to_dict())) return json.loads(json.dumps(item.to_dict())) def make_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(description="CMIP6 STAC populator") + parser = argparse.ArgumentParser(description="CMIP6 STAC populator from a THREDDS catalog or NCML XML.") parser.add_argument("stac_host", type=str, help="STAC API address") - parser.add_argument("thredds_catalog_URL", type=str, help="URL to the CMIP6 THREDDS catalog") + parser.add_argument("href", type=str, help="URL to a THREDDS catalog or a NCML XML with CMIP6 metadata.") parser.add_argument("--update", action="store_true", help="Update collection and its items") parser.add_argument("--mode", choices=["full", "single"], default="full", help="Operation mode, processing the full dataset or only the single reference.") @@ -197,7 +106,7 @@ def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn: with Session() as session: apply_request_options(session, ns) if ns.mode == "full": - data_loader = THREDDSLoader(ns.thredds_catalog_URL, session=session) + data_loader = THREDDSLoader(ns.href, session=session) else: # To be implemented data_loader = ErrorLoader() diff --git a/STACpopulator/implementations/DirectoryLoader/crawl_directory.py b/STACpopulator/implementations/DirectoryLoader/crawl_directory.py index 37742d1..7a2651e 100644 --- a/STACpopulator/implementations/DirectoryLoader/crawl_directory.py +++ b/STACpopulator/implementations/DirectoryLoader/crawl_directory.py @@ -6,7 +6,7 @@ from STACpopulator.cli import add_request_options, apply_request_options from STACpopulator.input import STACDirectoryLoader -from STACpopulator.models import GeoJSONPolygon, STACItemProperties +from STACpopulator.models import GeoJSONPolygon from STACpopulator.populator_base import STACpopulatorBase from STACpopulator.stac_utils import get_logger @@ -14,7 +14,6 @@ class DirectoryPopulator(STACpopulatorBase): - item_properties_model = STACItemProperties item_geometry_model = GeoJSONPolygon def __init__( diff --git a/STACpopulator/models.py b/STACpopulator/models.py index f91dab5..e3296aa 100644 --- a/STACpopulator/models.py +++ b/STACpopulator/models.py @@ -36,71 +36,10 @@ class GeoJSONMultiPolygon(Geometry): coordinates: List[List[List[List[float]]]] -class Asset(BaseModel): - href: AnyHttpUrl - media_type: Optional[str] = None - title: Optional[str] = None - description: Optional[str] = None - roles: Optional[List[str]] = None - - -class STACItemProperties(BaseModel): - """Base STAC Item properties data model. In concrete implementations, users would want to define a new - data model that inherits from this base model and extends it with properties tailored to the data they are - ingesting.""" - - start_datetime: Optional[dt.datetime] = None - end_datetime: Optional[dt.datetime] = None - datetime: Optional[dt.datetime] = None - - @field_validator("datetime", mode="before") - @classmethod - def validate_datetime(cls, v: Union[dt.datetime, str], values: Dict[str, Any]) -> dt: - if v == "null": - if not values["start_datetime"] and not values["end_datetime"]: - raise ValueError("start_datetime and end_datetime must be specified when datetime is null") - - -# class Link(BaseModel): -# """ -# https://github.com/radiantearth/stac-spec/blob/v1.0.0/collection-spec/collection-spec.md#link-object -# """ - -# href: str = Field(..., alias="href", min_length=1) -# rel: str = Field(..., alias="rel", min_length=1) -# type: Optional[str] = None -# title: Optional[str] = None -# # Label extension -# label: Optional[str] = Field(None, alias="label:assets") -# model_config = ConfigDict(use_enum_values=True) - -# def resolve(self, base_url: str) -> None: -# """resolve a link to the given base URL""" -# self.href = urljoin(base_url, self.href) - - -# class PaginationLink(Link): -# """ -# https://github.com/radiantearth/stac-api-spec/blob/master/api-spec.md#paging-extension -# """ - -# rel: Literal["next", "previous"] -# method: Literal["GET", "POST"] -# body: Optional[Dict[Any, Any]] = None -# merge: bool = False - - -# Links = RootModel[List[Union[PaginationLink, Link]]] - - -class STACItem(BaseModel): - """STAC Item data model.""" - - id: str = Field(..., alias="id", min_length=1) - geometry: Optional[SerializeAsAny[Geometry]] = None - bbox: Optional[List[float]] = None - properties: Optional[SerializeAsAny[STACItemProperties]] = None - assets: Dict[str, Asset] = None - stac_extensions: Optional[List[AnyUrl]] = [] - collection: Optional[str] = None - datetime: Optional[dt.datetime] = None # Not in the spec, but needed by pystac.Item. +AnyGeometry = Union[ + Geometry, + GeoJSONPoint, + GeoJSONMultiPoint, + GeoJSONPolygon, + GeoJSONMultiPolygon, +] diff --git a/STACpopulator/populator_base.py b/STACpopulator/populator_base.py index 3d9fa1c..762c5b9 100644 --- a/STACpopulator/populator_base.py +++ b/STACpopulator/populator_base.py @@ -3,7 +3,7 @@ import os from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, MutableMapping, Optional, Union +from typing import Any, MutableMapping, Optional, Type, Union import pystac from requests.sessions import Session @@ -14,6 +14,7 @@ stac_host_reachable, ) from STACpopulator.input import GenericLoader +from STACpopulator.models import AnyGeometry from STACpopulator.stac_utils import get_logger, load_config, url_validate @@ -91,16 +92,10 @@ def collection_id(self) -> str: @property @abstractmethod - def item_properties_model(self): - """In derived classes, this property should be defined as a pydantic data model that derives from - models.STACItemProperties.""" - raise NotImplementedError - - @property - @abstractmethod - def item_geometry_model(self): - """In derived classes, this property should be defined as a pydantic data model that derives from - models.STACItemProperties.""" + def item_geometry_model(self) -> Type[AnyGeometry]: + """ + Implementation of the expected Geometry representation in derived classes. + """ raise NotImplementedError @abstractmethod @@ -153,7 +148,7 @@ def ingest(self) -> None: for item_name, item_data in self._ingest_pipeline: LOGGER.info(f"Creating STAC representation for {item_name}") stac_item = self.create_stac_item(item_name, item_data) - if stac_item != -1: + if stac_item: post_stac_item( self.stac_host, self.collection_id, diff --git a/STACpopulator/stac_utils.py b/STACpopulator/stac_utils.py index fa984c6..5c3aa5d 100644 --- a/STACpopulator/stac_utils.py +++ b/STACpopulator/stac_utils.py @@ -1,16 +1,14 @@ -import json import logging import os import re -from typing import Any, Literal, MutableMapping, Union +from enum import Enum +from typing import Any, Literal, MutableMapping, Type, Union import numpy as np import pystac import yaml from colorlog import ColoredFormatter -from STACpopulator.models import STACItem - def get_logger( name: str, @@ -74,9 +72,9 @@ def load_config( return config_info -def collection2literal(collection, property="label"): +def collection2literal(collection, property="label") -> "Type[Literal]": terms = tuple(getattr(term, property) for term in collection) - return Literal[terms] + return Literal[terms] # noqa def ncattrs_to_geometry(attrs: MutableMapping[str, Any]) -> MutableMapping[str, Any]: @@ -157,73 +155,43 @@ def magpie_resource_link(url: str) -> pystac.Link: return link -def STAC_item_from_metadata(iid: str, attrs: MutableMapping[str, Any], item_props_datamodel, item_geometry_model): - """ - Create STAC Item from CF JSON metadata. - - Parameters - ---------- - iid : str - Unique item ID. - attrs: dict - CF JSON metadata returned by `xncml.Dataset.to_cf_dict`. - item_props_datamodel : pydantic.BaseModel - Data model describing the properties of the STAC item. - item_geometry_model : pydantic.BaseModel - Data model describing the geometry of the STAC item. - """ - - cfmeta = attrs["groups"]["CFMetadata"]["attributes"] - - # Create pydantic STAC item - item = STACItem( - id=iid, - geometry=item_geometry_model(**ncattrs_to_geometry(attrs)), - bbox=ncattrs_to_bbox(attrs), - properties=item_props_datamodel( - start_datetime=cfmeta["time_coverage_start"], - end_datetime=cfmeta["time_coverage_end"], - **attrs["attributes"], - ), - datetime=None, - ) - - # Convert pydantic STAC item to a PySTAC Item - item = pystac.Item(**json.loads(item.model_dump_json(by_alias=True))) - - root = attrs["access_urls"] - - for name, url in root.items(): - name = str(name) # converting name from siphon.catalog.CaseInsensitiveStr to str - asset = pystac.Asset(href=url, media_type=media_types.get(name), roles=asset_roles.get(name)) - - item.add_asset(name, asset) - - item.add_link(magpie_resource_link(root["HTTPServer"])) - - return item - - -asset_name_remaps = { - "httpserver_service": "HTTPServer", - "opendap_service": "OPENDAP", - "wcs_service": "WCS", - "wms_service": "WMS", - "nccs_service": "NetcdfSubset", -} - -media_types = { - "HTTPServer": "application/x-netcdf", - "OPENDAP": pystac.MediaType.HTML, - "WCS": pystac.MediaType.XML, - "WMS": pystac.MediaType.XML, - "NetcdfSubset": "application/x-netcdf", -} - -asset_roles = { - "HTTPServer": ["data"], - "OPENDAP": ["data"], - "WCS": ["data"], - "WMS": ["visual"], - "NetcdfSubset": ["data"], -} +class ServiceType(Enum): + adde = "ADDE" + dap4 = "DAP4" + dods = "DODS" # same as OpenDAP + opendap = "OpenDAP" + opendapg = "OpenDAPG" + netcdfsubset = "NetcdfSubset" + cdmremote = "CdmRemote" + cdmfeature = "CdmFeature" + ncjson = "ncJSON" + h5service = "H5Service" + httpserver = "HTTPServer" + ftp = "FTP" + gridftp = "GridFTP" + file = "File" + iso = "ISO" + las = "LAS" + ncml = "NcML" + uddc = "UDDC" + wcs = "WCS" + wms = "WMS" + wsdl = "WSDL" + webform = "WebForm" + catalog = "Catalog" + compound = "Compound" + resolver = "Resolver" + thredds = "THREDDS" + + @classmethod + def from_value(cls, value: str, default: Any = KeyError) -> "ServiceType": + """Return value irrespective of case.""" + try: + svc = value.lower() + if svc.endswith("_service"): # handle NCML edge cases + svc = svc.rsplit("_", 1)[0] + return cls[svc] + except KeyError: + if default is not KeyError: + return default + raise diff --git a/pyproject.toml b/pyproject.toml index 3c5fdad..00955d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -86,7 +86,7 @@ dev = [ "responses", "bump-my-version", "jsonschema", - "pystac[validation]" + "pystac[validation]>=1.9.0" ] [tool.pytest.ini_options] diff --git a/tests/data/stac_item_testdata_xclim_cmip6_ncml.json b/tests/data/stac_item_testdata_xclim_cmip6_ncml.json index f3b8c23..2ff83d1 100644 --- a/tests/data/stac_item_testdata_xclim_cmip6_ncml.json +++ b/tests/data/stac_item_testdata_xclim_cmip6_ncml.json @@ -85,7 +85,7 @@ "data" ] }, - "OPENDAP": { + "OpenDAP": { "href": "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/birdhouse/testdata/xclim/cmip6/sic_SImon_CCCma-CanESM5_ssp245_r13i1p2f1_2020.nc", "type": "text/html", "roles": [ @@ -120,5 +120,7 @@ 359.99493408203125, 89.74176788330078 ], - "stac_extensions": [] -} \ No newline at end of file + "stac_extensions": [ + "https://raw.githubusercontent.com/TomAugspurger/cmip6/main/json-schema/schema.json" + ] +} diff --git a/tests/test_cmip6_datacube.py b/tests/test_cmip6_datacube.py index 0e7dd91..9adafa0 100644 --- a/tests/test_cmip6_datacube.py +++ b/tests/test_cmip6_datacube.py @@ -1,9 +1,11 @@ +import pystac import xncml from pathlib import Path -from STACpopulator.implementations.CMIP6_UofT.extensions import DataCubeHelper +from pystac.validation import validate_dict + +from STACpopulator.extensions.datacube import DataCubeHelper +from STACpopulator.extensions.cmip6 import CMIP6Helper from pystac.extensions.datacube import DatacubeExtension -from STACpopulator.stac_utils import STAC_item_from_metadata -from STACpopulator.implementations.CMIP6_UofT.add_CMIP6 import CMIP6ItemProperties from STACpopulator.models import GeoJSONPolygon DIR = Path(__file__).parent @@ -11,17 +13,25 @@ def test_datacube_helper(): # Create item - ds = xncml.Dataset(filepath=DIR / "data" / "o3_Amon_GFDL-ESM4_historical_r1i1p1f1_gr1_185001-194912.xml") + file_path = DIR / "data" / "o3_Amon_GFDL-ESM4_historical_r1i1p1f1_gr1_185001-194912.xml" + ds = xncml.Dataset(filepath=str(file_path)) attrs = ds.to_cf_dict() attrs["access_urls"] = {"HTTPServer": "http://example.com"} - item = STAC_item_from_metadata("test", attrs, CMIP6ItemProperties, GeoJSONPolygon) + item = CMIP6Helper(attrs, GeoJSONPolygon).stac_item() # Add extension dc = DataCubeHelper(attrs) dc_ext = DatacubeExtension.ext(item, add_if_missing=True) dc_ext.apply(dimensions=dc.dimensions, variables=dc.variables) - schemas = item.validate() + # same thing as 'item.validate()' but omit the missing CMIP6 that is not official + schemas = validate_dict( + stac_dict=item.to_dict(), + stac_object_type=item.STAC_OBJECT_TYPE, + stac_version=pystac.get_stac_version(), + extensions=[DatacubeExtension.get_schema_uri()], + href=item.get_self_href(), + ) assert len(schemas) >= 2 assert "item.json" in schemas[0] assert "datacube" in schemas[1] diff --git a/tests/test_standalone_stac_item.py b/tests/test_standalone_stac_item.py index 3163cd5..82de64e 100644 --- a/tests/test_standalone_stac_item.py +++ b/tests/test_standalone_stac_item.py @@ -7,10 +7,11 @@ import xncml -from STACpopulator.implementations.CMIP6_UofT.add_CMIP6 import CMIP6ItemProperties, CMIP6populator +from STACpopulator.extensions.cmip6 import CMIP6Helper +from STACpopulator.extensions.thredds import THREDDSHelper, THREDDSExtension +from STACpopulator.implementations.CMIP6_UofT.add_CMIP6 import CMIP6populator from STACpopulator.input import THREDDSLoader from STACpopulator.models import GeoJSONPolygon -from STACpopulator.stac_utils import STAC_item_from_metadata CUR_DIR = os.path.dirname(__file__) @@ -41,9 +42,10 @@ def test_standalone_stac_item_thredds_ncml(): "WMS": f"{thredds_url}/wms/{thredds_path}/{thredds_nc}?service=WMS&version=1.3.0&request=GetCapabilities", "NetcdfSubset": f"{thredds_url}/ncss/{thredds_path}/{thredds_nc}/dataset.html", } - - stac_item_id = CMIP6populator.make_cmip6_item_id(attrs["attributes"]) - stac_item = STAC_item_from_metadata(stac_item_id, attrs, CMIP6ItemProperties, GeoJSONPolygon) + stac_item = CMIP6Helper(attrs, GeoJSONPolygon).stac_item() + thredds_helper = THREDDSHelper(attrs["access_urls"]) + thredds_ext = THREDDSExtension.ext(stac_item) + thredds_ext.apply(services=thredds_helper.services, links=thredds_helper.links) ref_file = os.path.join(CUR_DIR, "data/stac_item_testdata_xclim_cmip6_ncml.json") with open(ref_file, mode="r", encoding="utf-8") as ff: