diff --git a/python/lib/client/dmod/client/_version.py b/python/lib/client/dmod/client/_version.py
index 8411e551d..a71c5c7f1 100644
--- a/python/lib/client/dmod/client/_version.py
+++ b/python/lib/client/dmod/client/_version.py
@@ -1 +1 @@
-__version__ = '0.6.1'
+__version__ = '0.7.0'
diff --git a/python/lib/client/setup.py b/python/lib/client/setup.py
index 0d1a22ce9..a0197db1b 100644
--- a/python/lib/client/setup.py
+++ b/python/lib/client/setup.py
@@ -22,7 +22,7 @@
       license='',
       include_package_data=True,
       #install_requires=['websockets', 'jsonschema'],vi
-      install_requires=['dmod-core>=0.15.2', 'websockets>=8.1', 'pydantic>=1.10.8,~=1.10', 'dmod-communication>=0.17.0',
-                        'dmod-externalrequests>=0.6.0', 'dmod-modeldata>=0.11.1'],
+      install_requires=['dmod-core>=0.16.0', 'websockets>=8.1', 'pydantic>=1.10.8,~=1.10', 'dmod-communication>=0.17.0',
+                        'dmod-externalrequests>=0.6.0', 'dmod-modeldata>=0.12.0'],
       packages=find_namespace_packages(include=['dmod.*'], exclude=['dmod.test'])
       )
diff --git a/python/lib/core/dmod/core/_version.py b/python/lib/core/dmod/core/_version.py
index 00d1ab54f..8911e95ca 100644
--- a/python/lib/core/dmod/core/_version.py
+++ b/python/lib/core/dmod/core/_version.py
@@ -1 +1 @@
-__version__ = '0.15.2'
+__version__ = '0.16.0'
diff --git a/python/lib/core/dmod/core/dataset.py b/python/lib/core/dmod/core/dataset.py
index c57f1547c..07a115ba8 100644
--- a/python/lib/core/dmod/core/dataset.py
+++ b/python/lib/core/dmod/core/dataset.py
@@ -527,8 +527,8 @@ def __init__(self, uuid: Optional[UUID] = None, datasets: Optional[Dict[str, Dat
        """ A property attribute to hold errors encountered during operations. """

     @abstractmethod
-    def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Reader]] = None, source: Optional[str] = None,
-                 is_temp: bool = False, **kwargs) -> bool:
+    def add_data(self, dataset_name: str, dest: str, domain: DataDomain, data: Optional[Union[bytes, Reader]] = None,
+                 source: Optional[str] = None, is_temp: bool = False, **kwargs) -> bool:
         """
         Add data in some format to the dataset.

@@ -542,6 +542,8 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Rea
         dest : str
             A path-like string specifying a location within the dataset (e.g., file, object, sub-URL) where the data
             should be added.
+        domain : DataDomain
+            The defined domain for the data being added.
         data : Optional[Union[bytes, Reader]]
             Optional encoded byte string _or_ object with read() method returning bytes containing data to be inserted
             into the data set; either this or ``source`` must be provided.
@@ -648,7 +650,7 @@ def delete(self, dataset: Dataset, **kwargs) -> bool:

     # TODO: add back as abstract, then implement properly in subtypes
     #@abstractmethod
-    def delete_data(self, dataset_name: str, **kwargs) -> bool:
+    def delete_data(self, dataset_name: str, removed_domain: DataDomain, **kwargs) -> bool:
         """
         Delete data in some format from the dataset.

@@ -656,6 +658,9 @@ def delete_data(self, dataset_name: str, **kwargs) -> bool:
         Parameters
         ----------
         dataset_name : str
             The dataset from which to delete data.
+        removed_domain : DataDomain
+            The portion of the dataset's domain corresponding to the deleted data, which should be subtracted from the
+            dataset's domain.
         kwargs
             Implementation-specific params for referencing what data should be deleted and how.
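The core interface change above threads a per-item DataDomain through both write paths: add_data() now requires the domain of the incoming data, and delete_data() the domain portion being removed. A minimal caller-side sketch of the updated contract follows; the helper function and its names are illustrative, not part of this changeset, and 'item_names' is the selection kwarg used by the object-store manager (other subtypes may expect different kwargs).

from dmod.core.dataset import DatasetManager
from dmod.core.meta_data import DataDomain


def replace_item(manager: DatasetManager, dataset_name: str, item: str, new_bytes: bytes,
                 item_domain: DataDomain) -> bool:
    """Swap out one dataset item while keeping the dataset's recorded domain in sync (hypothetical helper)."""
    # Subtract the item's contribution from the dataset's domain as its data is removed ...
    if not manager.delete_data(dataset_name, removed_domain=item_domain, item_names=[item]):
        return False
    # ... then merge the same domain piece back in as the replacement bytes are added
    return manager.add_data(dataset_name, dest=item, domain=item_domain, data=new_bytes)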
diff --git a/python/lib/core/dmod/core/meta_data.py b/python/lib/core/dmod/core/meta_data.py
index 19b7b446f..e66d7199e 100644
--- a/python/lib/core/dmod/core/meta_data.py
+++ b/python/lib/core/dmod/core/meta_data.py
@@ -6,7 +6,7 @@
 from .serializable import Serializable
 from .common.helper_functions import get_subclasses
 from .exception import DmodRuntimeError
-from typing import Any, Dict, List, Optional, Set, Type, Union
+from typing import Any, Dict, Generic, List, Optional, Set, Tuple, Type, TypeVar, Union
 from typing_extensions import Self
 from collections.abc import Iterable
 from collections import OrderedDict
@@ -520,9 +520,48 @@ def can_expand_with(self, other: ContinuousRestriction) -> bool:
         else:
             return other.begin <= self.end

+    def can_have_subtracted(self, subtrahend: ContinuousRestriction) -> bool:
+        """
+        Whether another restriction is subtraction-compatible with this one.
+
+        To be compatible with subtraction, the subtrahend must have the same variable as this instance, must be fully
+        contained by this instance, and must have either the same :attribute:`begin` or the same :attribute:`end`
+        value, but not both.
+
+        Parameters
+        ----------
+        subtrahend: ContinuousRestriction
+            The restriction to potentially be subtracted from this instance.
+
+        Returns
+        -------
+        bool
+            Whether this subtrahend is compatible with subtraction from this instance.
+
+        Notes
+        -----
+        Equal restrictions cannot be subtracted. Subtraction must return a restriction, but a restriction with no range
+        is undefined. Also, the domain subtraction operation treats equal restrictions as things that should not be
+        changed.
+
+        In practice this makes sense. Subtract a CSV forcing file for one catchment away from a dataset, and that
+        file's individual data domain will be for the time range of the dataset and its single catchment. So,
+        subtracting the data domain of the file from the data domain of the dataset should subtract that one catchment
+        from the dataset's domain, but it shouldn't delete (or in any way alter) the time range.
+
+        See Also
+        --------
+        contains
+        subtract
+        """
+        return (self._compatible_with(subtrahend)
+                and self.contains(subtrahend)
+                and (self.begin == subtrahend.begin or self.end == subtrahend.end)
+                and not (self.begin == subtrahend.begin and self.end == subtrahend.end))
+
     def contains(self, other: ContinuousRestriction) -> bool:
         """
-        Whether this object contains all the values of the given object and the two are of the same index.
+        Whether this object contains all the values of the given object and the two are of the same variable index.

         For this type, equal begin or end values are considered contained.

@@ -533,7 +572,7 @@ def contains(self, other: ContinuousRestriction) -> bool:
         Returns
         -------
         bool
-            Whether this object contains all the values of the given object and the two are of the same index.
+            Whether this object contains all the values of the given object and the two are of the same variable index.
""" return self._compatible_with(other) and self.begin <= other.begin and self.end >= other.end @@ -564,6 +603,31 @@ def expand(self, other: ContinuousRestriction) -> ContinuousRestriction: return self.__class__(variable=self.variable, begin=min(self.begin, other.begin), end=max(self.end, other.end), datetime_pattern=self.datetime_pattern, subclass=self.subclass) + def subtract(self, subtrahend: ContinuousRestriction) -> ContinuousRestriction: + """ + Produce another instance made by subtracting the given restriction from this one, assuming they are compatible. + + Parameters + ---------- + subtrahend + + Returns + ------- + ContinuousRestriction + A new instance representing the result of subtraction. + + See Also + -------- + can_have_subtracted + """ + if not self.can_have_subtracted(subtrahend): + raise ValueError(f"Can't subtract given {subtrahend.__class__.__name__}") + new_restrict = ContinuousRestriction(**self.dict()) + if subtrahend.begin == self.begin: + new_restrict.begin = subtrahend.end + else: + new_restrict.end = subtrahend.begin + return new_restrict class DiscreteRestriction(Serializable): """ @@ -638,6 +702,41 @@ def can_expand_with(self, other: DiscreteRestriction) -> bool: """ return self._compatible_with(other) and not self.contains(other) + def can_have_subtracted(self, subtrahend: DiscreteRestriction) -> bool: + """ + Whether another restriction is subtraction-compatible with this one. + + Whether another restriction is expansion-compatible with this one. To be compatible, the subtrahend must have + the same variable and be fully contained by this instance, but not equal to it. + + Parameters + ---------- + subtrahend: DiscreteRestriction + The restriction to potentially be subtracted from this instance. + + Returns + ------- + bool + Whether this subtrahend is compatible with subtraction from this instance. + + Notes + ----- + Equal restrictions cannot be subtracted. A restriction without any explicit values (i.e., with all subtracted) + has a different implied meaning: "all" within some context). This is not the desired behavior for subtraction. + + Also, the domain subtraction operation treats equal restrictions as things that should not be changed. In + practice this makes sense. Subtract a NetCDF forcing file with data for one hour away from a dataset, and that + file's individual data domain will be for all catchments for the dataset and that single hour of time. + So, subtracting the data domain of the file from the data domain of the dataset should subtract that one hour + (assuming it doesn't split the time range in two) from the dataset's domain, but it shouldn't delete the + catchments. + + See Also + -------- + contains + """ + return self._compatible_with(subtrahend) and self.contains(subtrahend) and not self == subtrahend + def contains(self, other: DiscreteRestriction) -> bool: """ Whether this object contains all the values of the given object and the two are of the same index. @@ -718,6 +817,36 @@ def is_all_possible_values(self) -> bool: """ return self.values is not None and len(self.values) == 0 + def subtract(self, subtrahend: DiscreteRestriction) -> DiscreteRestriction: + """ + Produce another instance made by subtracting the given restriction from this one, assuming they are compatible. + + Parameters + ---------- + subtrahend + + Returns + ------- + DiscreteRestriction + A new instance representing the result of subtraction. 
+
+        See Also
+        --------
+        can_have_subtracted
+        """
+        if not self.can_have_subtracted(subtrahend):
+            raise ValueError(f"Can't subtract given {subtrahend.__class__.__name__}")
+        if self.is_all_possible_values or subtrahend.is_all_possible_values:
+            raise ValueError("Can't subtract unbound restriction")
+
+        new_restrict = DiscreteRestriction(**self.dict())
+        for val in [v for v in new_restrict.values if v in subtrahend.values]:
+            new_restrict.values.remove(val)
+        return new_restrict
+
+
+R = TypeVar("R", bound=Union[ContinuousRestriction, DiscreteRestriction])
+

 class DataDomain(Serializable):
     """
@@ -888,10 +1017,83 @@ def factory_init_from_restriction_collections(cls, data_format: DataFormat, **kw
                    continuous_restrictions=None if len(continuous) == 0 else continuous,
                    discrete_restrictions=None if len(discrete) == 0 else discrete)

+    @classmethod
+    def subtract_domains(cls, minuend: DataDomain, subtrahend: DataDomain) -> DataDomain:
+        """
+        Subtract part of a defined data domain along a single dimension, producing a domain that is "smaller."
+
+        Subtract the subtrahend from the minuend along one dimension - i.e., one constraint variable. Constraints
+        present in only one of the domains are ignored, as are constraints that are exactly equal across the two
+        domains (the subtraction operation should make the result smaller in one dimension, not remove that dimension).
+
+        Parameters
+        ----------
+        minuend: DataDomain
+            The original, larger domain to be subtracted from.
+        subtrahend: DataDomain
+            The portion to be subtracted from the minuend.
+
+        Raises
+        ------
+        ValueError
+            If the formats or restriction constraints of the two domains do not permit a subtraction to be performed.
+
+        Returns
+        -------
+        DataDomain
+            The new, updated domain value, as a new object.
+ """ + # TODO: (later) consider exceptions to format rule (here and in merging) and perhaps other behavior, like for composites + if minuend.data_format != subtrahend.data_format: + raise ValueError(f"Can't subtract {subtrahend.data_format.name} domain from {minuend.data_format.name} one") + + def get_subtractables(r_minuend: Dict[StandardDatasetIndex, R], + r_subtrahend: Dict[StandardDatasetIndex, R] + ) -> Set[StandardDatasetIndex]: + indices_in_both = {idx for idx in r_minuend if idx in r_subtrahend} + indices_unequal = {idx for idx in indices_in_both if r_minuend[idx] != r_subtrahend[idx]} + can_subtract = {idx for idx in indices_in_both if r_minuend[idx].can_have_subtracted(r_subtrahend[idx])} + cannot_subtract = indices_unequal - can_subtract + + if cannot_subtract: + raise ValueError(f"Can't subtract incompatible constraint values for " + f"{[idx.name for idx in cannot_subtract]!s}") + return can_subtract + + cont_rest_diffs = get_subtractables(minuend.continuous_restrictions, subtrahend.continuous_restrictions) + discr_rest_diffs = get_subtractables(minuend.discrete_restrictions, subtrahend.discrete_restrictions) + + if not cont_rest_diffs and not discr_rest_diffs: + raise ValueError(f"Nothing in domain {subtrahend!s} needs to be subtracted from {minuend!s}") + elif len(cont_rest_diffs) + len(discr_rest_diffs) > 1: + raise ValueError(f"Can't subtract across more than one dimension at a time.") + + # Make a copy, and we'll remove things from it + new_dom = DataDomain(**minuend.to_dict()) + + for std_idx in cont_rest_diffs: + new_dom.continuous_restrictions[std_idx].subtract(subtrahend.continuous_restrictions[std_idx]) + for std_idx in discr_rest_diffs: + new_dom.discrete_restrictions[std_idx].subtract(subtrahend.discrete_restrictions[std_idx]) + + return new_dom + @classmethod def merge_domains(cls, d1: DataDomain, d2: DataDomain) -> DataDomain: """ - Merge the two domains into a new combined domain. + Merge two domains into a new combined domain, combining values along a single restriction variable dimension. + + Merge the values of two compatible domain objects into a combined domain object. In order to be compatible, + the two domains must be of the same :class:`DataFormat` and have at most one different restriction value. Unlike + subtraction, this includes restrictions only present on one of the two domains. + + Also unlike subtraction, strictly speaking, domains are merge-compatible if they are equal or if one contains + the other; i.e., these cases are valid for the function. However, the function will not have any side effects + in those situations. + + Related to the above, the returned domain will be a new :class:`DataDomain` object, with two exceptions: + - if one domain already fully contains the other, then the original, containing domain object is returned + - if the domains are equal, then `d1` is returned Parameters ---------- @@ -903,54 +1105,65 @@ def merge_domains(cls, d1: DataDomain, d2: DataDomain) -> DataDomain: Returns ------- DataDomain - The new merged domain. + The resulting domain from the merge operation. Raises ------ ValueError - If the formats or constraints of the two domains do not permit them to be merged - - Notes - ----- - For any two domains ``d1`` and ``d2`` that can successfully be merged, and the derived domain ``d3`` equal to - ``merge_domains(d1, d2)``, then it should always be true that ``substract_domains(d3, d2) == d1``. + If the formats or restriction constraints of the two domains do not permit them to be merged. 
         See Also
         --------
         subtract_domains
         """
-        # TODO: (later) consider exceptions to format rule and perhaps other behavior, like for composites
+        # TODO: (later) consider exceptions to format rule (here and in subtracting) and perhaps other behavior, like for composites
         if d1.data_format != d2.data_format:
             raise ValueError(f"Can't merge {d2.data_format.name} format domain into one of {d1.data_format.name}")

-        # New continuous; taken directly from domain 1 if not present or equal in domain 2; otherwise, by extending
-        new_c_restricts: Dict[StandardDatasetIndex, ContinuousRestriction] = {
-            std_ds_idx: (
-                d1_restrict
-                if std_ds_idx not in d2.continuous_restrictions or d1_restrict == d2.continuous_restrictions[std_ds_idx]
-                else d1_restrict.expand(d2.continuous_restrictions[std_ds_idx])
-            )
-            for std_ds_idx, d1_restrict in d1.continuous_restrictions.items()}
-
-        # Any other indices in d2, just move them over
-        for std_ds_idx in (i for i in d2.continuous_restrictions if i not in new_c_restricts):
-            new_c_restricts[std_ds_idx] = d2.continuous_restrictions[std_ds_idx]
-
-        # And now similarly for discrete
-        new_d_restricts: Dict[StandardDatasetIndex, DiscreteRestriction] = {
-            std_ds_idx: (
-                d1_restrict
-                if std_ds_idx not in d2.discrete_restrictions or d1_restrict == d2.discrete_restrictions[std_ds_idx]
-                else d1_restrict.expand(d2.discrete_restrictions[std_ds_idx])
-            )
-            for std_ds_idx, d1_restrict in d1.discrete_restrictions.items()}
-
-        for std_ds_idx in (i for i in d2.discrete_restrictions if i not in new_d_restricts):
-            new_d_restricts[std_ds_idx] = d2.discrete_restrictions[std_ds_idx]
-
-        return DataDomain(data_format=d1.data_format,
-                          continuous_restrictions=new_c_restricts,
-                          discrete_restrictions=new_d_restricts)
+        if d1 == d2:
+            return d1
+        elif d1.contains(d2):
+            return d1
+        elif d2.contains(d1):
+            return d2
+
+        def merge_diff(d1_restricts: Dict[StandardDatasetIndex, R], d2_restricts: Dict[StandardDatasetIndex, R]
+                       ) -> Optional[R]:
+            all_indices = set(d1_restricts.keys()).union(d2_restricts.keys())
+            only_in_1 = {idx for idx in d1_restricts if idx not in d2_restricts}
+            only_in_2 = {idx for idx in d2_restricts if idx not in d1_restricts}
+            in_both = {idx for idx in all_indices if idx not in only_in_1 and idx not in only_in_2}
+            unequal = {idx for idx in in_both if d1_restricts[idx] != d2_restricts[idx]}
+            if len(only_in_1) + len(only_in_2) + len(unequal) == 0:
+                return None
+            if len(only_in_1) + len(only_in_2) + len(unequal) > 1:
+                raise ValueError("Can't support multiple different restrictions (even of one type) when merging")
+            if only_in_1:
+                value = d1_restricts[only_in_1.pop()]
+                return value.__class__(**value.dict())
+            if only_in_2:
+                value = d2_restricts[only_in_2.pop()]
+                return value.__class__(**value.dict())
+            idx = unequal.pop()
+            try:
+                return d1_restricts[idx].expand(d2_restricts[idx])
+            except Exception as e:
+                raise ValueError(f"Failure merging restriction for domain due to {e.__class__.__name__}: {e!s}")
+
+        cont_restrict_diff = merge_diff(d1.continuous_restrictions, d2.continuous_restrictions)
+        discr_restrict_diff = merge_diff(d1.discrete_restrictions, d2.discrete_restrictions)
+        new_domain = DataDomain(**d1.dict())
+
+        if cont_restrict_diff and discr_restrict_diff:
+            raise ValueError(f"Can't merge {d1!s} and {d2!s} with continuous and discrete restriction differences")
+        elif not cont_restrict_diff and not discr_restrict_diff:
+            raise AssertionError("Should not reach this condition with no different restriction in merging domains")
+        elif cont_restrict_diff:
+            new_domain.continuous_restrictions[cont_restrict_diff.variable] = cont_restrict_diff
+        else:
+            new_domain.discrete_restrictions[discr_restrict_diff.variable] = discr_restrict_diff
+
+        return new_domain

     def __eq__(self, other):
         return isinstance(other, DataDomain) and hash(self) == hash(other)
diff --git a/python/lib/modeldata/dmod/modeldata/_version.py b/python/lib/modeldata/dmod/modeldata/_version.py
index ae4865cf6..2c7bffbf8 100644
--- a/python/lib/modeldata/dmod/modeldata/_version.py
+++ b/python/lib/modeldata/dmod/modeldata/_version.py
@@ -1 +1 @@
-__version__ = '0.11.1'
+__version__ = '0.12.0'
diff --git a/python/lib/modeldata/dmod/modeldata/data/filesystem_manager.py b/python/lib/modeldata/dmod/modeldata/data/filesystem_manager.py
index 8b7dca2e0..142036332 100644
--- a/python/lib/modeldata/dmod/modeldata/data/filesystem_manager.py
+++ b/python/lib/modeldata/dmod/modeldata/data/filesystem_manager.py
@@ -1,10 +1,12 @@
 import json
 import logging
+from datetime import datetime
 from pathlib import Path
 from typing import Optional, List, Set, Union, Any, Tuple

-from dmod.core.dataset import Dataset, DatasetManager, DatasetType
+from dmod.core.common.reader import Reader
+from dmod.core.dataset import Dataset, DatasetManager, DatasetType, InitialDataAdder
 from dmod.core.exception import DmodRuntimeError
 from dmod.core.meta_data import DataCategory, DataDomain

@@ -84,8 +86,8 @@ def __init__(self, serialized_files_directory: Optional[Path] = None, *args, **k
                 msg = "{} could not reload a dataset from {} due to {} ({})"
                 logging.warning(msg.format(self.__class__.__name__, str(file), e.__class__.__name__, str(e)))

-    def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, source: Optional[str] = None,
-                 is_temp: bool = False, **kwargs) -> bool:
+    def add_data(self, dataset_name: str, dest: str, domain: DataDomain, data: Optional[Union[bytes, Reader]] = None,
+                 source: Optional[str] = None, is_temp: bool = False, **kwargs) -> bool:
         """
         Add raw data or data from one or more files to this dataset.

@@ -97,6 +99,8 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, s
             A path-like string that provides information on the location within the dataset where the data should be
             added when either adding byte string data from ``data`` or when adding from a single file specified in
             ``source`` (ignored when adding from files within a ``source`` directory).
+        domain : DataDomain
+            The defined domain for the data being added.
         data : Optional[bytes]
             Optional encoded byte string containing data to be inserted into the data set; either this or ``source``
             must be provided.
@@ -122,7 +126,7 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, s
         raise NotImplementedError(msg.format(self.__class__.__name__))

     def create(self, name: str, category: DataCategory, domain: DataDomain, is_read_only: bool,
-               initial_data: Optional[str] = None) -> Dataset:
+               initial_data: Optional[InitialDataAdder] = None, expires_on: Optional[datetime] = None) -> Dataset:
         msg = "Creating datasets managed by {} type not currently supported"
         raise NotImplementedError(msg.format(self.__class__.__name__))

@@ -139,6 +143,27 @@ def delete(self, dataset: Dataset, **kwargs) -> bool:
         msg = "Deleting datasets managed by {} type not currently supported"
         raise NotImplementedError(msg.format(self.__class__.__name__))

+    def delete_data(self, dataset_name: str, removed_domain: DataDomain, **kwargs) -> bool:
+        """
+        Delete data in some format from the dataset.
+
+        Parameters
+        ----------
+        dataset_name : str
+            The dataset from which to delete data.
+        removed_domain : DataDomain
+            The portion of the dataset's domain corresponding to the deleted data, which should be subtracted from the
+            dataset's domain.
+        kwargs
+            Implementation-specific params for referencing what data should be deleted and how.
+
+        Returns
+        -------
+        bool
+            Whether the data was deleted successfully.
+        """
+        raise NotImplementedError(f"{self.__class__.__name__} does not currently support deleting data.")
+
     def get_data(self, dataset_name: str, item_name: str, **kwargs) -> Union[bytes, Any]:
         msg = "Getting data from datasets managed by {} type not currently supported"
         raise NotImplementedError(msg.format(self.__class__.__name__))
diff --git a/python/lib/modeldata/dmod/modeldata/data/object_store_manager.py b/python/lib/modeldata/dmod/modeldata/data/object_store_manager.py
index 8127108e8..8b3097e23 100644
--- a/python/lib/modeldata/dmod/modeldata/data/object_store_manager.py
+++ b/python/lib/modeldata/dmod/modeldata/data/object_store_manager.py
@@ -1,5 +1,6 @@
 import io
 import json
+import logging

 import minio.retention

@@ -174,8 +175,8 @@ def _push_files(self, bucket_name: str, dir_path: Path, recursive: bool = True,
         self.persist_serialized(bucket_name)

     # TODO: update to also make adjustments to the domain appropriately when data changes (deleting data also)
-    def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Reader]] = None, source: Optional[str] = None,
-                 is_temp: bool = False, **kwargs) -> bool:
+    def add_data(self, dataset_name: str, dest: str, domain: DataDomain, data: Optional[Union[bytes, Reader]] = None,
+                 source: Optional[str] = None, is_temp: bool = False, **kwargs) -> bool:
         """
         Add raw data or data from one or more files to the object store for the given dataset.

@@ -216,6 +217,8 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Rea
             A path-like string that provides information on the location within the dataset where the data should be
             added when either adding byte string data from ``data`` or when adding from a single file specified in
             ``source`` (ignored when adding from files within a ``source`` directory).
+        domain : DataDomain
+            The defined domain for the data being added.
         data : Optional[Union[bytes, Reader]]
             Optional encoded byte string _or_ object with read() method returning bytes containing data to be inserted
             into the data set; either this or ``source`` must be provided.
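Before the implementation hunks that follow, it may help to see DataDomain.merge_domains in isolation, since add_data() below now refuses writes whose domain cannot be merged. A small sketch using the same constructors as the updated integration tests; the specific domains here are hypothetical, and it assumes (per the merge rules above) that equal restrictions pass through while the single differing restriction is expanded.

from dmod.core.meta_data import DataDomain, DataFormat, DiscreteRestriction, TimeRange

# Same time range on both sides, built like the tests' setUp() fixture
time_range = TimeRange.factory_init_from_deserialized_json(
    {'begin': '2016-01-01 00:00:00', 'end': '2016-01-31 23:00:00',
     'variable': 'TIME', 'subclass': 'TimeRange'})

d1 = DataDomain(data_format=DataFormat.AORC_CSV, continuous_restrictions=[time_range],
                discrete_restrictions=[DiscreteRestriction("catchment_id", ['cat-1', 'cat-2'])])
d2 = DataDomain(data_format=DataFormat.AORC_CSV, continuous_restrictions=[time_range],
                discrete_restrictions=[DiscreteRestriction("catchment_id", ['cat-3'])])

# Formats match and only catchment_id differs, so this expands to cat-1, cat-2, cat-3;
# domains differing in both a continuous and a discrete restriction raise ValueError instead
merged = DataDomain.merge_domains(d1, d2)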
@@ -244,7 +247,17 @@ def add_data(self, dataset_name: str, dest: str, domain: DataDomain, data: Optional[Union[bytes, Rea
         """
         if dataset_name not in self.datasets:
             return False
-        elif data is not None:
+
+        # Make sure we can update the data domain as expected
+        try:
+            updated_domain = DataDomain.merge_domains(self.datasets[dataset_name].data_domain, domain)
+        except Exception as e:
+            # TODO: (later) return result indicator explaining why there was a failure
+            logging.debug(f"Failed to add data to {dataset_name} after {e.__class__.__name__} ({e!s}); couldn't merge "
+                          f"new domain component {domain!s} into {self.datasets[dataset_name].data_domain!s}")
+            return False
+
+        if data is not None:
             if is_temp:
                 retention = minio.retention.Retention(mode=minio.retention.GOVERNANCE,
                                                       retain_until_date=datetime.now() + timedelta(hours=1))
@@ -254,13 +267,18 @@
                 # Use AWS S3 default part size of 5MiB
                 # https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
                 part_size = 5 * 1024 * 1024
-                result = self._client.put_object(bucket_name=dataset_name, data=data, length=-1, part_size=part_size, object_name=dest,
-                                                 retention=retention)
+                result = self._client.put_object(bucket_name=dataset_name, data=data, length=-1, part_size=part_size,
+                                                 object_name=dest, retention=retention)
             else:
                 result = self._client.put_object(bucket_name=dataset_name, data=io.BytesIO(data), length=len(data),
-                                                object_name=dest, retention=retention)
+                                                 object_name=dest, retention=retention)
             # TODO: do something more intelligent than this for determining success
-            return result.bucket_name == dataset_name
+            if result.bucket_name != dataset_name:
+                return False
+            else:
+                self.datasets[dataset_name].data_domain = updated_domain
+                self.persist_serialized(dataset_name)
+                return True
         elif is_temp:
             raise NotImplementedError("Function add_data() does not support ``is_temp`` except when suppying raw data.")
         elif source is None or len(source) == 0:
@@ -276,12 +294,18 @@
         elif src_path.is_dir():
             bucket_root = kwargs.get('bucket_root', src_path)
             self._push_files(bucket_name=dataset_name, dir_path=src_path, bucket_root=bucket_root)
-            # TODO: probably need something better than just always returning True if this gets executed
+            self.datasets[dataset_name].data_domain = updated_domain
+            self.persist_serialized(dataset_name)
             return True
         else:
             result = self._push_file(bucket_name=dataset_name, file=src_path, dest=dest)
             # TODO: test
-            return isinstance(result.object_name, str)
+            if isinstance(result.object_name, str):
+                self.datasets[dataset_name].data_domain = updated_domain
+                self.persist_serialized(dataset_name)
+                return True
+            else:
+                return False

     def combine_partials_into_composite(self, dataset_name: str, item_name: str, combined_list: List[str]) -> bool:
         try:
@@ -404,14 +428,16 @@ def delete(self, dataset: Dataset, **kwargs) -> bool:
         else:
             return False

-    # TODO: update to also make adjustments to the domain appropriately when data changes (deleting data also)
-    def delete_data(self, dataset_name: str, **kwargs) -> bool:
+    def delete_data(self, dataset_name: str, removed_domain: DataDomain, **kwargs) -> bool:
         """

         Parameters
         ----------
         dataset_name : str
             The name of the dataset.
+        removed_domain : DataDomain
+            The portion of the dataset's domain corresponding to the deleted data, which should be subtracted from the
+            dataset's domain.
         kwargs
             Keyword args (see below).
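The delete_data() hunk below leans on DataDomain.subtract_domains, which in turn only accepts subtrahends passing the restriction-level can_have_subtracted checks: shrink a range from one end, never split it, never erase it. A rough restriction-level sketch under those rules (the time ranges here are hypothetical, built the same way as the tests'):

from dmod.core.meta_data import TimeRange

whole = TimeRange.factory_init_from_deserialized_json(
    {'begin': '2016-01-01 00:00:00', 'end': '2016-01-31 23:00:00',
     'variable': 'TIME', 'subclass': 'TimeRange'})
last_week = TimeRange.factory_init_from_deserialized_json(
    {'begin': '2016-01-25 00:00:00', 'end': '2016-01-31 23:00:00',
     'variable': 'TIME', 'subclass': 'TimeRange'})
mid_day = TimeRange.factory_init_from_deserialized_json(
    {'begin': '2016-01-10 00:00:00', 'end': '2016-01-11 00:00:00',
     'variable': 'TIME', 'subclass': 'TimeRange'})

assert whole.can_have_subtracted(last_week)    # shares the end value: can subtract
remaining = whole.subtract(last_week)          # new range ends at 2016-01-25 00:00:00
assert not whole.can_have_subtracted(mid_day)  # would split the range in two: rejected
assert not whole.can_have_subtracted(whole)    # equal restrictions: rejected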
@@ -427,6 +453,7 @@ def delete_data(self, dataset_name: str, **kwargs) -> bool:
         bool
             Whether the delete was successful.
         """
+        # TODO: (later) account for automated domain rechecking
         item_names = kwargs.get('item_names', kwargs.get('file_names', None))
         if item_names is None:
             return False
@@ -434,8 +461,17 @@
         elif 0 < len([fn for fn in item_names if fn not in self.list_files(dataset_name)]):
             return False

+        # Account for the removed domain param
+        try:
+            updated_domain = DataDomain.subtract_domains(self.datasets[dataset_name].data_domain, removed_domain)
+        except Exception:
+            # TODO: (later) log and/or return result indicator explaining why there was a failure
+            return False
+
         errors = self._client.remove_objects(bucket_name=dataset_name,
                                              delete_object_list=[DeleteObject(fn) for fn in item_names])
+        self.datasets[dataset_name].data_domain = updated_domain
+        self.persist_serialized(dataset_name)
         error_list = []
         for error in errors:
             # TODO: later on, probably need to log this somewhere
diff --git a/python/lib/modeldata/dmod/test/it_object_store_dataset_manager.py b/python/lib/modeldata/dmod/test/it_object_store_dataset_manager.py
index 075d8de2f..93c2a9af8 100644
--- a/python/lib/modeldata/dmod/test/it_object_store_dataset_manager.py
+++ b/python/lib/modeldata/dmod/test/it_object_store_dataset_manager.py
@@ -3,6 +3,7 @@
 import os
 import unittest
 from ..modeldata.data.object_store_manager import Dataset, DatasetType, ObjectStoreDatasetManager
+from ..modeldata.data.item_domain_detector import AorcCsvFileDomainDetector
 from dmod.core.meta_data import DataCategory, DataDomain, DataFormat, DiscreteRestriction, TimeRange
 from pathlib import Path
 from typing import Optional, Set
@@ -86,11 +87,10 @@ def setUp(self) -> None:
         # '%Y-%m-%d %H:%M:%S'
         time_range_1 = TimeRange.factory_init_from_deserialized_json(
-            {'begin': '2022-01-01 00:00:00',
-             'end': '2022-02-01 00:00:00',
+            {'begin': '2016-01-01 00:00:00',
+             'end': '2016-01-31 23:00:00',
              'variable': 'TIME',
-             'subclass': 'TimeRange',
-             'datetime_pattern': TimeRange.get_datetime_str_format()})
+             'subclass': 'TimeRange'})
         domain_1 = DataDomain(data_format=DataFormat.AORC_CSV, continuous_restrictions=[time_range_1],
                               discrete_restrictions=[DiscreteRestriction("catchment_id", ['cat-1', 'cat-2', 'cat-3'])])
         # Remember that this is not example serialized JSON, but a dict that gets expanded into parameters passed to
@@ -109,18 +109,22 @@ def test_add_data_1_a(self):
         """
         Test that a simple data add creates a new object as expected.
""" ex_num = 1 dataset_name = self.examples[ex_num]['name'] - dest_object_name = 'data_file' + file_to_add = Path(self.find_git_root_dir()).joinpath('data/example_forcing_aorc_csv/cat-12.csv') + #expected_name = 'cat-12.csv' + dest_object_name = 'cat-12.csv' + detector = AorcCsvFileDomainDetector(item=file_to_add) self.assertFalse(self.minio_client.bucket_exists(dataset_name)) self.manager.create(**self.examples[ex_num]) + does_exist = self.minio_client.bucket_exists(dataset_name) if does_exist: self._datasets_to_cleanup.add(dataset_name) self.assertNotIn(dest_object_name, self.manager.list_files(dataset_name)) - original_data = "File data contents" - result = self.manager.add_data(dataset_name=dataset_name, dest=dest_object_name, data=original_data.encode()) + result = self.manager.add_data(dataset_name=dataset_name, dest=dest_object_name, data=file_to_add.read_bytes(), + domain=detector.detect()) self.assertTrue(result) self.assertIn(dest_object_name, self.manager.list_files(dataset_name)) @@ -129,7 +133,10 @@ def test_add_data_1_b(self): """ Test that a simple data add of raw data works correctly. """ ex_num = 1 dataset_name = self.examples[ex_num]['name'] - dest_object_name = 'data_file' + file_to_add = Path(self.find_git_root_dir()).joinpath('data/example_forcing_aorc_csv/cat-12.csv') + #expected_name = 'cat-12.csv' + dest_object_name = 'cat-12.csv' + detector = AorcCsvFileDomainDetector(item=file_to_add) self.assertFalse(self.minio_client.bucket_exists(dataset_name)) self.manager.create(**self.examples[ex_num]) @@ -137,19 +144,20 @@ def test_add_data_1_b(self): if does_exist: self._datasets_to_cleanup.add(dataset_name) - original_data = "File data contents" - self.manager.add_data(dataset_name=dataset_name, dest=dest_object_name, data=original_data.encode()) + original_data = file_to_add.read_bytes() + self.manager.add_data(dataset_name=dataset_name, dest=dest_object_name, data=original_data, + domain=detector.detect()) raw_read_data = self.manager.get_data(dataset_name, item_name=dest_object_name) - read_data = raw_read_data.decode() - self.assertEqual(original_data, read_data) + self.assertEqual(original_data, raw_read_data) def test_add_data_1_c(self): """ Test that a data add of a file works correctly with specified dest. """ ex_num = 1 dataset_name = self.examples[ex_num]['name'] - file_to_add = Path(self.find_git_root_dir()).joinpath('doc/GIT_USAGE.md') - expected_name = 'GIT_USAGE.md' + file_to_add = Path(self.find_git_root_dir()).joinpath('data/example_forcing_aorc_csv/cat-12.csv') + expected_name = 'cat-12.csv' + detector = AorcCsvFileDomainDetector(item=file_to_add) self.assertTrue(file_to_add.is_file()) expected_data = file_to_add.read_bytes() @@ -160,7 +168,8 @@ def test_add_data_1_c(self): if does_exist: self._datasets_to_cleanup.add(dataset_name) - self.manager.add_data(dataset_name=dataset_name, dest=expected_name, source=str(file_to_add)) + self.manager.add_data(dataset_name=dataset_name, dest=expected_name, source=str(file_to_add), + domain=detector.detect()) raw_read_data = self.manager.get_data(dataset_name, item_name=expected_name) self.assertEqual(expected_data, raw_read_data) @@ -169,16 +178,23 @@ def test_add_data_1_d(self): """ Test that a data add of a directory of files works correctly with implied bucket root. 
""" ex_num = 1 dataset_name = self.examples[ex_num]['name'] - dir_to_add = Path(self.find_git_root_dir()).joinpath('doc') - + dir_to_add = Path(self.find_git_root_dir()).joinpath('data/example_forcing_aorc_csv') # Note that if the project's doc dir is altered in certain ways, this may have to be manually updated self.assertTrue(dir_to_add.is_dir()) - one_files_name = 'GIT_USAGE.md' - one_file = dir_to_add.joinpath(one_files_name) - one_files_expected_data = one_file.read_bytes() - num_uploaded_files = sum([len(files) for _, _, files in os.walk(dir_to_add)]) + + domain = None + one_file = None + uploaded_file_count = 0 + for f in dir_to_add.iterdir(): + uploaded_file_count += 1 + if one_file is None: + one_file = f + detector = AorcCsvFileDomainDetector(item=f) + domain = detector.detect() if domain is None else DataDomain.merge_domains(domain, detector.detect()) + + one_file_expected_data = one_file.read_bytes() # This is actually one more, because of the serialized dataset state file - expected_num_files = num_uploaded_files + 1 + expected_num_files = uploaded_file_count + 1 self.assertFalse(self.minio_client.bucket_exists(dataset_name)) self.manager.create(**self.examples[ex_num]) @@ -186,14 +202,14 @@ def test_add_data_1_d(self): if does_exist: self._datasets_to_cleanup.add(dataset_name) - self.manager.add_data(dataset_name=dataset_name, dest='', source=str(dir_to_add)) + self.manager.add_data(dataset_name=dataset_name, dest='', source=str(dir_to_add), domain=domain) actual_num_files = len(self.manager.list_files(dataset_name)) self.assertEqual(expected_num_files, actual_num_files) - raw_read_data = self.manager.get_data(dataset_name, item_name=one_files_name) + raw_read_data = self.manager.get_data(dataset_name, item_name=one_file.name) - self.assertEqual(one_files_expected_data, raw_read_data) + self.assertEqual(one_file_expected_data, raw_read_data) def test_create_1_a(self): """ diff --git a/python/lib/modeldata/setup.py b/python/lib/modeldata/setup.py index 03f0b676a..c65e9a271 100644 --- a/python/lib/modeldata/setup.py +++ b/python/lib/modeldata/setup.py @@ -26,7 +26,7 @@ "geopandas", "ngen-config@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf", "dmod-communication>=0.4.2", - "dmod-core>=0.15.2", + "dmod-core>=0.16.0", "minio", "aiohttp~=3.8", "shapely>=2.0.0", diff --git a/python/services/dataservice/dmod/dataservice/_version.py b/python/services/dataservice/dmod/dataservice/_version.py index 8969d4966..9d1bb721b 100644 --- a/python/services/dataservice/dmod/dataservice/_version.py +++ b/python/services/dataservice/dmod/dataservice/_version.py @@ -1 +1 @@ -__version__ = '0.9.1' +__version__ = '0.10.0' diff --git a/python/services/dataservice/setup.py b/python/services/dataservice/setup.py index 473035e81..cb95223b5 100644 --- a/python/services/dataservice/setup.py +++ b/python/services/dataservice/setup.py @@ -17,8 +17,8 @@ author_email='', url='', license='', - install_requires=['dmod-core>=0.10.0', 'dmod-communication>=0.14.0', 'dmod-scheduler>=0.10.0', - 'dmod-modeldata>=0.9.0', 'redis', "pydantic[dotenv]>=1.10.8,~=1.10", "fastapi", "uvicorn[standard]", + install_requires=['dmod-core>=0.16.0', 'dmod-communication>=0.14.0', 'dmod-scheduler>=0.10.0', + 'dmod-modeldata>=0.12.0', 'redis', "pydantic[dotenv]>=1.10.8,~=1.10", "fastapi", "uvicorn[standard]", 'ngen-config@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf', 
                        'ngen-cal@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_cal'],
       packages=find_namespace_packages(exclude=['dmod.test', 'deprecated', 'conf', 'schemas', 'ssl', 'src'])
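Taken together, the test changes above outline the intended end-to-end flow for an object-store dataset: detect a file's individual domain, pass it to add_data() so it merges into the dataset's recorded domain, and hand it back to delete_data() so it is subtracted again. A condensed, hypothetical sketch of that round trip (the manager instance and the named dataset are assumed to exist already):

from pathlib import Path

from dmod.core.meta_data import DataDomain
from dmod.modeldata.data.item_domain_detector import AorcCsvFileDomainDetector
from dmod.modeldata.data.object_store_manager import ObjectStoreDatasetManager


def add_then_remove_csv(manager: ObjectStoreDatasetManager, dataset_name: str, csv_file: Path) -> None:
    # Detect this one file's domain (its time range and single catchment)
    file_domain: DataDomain = AorcCsvFileDomainDetector(item=csv_file).detect()

    # On success, add_data() merges file_domain into the dataset's recorded domain
    assert manager.add_data(dataset_name=dataset_name, dest=csv_file.name,
                            data=csv_file.read_bytes(), domain=file_domain)

    # delete_data() subtracts it back out; 'item_names' selects the object(s) to remove
    assert manager.delete_data(dataset_name=dataset_name, removed_domain=file_domain,
                               item_names=[csv_file.name])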