Skip to content

Commit

Permalink
Update DataDeriveUtil deriving BMI config dataset.
Browse files Browse the repository at this point in the history
Separating parts of the functionality for deriving BMI init config
datasets into more focused/reusable/testable functions.
  • Loading branch information
robertbartel committed Jul 2, 2024
1 parent 054581c commit 729a482
Showing 1 changed file with 81 additions and 20 deletions.
101 changes: 81 additions & 20 deletions python/services/dataservice/dmod/dataservice/data_derive_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
from dmod.core.meta_data import (DataCategory, DataDomain, DataFormat, DataRequirement, DiscreteRestriction,
StandardDatasetIndex)
from dmod.core.exception import DmodRuntimeError
from dmod.core.dataset import Dataset, DatasetType
from dmod.core.dataset import Dataset, DatasetManager, DatasetType
from dmod.scheduler.job import Job, JobExecStep
from typing import List, Optional, Tuple
from typing import List, Optional, Tuple, Union


class DataDeriveUtil:
Expand Down Expand Up @@ -154,6 +154,40 @@ def _apply_dataset_to_requirement(self, dataset: Dataset, requirement: DataRequi
#################################################################################
requirement.fulfilled_by = dataset.name

def _build_bmi_auto_generator(self,
hydrofabric_dataset: Union[str, Dataset],
realization_config_dataset: Union[str, Dataset]) -> DataServiceBmiInitConfigGenerator:
"""
Build a BMI config auto-generator instance, using the subtype implementation from this service package.
Parameters
----------
hydrofabric_dataset
The hydrofabric dataset, or its name, containing the hydrofabric to use to generate BMI init configs.
realization_config_dataset
The realization config dataset, or its name, containing the ngen realization config to use to generate BMI
init configs.
Returns
-------
DataServiceBmiInitConfigGenerator
The BMI init config generator object.
"""
if isinstance(hydrofabric_dataset, str):
hydrofabric_dataset = self._managers.known_datasets()[hydrofabric_dataset]

if isinstance(realization_config_dataset, str):
realization_config_dataset = self._managers.known_datasets()[realization_config_dataset]

hydrofabric_gpkg_file, hydrofabric_attributes_file = self.find_hydrofabric_files(dataset=hydrofabric_dataset)

return DataServiceBmiInitConfigGenerator(hydrofabric_dataset=hydrofabric_dataset,
hydrofabric_geopackage_file_name=hydrofabric_gpkg_file,
hydrofabric_model_attributes_file_name=hydrofabric_attributes_file,
realization_config_dataset=realization_config_dataset,
realization_cfg_file_name="realization_config.json",
noah_owp_params_dir=self._noah_owp_params_dir)

def _can_derive_bmi_configs(self, requirement: DataRequirement, job: Job) -> bool:
"""
Determine whether a ``BMI_CONFIG`` :class:`DataFormat` dataset can be derived to fulfill this job requirement.
Expand Down Expand Up @@ -220,27 +254,11 @@ def _derive_bmi_init_config_dataset(self, requirement: DataRequirement, job: Job
ds_mgr = self._managers.manager(ds_type)

hf_ds_name = self.get_fulfilling_dataset_name(job=job, data_format=DataFormat.NGEN_GEOPACKAGE_HYDROFABRIC_V2)
hydrofabric_ds = self._managers.known_datasets()[hf_ds_name]
real_cfg_ds_name = self.get_fulfilling_dataset_name(job=job, data_format=DataFormat.NGEN_REALIZATION_CONFIG)
realization_cfg_ds = self._managers.known_datasets()[real_cfg_ds_name]

hf_gpkg_file, attributes_file = self.find_hydrofabric_files(dataset=hydrofabric_ds)

generator = DataServiceBmiInitConfigGenerator(hydrofabric_dataset=hydrofabric_ds,
hydrofabric_geopackage_file_name=hf_gpkg_file,
hydrofabric_model_attributes_file_name=attributes_file,
realization_config_dataset=realization_cfg_ds,
realization_cfg_file_name="realization_config.json",
noah_owp_params_dir=self._noah_owp_params_dir)
dataset: Dataset = self._generate_bmi_ds(bmi_ds_name=ds_name, bmi_ds_mgr=ds_mgr, hydrofabric_ds_name=hf_ds_name,
realization_cfg_ds_name=real_cfg_ds_name)

data_adder = BmiAutoGenerationAdder(dataset_name=ds_name, dataset_manager=ds_mgr, bmi_generator=generator)

domain = DataDomain(data_format=DataFormat.BMI_CONFIG,
discrete_restrictions=[DiscreteRestriction(variable=StandardDatasetIndex.DATA_ID,
values=[ds_name])])

dataset: Dataset = ds_mgr.create_temporary(name=ds_name, category=DataCategory.CONFIG, domain=domain,
is_read_only=False, initial_data=data_adder)
self._apply_dataset_to_requirement(dataset=dataset, requirement=requirement, job=job)

async def _derive_composite_job_config(self, requirement: DataRequirement, job: Job):
Expand Down Expand Up @@ -360,6 +378,49 @@ def _determine_access_location(self, dataset: Dataset, job: Job) -> str:
msg = "Could not determine proper access location for new dataset of type {} by non-Docker job {}."
raise DmodRuntimeError(msg.format(dataset.__class__.__name__, job.job_id))

def _generate_bmi_ds(self, bmi_ds_name: str, bmi_ds_mgr: DatasetManager, hydrofabric_ds_name: str,
realization_cfg_ds_name: str, is_temporary: bool = True) -> Dataset:
"""
Generate a BMI init config dataset using the given details.
Parameters
----------
bmi_ds_name
The name for the new BMI config dataset to create.
bmi_ds_mgr
The manager for the new BMI config dataset.
hydrofabric_ds_name
The name of the hydrofabric dataset containing the hydrofabric to use during BMI config generation.
realization_cfg_ds_name
The name of the ngen realization config dataset containing the hydrofabric to use during BMI config
generation.
is_temporary
Whether the generated dataset should be marked as temporary (by default, ``True``).
Returns
-------
Dataset
The newly generated dataset.
"""
data_adder = BmiAutoGenerationAdder(dataset_name=bmi_ds_name,
dataset_manager=bmi_ds_mgr,
bmi_generator=self._build_bmi_auto_generator(hydrofabric_ds_name,
realization_cfg_ds_name))
restrictions = [
DiscreteRestriction(variable=StandardDatasetIndex.DATA_ID, values=[bmi_ds_name]),
DiscreteRestriction(variable=StandardDatasetIndex.HYDROFABRIC_DATA_ID, values=[hydrofabric_ds_name]),
DiscreteRestriction(variable=StandardDatasetIndex.REALIZATION_CONFIG_DATA_ID,
values=[realization_cfg_ds_name])
]
domain = DataDomain(data_format=DataFormat.BMI_CONFIG, discrete_restrictions=restrictions)

if is_temporary:
return bmi_ds_mgr.create_temporary(name=bmi_ds_name, category=DataCategory.CONFIG, domain=domain,
is_read_only=True, initial_data=data_adder)
else:
return bmi_ds_mgr.create(name=bmi_ds_name, category=DataCategory.CONFIG, domain=domain,
is_read_only=True, initial_data=data_adder)

async def async_can_dataset_be_derived(self, requirement: DataRequirement, job: Optional[Job] = None) -> bool:
"""
Asynchronously determine if a dataset can be derived from existing datasets to fulfill this requirement.
Expand Down

0 comments on commit 729a482

Please sign in to comment.