[#8] initial adapter for timeseries metadata
pkdash committed Mar 27, 2024
1 parent 43a88f4 commit d6779ae
Showing 3 changed files with 263 additions and 0 deletions.
248 changes: 248 additions & 0 deletions hsextract/adapters/hydroshare.py
@@ -716,3 +716,251 @@ def to_catalog_record(aggr_metadata: dict):
"""Converts extracted feature aggregation metadata to a catalog dataset record"""
aggr_model = _FeatureAggregationMetadata(**aggr_metadata)
return aggr_model.to_catalog_dataset()


class TimeseriesResult(BaseModel):
class Method(BaseModel):
method_code: str
method_name: str
method_type: str
method_description: str

def to_schema_method(self):
method = schema.PropertyValue.construct()
method.name = "method"
method.value = []
method_code = schema.PropertyValue.construct()
method_code.name = "methodCode"
method_code.value = self.method_code
method.value.append(method_code)
method_name = schema.PropertyValue.construct()
method_name.name = "methodName"
method_name.value = self.method_name
method.value.append(method_name)
method_type = schema.PropertyValue.construct()
method_type.name = "methodType"
method_type.value = self.method_type
method.value.append(method_type)
method_description = schema.PropertyValue.construct()
method_description.name = "methodDescription"
method_description.value = self.method_description
method.value.append(method_description)
return method
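
This nested-PropertyValue pattern repeats in the classes below. A minimal sketch of what it produces (the field values here are hypothetical, and other PropertyValue fields are omitted from the serialized output):

    method = TimeseriesResult.Method(
        method_code="MC-1",
        method_name="Water temperature sensor",
        method_type="Instrument deployment",
        method_description="Continuous in-situ measurement",
    )
    prop = method.to_schema_method()
    # prop.dict() serializes to roughly:
    # {"name": "method",
    #  "value": [{"name": "methodCode", "value": "MC-1"},
    #            {"name": "methodName", "value": "Water temperature sensor"},
    #            {"name": "methodType", "value": "Instrument deployment"},
    #            {"name": "methodDescription", "value": "Continuous in-situ measurement"}]}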

class ProcessingLevel(BaseModel):
definition: str
processing_level_code: str
explanation: str

def to_schema_processing_level(self):
processing_level = schema.PropertyValue.construct()
processing_level.name = "processingLevel"
processing_level.value = []
definition = schema.PropertyValue.construct()
definition.name = "definition"
definition.value = self.definition
processing_level.value.append(definition)
code = schema.PropertyValue.construct()
code.name = "processingLevelCode"
code.value = self.processing_level_code
processing_level.value.append(code)
explanation = schema.PropertyValue.construct()
explanation.name = "explanation"
explanation.value = self.explanation
processing_level.value.append(explanation)
return processing_level

class Site(BaseModel):
site_name: str
site_type: str
site_code: str
elevation_datum: str
elevation_m: str
latitude: float
longitude: float

def to_schema_site(self):
site = schema.PropertyValue.construct()
site.name = "site"
site.value = []
site_name = schema.PropertyValue.construct()
site_name.name = "siteName"
site_name.value = self.site_name
site.value.append(site_name)
site_type = schema.PropertyValue.construct()
site_type.name = "siteType"
site_type.value = self.site_type
site.value.append(site_type)
site_code = schema.PropertyValue.construct()
site_code.name = "siteCode"
site_code.value = self.site_code
site.value.append(site_code)
elevation_datum = schema.PropertyValue.construct()
elevation_datum.name = "elevationDatum"
elevation_datum.value = self.elevation_datum
site.value.append(elevation_datum)
elevation_m = schema.PropertyValue.construct()
elevation_m.name = "elevationM"
elevation_m.value = self.elevation_m
site.value.append(elevation_m)
latitude = schema.PropertyValue.construct()
latitude.name = "latitude"
latitude.value = self.latitude
site.value.append(latitude)
longitude = schema.PropertyValue.construct()
longitude.name = "longitude"
longitude.value = self.longitude
site.value.append(longitude)
return site

class Unit(BaseModel):
name: str
type: str
abbreviation: str

def to_schema_unit(self):
unit = schema.PropertyValue.construct()
unit.name = "unit"
unit.value = []
unit_name = schema.PropertyValue.construct()
unit_name.name = "unitName"
unit_name.value = self.name
unit.value.append(unit_name)
unit_type = schema.PropertyValue.construct()
unit_type.name = "unitType"
unit_type.value = self.type
unit.value.append(unit_type)
unit_abbreviation = schema.PropertyValue.construct()
unit_abbreviation.name = "unitAbbreviation"
unit_abbreviation.value = self.abbreviation
unit.value.append(unit_abbreviation)
return unit

class TSVariable(BaseModel):
variable_name: str
variable_code: str
variable_type: str
speciation: str
no_data_value: float

def to_schema_variable(self):
variable = schema.PropertyValue.construct()
variable.name = "variable"
variable.value = []
variable_name = schema.PropertyValue.construct()
variable_name.name = "variableName"
variable_name.value = self.variable_name
variable.value.append(variable_name)
variable_code = schema.PropertyValue.construct()
variable_code.name = "variableCode"
variable_code.value = self.variable_code
variable.value.append(variable_code)
variable_type = schema.PropertyValue.construct()
variable_type.name = "variableType"
variable_type.value = self.variable_type
variable.value.append(variable_type)
variable_speciation = schema.PropertyValue.construct()
variable_speciation.name = "speciation"
variable_speciation.value = self.speciation
variable.value.append(variable_speciation)
no_data_value = schema.PropertyValue.construct()
no_data_value.name = "noDataValue"
no_data_value.value = self.no_data_value
variable.value.append(no_data_value)
return variable

aggregation_statistic: str
method: Method
processing_level: ProcessingLevel
sample_medium: str
series_id: str
status: str
unit: Unit
variable: TSVariable
value_count: int

def to_schema_timeseries_result(self, ts_property):
aggregation_statistic = schema.PropertyValue.construct()
aggregation_statistic.name = "aggregationStatistic"
aggregation_statistic.value = self.aggregation_statistic
ts_property.value.append(aggregation_statistic)
method = self.method.to_schema_method()
ts_property.value.append(method)
processing_level = self.processing_level.to_schema_processing_level()
ts_property.value.append(processing_level)
sample_medium = schema.PropertyValue.construct()
sample_medium.name = "sampleMedium"
sample_medium.value = self.sample_medium
ts_property.value.append(sample_medium)
series_id = schema.PropertyValue.construct()
series_id.name = "seriesID"
series_id.value = self.series_id
ts_property.value.append(series_id)
status = schema.PropertyValue.construct()
status.name = "status"
status.value = self.status
ts_property.value.append(status)
unit = self.unit.to_schema_unit()
ts_property.value.append(unit)
variable = self.variable.to_schema_variable()
ts_property.value.append(variable)
value_count = schema.PropertyValue.construct()
value_count.name = "valueCount"
value_count.value = self.value_count
ts_property.value.append(value_count)
return ts_property
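
Note that to_schema_timeseries_result mutates the ts_property passed in and returns that same object, so a fully assembled property nests every sub-property in a single value list, roughly (values elided):

    # {"name": "timeSeriesResult",
    #  "value": [{"name": "aggregationStatistic", "value": "..."},
    #            {"name": "method", "value": [...]},
    #            {"name": "processingLevel", "value": [...]},
    #            {"name": "sampleMedium", "value": "..."},
    #            {"name": "seriesID", "value": "..."},
    #            {"name": "status", "value": "..."},
    #            {"name": "unit", "value": [...]},
    #            {"name": "variable", "value": [...]},
    #            {"name": "valueCount", "value": "..."}]}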


class _TimeseriesAggregationMetadata(BaseModel):
title: Optional[str]
abstract: Optional[str]
spatial_coverage: Optional[Union[SpatialCoverageBox, SpatialCoveragePoint]]
period_coverage: Optional[TemporalCoverage]
# the extracted file (media object) metadata is already in schema.MediaObject format
associatedMedia: Optional[List[schema.MediaObject]]
creators: Optional[List[Creator]]
contributors: Optional[List[Creator]]
subjects: Optional[List[str]]
time_series_results: List[TimeseriesResult]

def to_aggregation_spatial_coverage(self):
if self.spatial_coverage:
aggr_spatial_coverage = self.spatial_coverage.to_dataset_spatial_coverage()
return aggr_spatial_coverage
return None

def to_aggregation_period_coverage(self):
if self.period_coverage:
return self.period_coverage.to_dataset_temporal_coverage()
return None

    def to_schema_time_series_results(self):
        ts_properties = []
        for ts_result in self.time_series_results:
            # build a fresh PropertyValue per result; constructing it outside the
            # loop would merge every result into one shared object
            ts_property = schema.PropertyValue.construct()
            ts_property.name = "timeSeriesResult"
            ts_property.value = []
            ts_result.to_schema_timeseries_result(ts_property)
            ts_properties.append(ts_property)
        return ts_properties

def to_catalog_dataset(self):
aggregation_metadata = schema.TimeseriesAggregationMetadata.construct()
aggregation_metadata.name = self.title
aggregation_metadata.description = self.abstract
aggregation_metadata.spatialCoverage = self.to_aggregation_spatial_coverage()
aggregation_metadata.temporalCoverage = self.to_aggregation_period_coverage()
        # creators/contributors are optional, so guard against None before iterating
        aggregation_metadata.creator = [creator.to_dataset_creator() for creator in (self.creators or [])]
        aggregation_metadata.creator.extend(contributor.to_dataset_creator() for contributor in (self.contributors or []))
aggregation_metadata.keywords = self.subjects
aggregation_metadata.additionalProperty = self.to_schema_time_series_results()
aggregation_metadata.associatedMedia = self.associatedMedia
return aggregation_metadata


class TimeseriesAggregationMetadataAdapter:
@staticmethod
def to_catalog_record(aggr_metadata: dict):
"""Converts extracted raster aggregation metadata to a catalog dataset record"""
aggr_model = _TimeseriesAggregationMetadata(**aggr_metadata)
return aggr_model.to_catalog_dataset()
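
A minimal usage sketch for the new adapter (the metadata dict here is hypothetical; in practice it comes from the extractor, as wired up in hsextract/utils.py below):

    adapter = TimeseriesAggregationMetadataAdapter()
    catalog_record = adapter.to_catalog_record({
        "title": "Water temperature observations",
        "abstract": "Observed water temperature at a single site.",
        "subjects": ["temperature"],
        "creators": [],
        "contributors": [],
        "associatedMedia": [],
        "time_series_results": [],  # list of dicts matching TimeseriesResult
    })
    print(catalog_record.json())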
8 changes: 8 additions & 0 deletions hsextract/models/schema.py
@@ -626,3 +626,11 @@ class FeatureAggregationMetadata(BaseAggregationMetadata):
const=True,
description="Additional type of aggregation."
)


class TimeseriesAggregationMetadata(BaseAggregationMetadata):
additionalType: str = Field(
default="Timeseries Dataset",
const=True,
description="Additional type of aggregation."
)
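
A quick sanity check of the new schema class (assuming pydantic v1 semantics as used elsewhere in this repo, where construct() fills in field defaults and const=True pins the value):

    meta = TimeseriesAggregationMetadata.construct()
    assert meta.additionalType == "Timeseries Dataset"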
7 changes: 7 additions & 0 deletions hsextract/utils.py
@@ -10,6 +10,7 @@
NetCDFAggregationMetadataAdapter,
RasterAggregationMetadataAdapter,
FeatureAggregationMetadataAdapter,
TimeseriesAggregationMetadataAdapter,
)
from hsextract.listing.utils import prepare_files
from hsextract.models.schema import CoreMetadataDOC
@@ -41,6 +42,8 @@ def extract_metadata_with_file_path(type: str, filepath: str, user_metadata_file
def extract_metadata(type: str, filepath, use_adapter=True):
    # use_adapter determines whether the extracted metadata is converted to a catalog record;
    # tests set it to False to inspect the raw extracted metadata

extension = os.path.splitext(filepath)[1]
try:
extracted_metadata = _extract_metadata(type, filepath)
except Exception as e:
@@ -65,6 +68,10 @@ def extract_metadata(type: str, filepath, use_adapter=True):
adapter = NetCDFAggregationMetadataAdapter()
elif type == "feature":
adapter = FeatureAggregationMetadataAdapter()
elif type == "timeseries" and extension == ".sqlite":
# TODO: Add support for timeseries csv metadata extraction
adapter = TimeseriesAggregationMetadataAdapter()

catalog_record = json.loads(adapter.to_catalog_record(extracted_metadata).json())
return catalog_record
else:
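With this wiring in place, a timeseries extraction might be invoked like so (the file path is hypothetical):

    # returns a catalog record dict when use_adapter is True (the default)
    catalog_record = extract_metadata("timeseries", "data/ODM2_timeseries.sqlite")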
