Require domain difference for add/delete of data.
Update the dataset manager interface for add_data and delete_data (and
their implementations) to require that a DataDomain for the added/deleted
data be passed as an arg, so that the dataset's domain can also be updated.
robertbartel committed May 13, 2024
1 parent 55a8963 commit 02349a2
Showing 3 changed files with 81 additions and 18 deletions.
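
The practical upshot: data can no longer be added to or deleted from a dataset without describing its domain. A minimal caller-side sketch of the updated interface (the manager instance, the helper names, and the way the DataDomain values are built are illustrative assumptions, not part of this commit):

    from dmod.core.dataset import DatasetManager
    from dmod.core.meta_data import DataDomain

    def append_item(manager: DatasetManager, dataset_name: str, item_name: str,
                    raw: bytes, domain: DataDomain) -> bool:
        # 'domain' must describe exactly the data being added; on success the
        # manager merges it into the dataset's existing domain.
        return manager.add_data(dataset_name, dest=item_name, domain=domain, data=raw)

    def remove_item(manager: DatasetManager, dataset_name: str, item_name: str,
                    removed_domain: DataDomain) -> bool:
        # 'removed_domain' is subtracted from the dataset's domain; the object
        # store implementation also expects the affected items via kwargs.
        return manager.delete_data(dataset_name, removed_domain, item_names=[item_name])
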
11 changes: 8 additions & 3 deletions python/lib/core/dmod/core/dataset.py
@@ -527,8 +527,8 @@ def __init__(self, uuid: Optional[UUID] = None, datasets: Optional[Dict[str, Dat
""" A property attribute to hold errors encountered during operations. """

@abstractmethod
-    def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Reader]] = None, source: Optional[str] = None,
-                 is_temp: bool = False, **kwargs) -> bool:
+    def add_data(self, dataset_name: str, dest: str, domain: DataDomain, data: Optional[Union[bytes, Reader]] = None,
+                 source: Optional[str] = None, is_temp: bool = False, **kwargs) -> bool:
"""
Add data in some format to the dataset.
@@ -542,6 +542,8 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Rea
dest : str
A path-like string specifying a location within the dataset (e.g., file, object, sub-URL) where the data
should be added.
+        domain : DataDomain
+            The defined domain for the data being added.
data : Optional[Union[bytes, Reader]]
Optional encoded byte string _or_ object with read() method returning bytes containing data to be inserted
into the data set; either this or ``source`` must be provided.
@@ -648,14 +650,17 @@ def delete(self, dataset: Dataset, **kwargs) -> bool:

# TODO: add back as abstract, then implement properly in subtypes
#@abstractmethod
-    def delete_data(self, dataset_name: str, **kwargs) -> bool:
+    def delete_data(self, dataset_name: str, removed_domain: DataDomain, **kwargs) -> bool:
"""
Delete data in some format from the dataset.
Parameters
----------
dataset_name : str
The dataset from which to delete data.
+        removed_domain : DataDomain
+            The portion of the dataset's domain corresponding to the deleted data, which should be subtracted from the
+            dataset's domain.
kwargs
Implementation-specific params for referencing what data should be deleted and how.
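
The contract set up by these docstrings is symmetric: implementations grow the dataset's domain on add and shrink it on delete, committing the new domain only after the storage operation succeeds. A condensed sketch of that pattern in a hypothetical subtype (the storage helpers `_write_item` and `_erase_items` are stand-ins, and the other abstract methods are omitted):

    from dmod.core.dataset import DatasetManager
    from dmod.core.meta_data import DataDomain

    class SketchManager(DatasetManager):  # illustrative only; not a complete subtype
        def add_data(self, dataset_name, dest, domain, data=None, source=None,
                     is_temp=False, **kwargs) -> bool:
            try:
                # Fail before touching storage if the domains cannot be merged.
                merged = DataDomain.merge_domains(self.datasets[dataset_name].data_domain, domain)
            except Exception:
                return False
            if not self._write_item(dataset_name, dest, data, source):  # stand-in for real I/O
                return False
            self.datasets[dataset_name].data_domain = merged
            return True

        def delete_data(self, dataset_name, removed_domain, **kwargs) -> bool:
            try:
                shrunk = DataDomain.subtract_domains(self.datasets[dataset_name].data_domain, removed_domain)
            except Exception:
                return False
            if not self._erase_items(dataset_name, **kwargs):  # stand-in for real I/O
                return False
            self.datasets[dataset_name].data_domain = shrunk
            return True
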
33 changes: 29 additions & 4 deletions python/lib/modeldata/dmod/modeldata/data/filesystem_manager.py
@@ -1,10 +1,12 @@
import json
import logging

+from datetime import datetime
from pathlib import Path
from typing import Optional, List, Set, Union, Any, Tuple

-from dmod.core.dataset import Dataset, DatasetManager, DatasetType
+from dmod.core.common.reader import Reader
+from dmod.core.dataset import Dataset, DatasetManager, DatasetType, InitialDataAdder
from dmod.core.exception import DmodRuntimeError
from dmod.core.meta_data import DataCategory, DataDomain

@@ -84,8 +86,8 @@ def __init__(self, serialized_files_directory: Optional[Path] = None, *args, **k
msg = "{} could not reload a dataset from {} due to {} ({})"
logging.warning(msg.format(self.__class__.__name__, str(file), e.__class__.__name__, str(e)))

-    def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, source: Optional[str] = None,
-                 is_temp: bool = False, **kwargs) -> bool:
+    def add_data(self, dataset_name: str, dest: str, domain: DataDomain, data: Optional[Union[bytes, Reader]] = None,
+                 source: Optional[str] = None, is_temp: bool = False, **kwargs) -> bool:
"""
Add raw data or data from one or more files to this dataset.
@@ -97,6 +99,8 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, s
A path-like string that provides information on the location within the dataset where the data should be
added when either adding byte string data from ``data`` or when adding from a single file specified in
``source`` (ignored when adding from files within a ``source`` directory).
+        domain : DataDomain
+            The defined domain for the data being added.
data : Optional[bytes]
Optional encoded byte string containing data to be inserted into the data set; either this or ``source``
must be provided.
@@ -122,7 +126,7 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, s
raise NotImplementedError(msg.format(self.__class__.__name__))

def create(self, name: str, category: DataCategory, domain: DataDomain, is_read_only: bool,
-               initial_data: Optional[str] = None) -> Dataset:
+               initial_data: Optional[InitialDataAdder] = None, expires_on: Optional[datetime] = None) -> Dataset:
msg = "Creating datasets managed by {} type not currently supported"
raise NotImplementedError(msg.format(self.__class__.__name__))

@@ -139,6 +143,27 @@ def delete(self, dataset: Dataset, **kwargs) -> bool:
msg = "Deleting datasets managed by {} type not currently supported"
raise NotImplementedError(msg.format(self.__class__.__name__))

+    def delete_data(self, dataset_name: str, removed_domain: DataDomain, **kwargs) -> bool:
+        """
+        Delete data in some format from the dataset.
+
+        Parameters
+        ----------
+        dataset_name : str
+            The dataset from which to delete data.
+        removed_domain : DataDomain
+            The portion of the dataset's domain corresponding to the deleted data, which should be subtracted from the
+            dataset's domain.
+        kwargs
+            Implementation-specific params for referencing what data should be deleted and how.
+
+        Returns
+        -------
+        bool
+            Whether the data was deleted successfully.
+        """
+        raise NotImplementedError(f"{self.__class__.__name__} does not currently support deleting data.")

def get_data(self, dataset_name: str, item_name: str, **kwargs) -> Union[bytes, Any]:
msg = "Getting data from datasets managed by {} type not currently supported"
raise NotImplementedError(msg.format(self.__class__.__name__))
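
Note that every unsupported operation above raises NotImplementedError rather than returning False, so generic callers may want to translate "unsupported" into a failure result explicitly. A small hedged helper (the manager argument is assumed to be any DatasetManager):

    def try_delete(manager, dataset_name: str, removed_domain, **kwargs) -> bool:
        # The filesystem manager signals missing support by raising, not by
        # returning False; catch NotImplementedError and report failure.
        try:
            return manager.delete_data(dataset_name, removed_domain, **kwargs)
        except NotImplementedError:
            return False
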
55 changes: 44 additions & 11 deletions python/lib/modeldata/dmod/modeldata/data/object_store_manager.py
@@ -174,8 +174,8 @@ def _push_files(self, bucket_name: str, dir_path: Path, recursive: bool = True,
self.persist_serialized(bucket_name)

# TODO: update to also make adjustments to the domain appropriately when data changes (deleting data also)
-    def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Reader]] = None, source: Optional[str] = None,
-                 is_temp: bool = False, **kwargs) -> bool:
+    def add_data(self, dataset_name: str, dest: str, domain: DataDomain, data: Optional[Union[bytes, Reader]] = None,
+                 source: Optional[str] = None, is_temp: bool = False, **kwargs) -> bool:
"""
Add raw data or data from one or more files to the object store for the given dataset.
@@ -216,6 +216,8 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Rea
A path-like string that provides information on the location within the dataset where the data should be
added when either adding byte string data from ``data`` or when adding from a single file specified in
``source`` (ignored when adding from files within a ``source`` directory).
+        domain : DataDomain
+            The defined domain for the data being added.
data : Optional[Union[bytes, Reader]]
Optional encoded byte string _or_ object with read() method returning bytes containing data to be inserted
into the data set; either this or ``source`` must be provided.
@@ -244,7 +246,15 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Rea
"""
if dataset_name not in self.datasets:
return False
-        elif data is not None:
+
+        # Make sure we can update the data domain as expected
+        try:
+            updated_domain = DataDomain.merge_domains(self.datasets[dataset_name].data_domain, domain)
+        except:
+            # TODO: (later) log and/or return result indicator explaining why there was a failure
+            return False
+
+        if data is not None:
if is_temp:
retention = minio.retention.Retention(mode=minio.retention.GOVERNANCE,
retain_until_date=datetime.now() + timedelta(hours=1))
@@ -254,13 +264,18 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Rea
# Use AWS S3 default part size of 5MiB
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
part_size = 5 * 1024 * 1024
-                result = self._client.put_object(bucket_name=dataset_name, data=data, length=-1, part_size=part_size, object_name=dest,
-                                                 retention=retention)
+                result = self._client.put_object(bucket_name=dataset_name, data=data, length=-1, part_size=part_size,
+                                                 object_name=dest, retention=retention)
else:
result = self._client.put_object(bucket_name=dataset_name, data=io.BytesIO(data), length=len(data),
-                                object_name=dest, retention=retention)
+                                                 object_name=dest, retention=retention)
# TODO: do something more intelligent than this for determining success
-            return result.bucket_name == dataset_name
+            if result.bucket_name != dataset_name:
+                return False
+            else:
+                self.datasets[dataset_name].data_domain = updated_domain
+                self.persist_serialized(dataset_name)
+                return True
elif is_temp:
raise NotImplementedError("Function add_data() does not support ``is_temp`` except when suppying raw data.")
elif source is None or len(source) == 0:
@@ -276,12 +291,18 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Rea
elif src_path.is_dir():
bucket_root = kwargs.get('bucket_root', src_path)
self._push_files(bucket_name=dataset_name, dir_path=src_path, bucket_root=bucket_root)
-            # TODO: probably need something better than just always returning True if this gets executed
+            self.datasets[dataset_name].data_domain = updated_domain
+            self.persist_serialized(dataset_name)
return True
else:
result = self._push_file(bucket_name=dataset_name, file=src_path, dest=dest)
# TODO: test
-            return isinstance(result.object_name, str)
+            if isinstance(result.object_name, str):
+                self.datasets[dataset_name].data_domain = updated_domain
+                self.persist_serialized(dataset_name)
+                return True
+            else:
+                return False

def combine_partials_into_composite(self, dataset_name: str, item_name: str, combined_list: List[str]) -> bool:
try:
@@ -404,14 +425,16 @@ def delete(self, dataset: Dataset, **kwargs) -> bool:
else:
return False

-    # TODO: update to also make adjustments to the domain appropriately when data changes (deleting data also)
-    def delete_data(self, dataset_name: str, **kwargs) -> bool:
+    def delete_data(self, dataset_name: str, removed_domain: DataDomain, **kwargs) -> bool:
"""
Parameters
----------
dataset_name : str
The name of the dataset.
+        removed_domain : DataDomain
+            The portion of the dataset's domain corresponding to the deleted data, which should be subtracted from the
+            dataset's domain.
kwargs
Keyword args (see below).
@@ -427,15 +450,25 @@ def delete_data(self, dataset_name: str, removed_domain: DataDomain, **kwargs) -> bool:
bool
Whether the delete was successful.
"""
+        # TODO: (later) account for automated domain rechecking
item_names = kwargs.get('item_names', kwargs.get('file_names', None))
if item_names is None:
return False
# Make sure all the files we are asked to delete are actually in the dataset bucket
elif 0 < len([fn for fn in item_names if fn not in self.list_files(dataset_name)]):
return False

+        # Account for the removed domain param
+        try:
+            updated_domain = DataDomain.subtract_domains(self.datasets[dataset_name].data_domain, removed_domain)
+        except:
+            # TODO: (later) log and/or return result indicator explaining why there was a failure
+            return False

errors = self._client.remove_objects(bucket_name=dataset_name,
delete_object_list=[DeleteObject(fn) for fn in item_names])
+        self.datasets[dataset_name].data_domain = updated_domain
+        self.persist_serialized(dataset_name)
error_list = []
for error in errors:
# TODO: later on, probably need to log this somewhere
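
End to end, the object store delete path now runs only when every named item is present in the dataset's bucket and the removed domain can be subtracted from the current one. A hedged usage example (the dataset and file names are illustrative, and `manager` is assumed to be an initialized object store manager):

    removed_domain = ...  # a DataDomain covering exactly the two files below

    ok = manager.delete_data(
        "my-forcing-dataset",                     # hypothetical dataset/bucket name
        removed_domain=removed_domain,
        item_names=["cat-67.csv", "cat-68.csv"],  # also accepted via the 'file_names' kwarg
    )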
