diff --git a/python/services/dataservice/dmod/dataservice/data_derive_util.py b/python/services/dataservice/dmod/dataservice/data_derive_util.py index 2d2ecf1c6..094284b86 100644 --- a/python/services/dataservice/dmod/dataservice/data_derive_util.py +++ b/python/services/dataservice/dmod/dataservice/data_derive_util.py @@ -9,9 +9,9 @@ from dmod.core.meta_data import (DataCategory, DataDomain, DataFormat, DataRequirement, DiscreteRestriction, StandardDatasetIndex) from dmod.core.exception import DmodRuntimeError -from dmod.core.dataset import Dataset, DatasetType +from dmod.core.dataset import Dataset, DatasetManager, DatasetType from dmod.scheduler.job import Job, JobExecStep -from typing import List, Optional, Tuple +from typing import List, Optional, Tuple, Union class DataDeriveUtil: @@ -154,6 +154,40 @@ def _apply_dataset_to_requirement(self, dataset: Dataset, requirement: DataRequi ################################################################################# requirement.fulfilled_by = dataset.name + def _build_bmi_auto_generator(self, + hydrofabric_dataset: Union[str, Dataset], + realization_config_dataset: Union[str, Dataset]) -> DataServiceBmiInitConfigGenerator: + """ + Build a BMI config auto-generator instance, using the subtype implementation from this service package. + + Parameters + ---------- + hydrofabric_dataset + The hydrofabric dataset, or its name, containing the hydrofabric to use to generate BMI init configs. + realization_config_dataset + The realization config dataset, or its name, containing the ngen realization config to use to generate BMI + init configs. + + Returns + ------- + DataServiceBmiInitConfigGenerator + The BMI init config generator object. 
+ """ + if isinstance(hydrofabric_dataset, str): + hydrofabric_dataset = self._managers.known_datasets()[hydrofabric_dataset] + + if isinstance(realization_config_dataset, str): + realization_config_dataset = self._managers.known_datasets()[realization_config_dataset] + + hydrofabric_gpkg_file, hydrofabric_attributes_file = self.find_hydrofabric_files(dataset=hydrofabric_dataset) + + return DataServiceBmiInitConfigGenerator(hydrofabric_dataset=hydrofabric_dataset, + hydrofabric_geopackage_file_name=hydrofabric_gpkg_file, + hydrofabric_model_attributes_file_name=hydrofabric_attributes_file, + realization_config_dataset=realization_config_dataset, + realization_cfg_file_name="realization_config.json", + noah_owp_params_dir=self._noah_owp_params_dir) + def _can_derive_bmi_configs(self, requirement: DataRequirement, job: Job) -> bool: """ Determine whether a ``BMI_CONFIG`` :class:`DataFormat` dataset can be derived to fulfill this job requirement. @@ -220,27 +254,11 @@ def _derive_bmi_init_config_dataset(self, requirement: DataRequirement, job: Job ds_mgr = self._managers.manager(ds_type) hf_ds_name = self.get_fulfilling_dataset_name(job=job, data_format=DataFormat.NGEN_GEOPACKAGE_HYDROFABRIC_V2) - hydrofabric_ds = self._managers.known_datasets()[hf_ds_name] real_cfg_ds_name = self.get_fulfilling_dataset_name(job=job, data_format=DataFormat.NGEN_REALIZATION_CONFIG) - realization_cfg_ds = self._managers.known_datasets()[real_cfg_ds_name] - - hf_gpkg_file, attributes_file = self.find_hydrofabric_files(dataset=hydrofabric_ds) - generator = DataServiceBmiInitConfigGenerator(hydrofabric_dataset=hydrofabric_ds, - hydrofabric_geopackage_file_name=hf_gpkg_file, - hydrofabric_model_attributes_file_name=attributes_file, - realization_config_dataset=realization_cfg_ds, - realization_cfg_file_name="realization_config.json", - noah_owp_params_dir=self._noah_owp_params_dir) + dataset: Dataset = self._generate_bmi_ds(bmi_ds_name=ds_name, bmi_ds_mgr=ds_mgr, 
hydrofabric_ds_name=hf_ds_name, + realization_cfg_ds_name=real_cfg_ds_name) - data_adder = BmiAutoGenerationAdder(dataset_name=ds_name, dataset_manager=ds_mgr, bmi_generator=generator) - - domain = DataDomain(data_format=DataFormat.BMI_CONFIG, - discrete_restrictions=[DiscreteRestriction(variable=StandardDatasetIndex.DATA_ID, - values=[ds_name])]) - - dataset: Dataset = ds_mgr.create_temporary(name=ds_name, category=DataCategory.CONFIG, domain=domain, - is_read_only=False, initial_data=data_adder) self._apply_dataset_to_requirement(dataset=dataset, requirement=requirement, job=job) async def _derive_composite_job_config(self, requirement: DataRequirement, job: Job): @@ -360,6 +378,49 @@ def _determine_access_location(self, dataset: Dataset, job: Job) -> str: msg = "Could not determine proper access location for new dataset of type {} by non-Docker job {}." raise DmodRuntimeError(msg.format(dataset.__class__.__name__, job.job_id)) + def _generate_bmi_ds(self, bmi_ds_name: str, bmi_ds_mgr: DatasetManager, hydrofabric_ds_name: str, + realization_cfg_ds_name: str, is_temporary: bool = True) -> Dataset: + """ + Generate a BMI init config dataset using the given details. + + Parameters + ---------- + bmi_ds_name + The name for the new BMI config dataset to create. + bmi_ds_mgr + The manager for the new BMI config dataset. + hydrofabric_ds_name + The name of the hydrofabric dataset containing the hydrofabric to use during BMI config generation. + realization_cfg_ds_name + The name of the ngen realization config dataset containing the realization config to use during BMI config + generation. + is_temporary + Whether the generated dataset should be marked as temporary (by default, ``True``). + + Returns + ------- + Dataset + The newly generated dataset. 
+ """ + data_adder = BmiAutoGenerationAdder(dataset_name=bmi_ds_name, + dataset_manager=bmi_ds_mgr, + bmi_generator=self._build_bmi_auto_generator(hydrofabric_ds_name, + realization_cfg_ds_name)) + restrictions = [ + DiscreteRestriction(variable=StandardDatasetIndex.DATA_ID, values=[bmi_ds_name]), + DiscreteRestriction(variable=StandardDatasetIndex.HYDROFABRIC_DATA_ID, values=[hydrofabric_ds_name]), + DiscreteRestriction(variable=StandardDatasetIndex.REALIZATION_CONFIG_DATA_ID, + values=[realization_cfg_ds_name]) + ] + domain = DataDomain(data_format=DataFormat.BMI_CONFIG, discrete_restrictions=restrictions) + + if is_temporary: + return bmi_ds_mgr.create_temporary(name=bmi_ds_name, category=DataCategory.CONFIG, domain=domain, + is_read_only=True, initial_data=data_adder) + else: + return bmi_ds_mgr.create(name=bmi_ds_name, category=DataCategory.CONFIG, domain=domain, + is_read_only=True, initial_data=data_adder) + async def async_can_dataset_be_derived(self, requirement: DataRequirement, job: Optional[Job] = None) -> bool: """ Asynchronously determine if a dataset can be derived from existing datasets to fulfill this requirement.