Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update dataset manager API for data-domain-impacting operations #608

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/lib/client/dmod/client/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.6.1'
__version__ = '0.7.0'
4 changes: 2 additions & 2 deletions python/lib/client/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
license='',
include_package_data=True,
#install_requires=['websockets', 'jsonschema'],vi
install_requires=['dmod-core>=0.15.2', 'websockets>=8.1', 'pydantic>=1.10.8,~=1.10', 'dmod-communication>=0.17.0',
'dmod-externalrequests>=0.6.0', 'dmod-modeldata>=0.11.1'],
install_requires=['dmod-core>=0.16.0', 'websockets>=8.1', 'pydantic>=1.10.8,~=1.10', 'dmod-communication>=0.17.0',
'dmod-externalrequests>=0.6.0', 'dmod-modeldata>=0.12.0'],
packages=find_namespace_packages(include=['dmod.*'], exclude=['dmod.test'])
)
2 changes: 1 addition & 1 deletion python/lib/core/dmod/core/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.15.2'
__version__ = '0.16.0'
11 changes: 8 additions & 3 deletions python/lib/core/dmod/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,8 @@ def __init__(self, uuid: Optional[UUID] = None, datasets: Optional[Dict[str, Dat
""" A property attribute to hold errors encountered during operations. """

@abstractmethod
def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Reader]] = None, source: Optional[str] = None,
is_temp: bool = False, **kwargs) -> bool:
def add_data(self, dataset_name: str, dest: str, domain: DataDomain, data: Optional[Union[bytes, Reader]] = None,
source: Optional[str] = None, is_temp: bool = False, **kwargs) -> bool:
"""
Add data in some format to the dataset.

Expand All @@ -542,6 +542,8 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Rea
dest : str
A path-like string specifying a location within the dataset (e.g., file, object, sub-URL) where the data
should be added.
domain : DataDomain
The defined domain for the data being added.
data : Optional[Union[bytes, Reader]]
Optional encoded byte string _or_ object with read() method returning bytes containing data to be inserted
into the data set; either this or ``source`` must be provided.
Expand Down Expand Up @@ -648,14 +650,17 @@ def delete(self, dataset: Dataset, **kwargs) -> bool:

# TODO: add back as abstract, then implement properly in subtypes
#@abstractmethod
def delete_data(self, dataset_name: str, **kwargs) -> bool:
def delete_data(self, dataset_name: str, removed_domain: DataDomain, **kwargs) -> bool:
"""
Delete data in some format from the dataset.

Parameters
----------
dataset_name : str
The dataset from which to delete data.
removed_domain : DataDomain
The portion of the dataset's domain corresponding to the deleted data, which should be subtracted from the
dataset's domain.
kwargs
Implementation-specific params for referencing what data should be deleted and how.

Expand Down
293 changes: 253 additions & 40 deletions python/lib/core/dmod/core/meta_data.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion python/lib/modeldata/dmod/modeldata/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.11.1'
__version__ = '0.12.0'
33 changes: 29 additions & 4 deletions python/lib/modeldata/dmod/modeldata/data/filesystem_manager.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import json
import logging

from datetime import datetime
from pathlib import Path
from typing import Optional, List, Set, Union, Any, Tuple

from dmod.core.dataset import Dataset, DatasetManager, DatasetType
from dmod.core.common.reader import Reader
from dmod.core.dataset import Dataset, DatasetManager, DatasetType, InitialDataAdder
from dmod.core.exception import DmodRuntimeError
from dmod.core.meta_data import DataCategory, DataDomain

Expand Down Expand Up @@ -84,8 +86,8 @@ def __init__(self, serialized_files_directory: Optional[Path] = None, *args, **k
msg = "{} could not reload a dataset from {} due to {} ({})"
logging.warning(msg.format(self.__class__.__name__, str(file), e.__class__.__name__, str(e)))

def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, source: Optional[str] = None,
is_temp: bool = False, **kwargs) -> bool:
def add_data(self, dataset_name: str, dest: str, domain: DataDomain, data: Optional[Union[bytes, Reader]] = None,
source: Optional[str] = None, is_temp: bool = False, **kwargs) -> bool:
"""
Add raw data or data from one or more files to this dataset.

Expand All @@ -97,6 +99,8 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, s
A path-like string that provides information on the location within the dataset where the data should be
added when either adding byte string data from ``data`` or when adding from a single file specified in
``source`` (ignored when adding from files within a ``source`` directory).
domain : DataDomain
The defined domain for the data being added.
data : Optional[bytes]
Optional encoded byte string containing data to be inserted into the data set; either this or ``source``
must be provided.
Expand All @@ -122,7 +126,7 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[bytes] = None, s
raise NotImplementedError(msg.format(self.__class__.__name__))

def create(self, name: str, category: DataCategory, domain: DataDomain, is_read_only: bool,
initial_data: Optional[str] = None) -> Dataset:
initial_data: Optional[InitialDataAdder] = None, expires_on: Optional[datetime] = None) -> Dataset:
msg = "Creating datasets managed by {} type not currently supported"
raise NotImplementedError(msg.format(self.__class__.__name__))

Expand All @@ -139,6 +143,27 @@ def delete(self, dataset: Dataset, **kwargs) -> bool:
msg = "Deleting datasets managed by {} type not currently supported"
raise NotImplementedError(msg.format(self.__class__.__name__))

def delete_data(self, dataset_name: str, removed_domain: DataDomain, **kwargs) -> bool:
    """
    Delete data in some format from the dataset.

    Not supported by this manager type; calling this always raises.

    Parameters
    ----------
    dataset_name : str
        The dataset from which to delete data.
    removed_domain : DataDomain
        The portion of the dataset's domain corresponding to the deleted data, which should be subtracted from the
        dataset's domain.
    kwargs
        Implementation-specific params for referencing what data should be deleted and how.

    Returns
    -------
    bool
        Whether the data was deleted successfully.

    Raises
    ------
    NotImplementedError
        Always raised, as this manager type does not support deleting data.
    """
    msg = "{} does not currently support deleting data."
    raise NotImplementedError(msg.format(self.__class__.__name__))

def get_data(self, dataset_name: str, item_name: str, **kwargs) -> Union[bytes, Any]:
msg = "Getting data from datasets managed by {} type not currently supported"
raise NotImplementedError(msg.format(self.__class__.__name__))
Expand Down
55 changes: 44 additions & 11 deletions python/lib/modeldata/dmod/modeldata/data/object_store_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ def _push_files(self, bucket_name: str, dir_path: Path, recursive: bool = True,
self.persist_serialized(bucket_name)

# TODO: update to also make adjustments to the domain appropriately when data changes (deleting data also)
def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Reader]] = None, source: Optional[str] = None,
is_temp: bool = False, **kwargs) -> bool:
def add_data(self, dataset_name: str, dest: str, domain: DataDomain, data: Optional[Union[bytes, Reader]] = None,
source: Optional[str] = None, is_temp: bool = False, **kwargs) -> bool:
"""
Add raw data or data from one or more files to the object store for the given dataset.

Expand Down Expand Up @@ -216,6 +216,8 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Rea
A path-like string that provides information on the location within the dataset where the data should be
added when either adding byte string data from ``data`` or when adding from a single file specified in
``source`` (ignored when adding from files within a ``source`` directory).
domain : DataDomain
The defined domain for the data being added.
data : Optional[Union[bytes, Reader]]
Optional encoded byte string _or_ object with read() method returning bytes containing data to be inserted
into the data set; either this or ``source`` must be provided.
Expand Down Expand Up @@ -244,7 +246,15 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Rea
"""
if dataset_name not in self.datasets:
return False
elif data is not None:

        # Make sure we can update the data domain as expected
try:
updated_domain = DataDomain.merge_domains(self.datasets[dataset_name].data_domain, domain)
except:
# TODO: (later) log and/or return result indicator explaining why there was a failure
return False

if data is not None:
if is_temp:
retention = minio.retention.Retention(mode=minio.retention.GOVERNANCE,
retain_until_date=datetime.now() + timedelta(hours=1))
Expand All @@ -254,13 +264,18 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Rea
# Use AWS S3 default part size of 5MiB
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
part_size = 5 * 1024 * 1024
result = self._client.put_object(bucket_name=dataset_name, data=data, length=-1, part_size=part_size, object_name=dest,
retention=retention)
result = self._client.put_object(bucket_name=dataset_name, data=data, length=-1, part_size=part_size,
object_name=dest, retention=retention)
else:
result = self._client.put_object(bucket_name=dataset_name, data=io.BytesIO(data), length=len(data),
object_name=dest, retention=retention)
object_name=dest, retention=retention)
# TODO: do something more intelligent than this for determining success
return result.bucket_name == dataset_name
if result.bucket_name != dataset_name:
return False
else:
self.datasets[dataset_name].data_domain = updated_domain
self.persist_serialized(dataset_name)
return True
elif is_temp:
            raise NotImplementedError("Function add_data() does not support ``is_temp`` except when supplying raw data.")
elif source is None or len(source) == 0:
Expand All @@ -276,12 +291,18 @@ def add_data(self, dataset_name: str, dest: str, data: Optional[Union[bytes, Rea
elif src_path.is_dir():
bucket_root = kwargs.get('bucket_root', src_path)
self._push_files(bucket_name=dataset_name, dir_path=src_path, bucket_root=bucket_root)
# TODO: probably need something better than just always returning True if this gets executed
self.datasets[dataset_name].data_domain = updated_domain
self.persist_serialized(dataset_name)
return True
else:
result = self._push_file(bucket_name=dataset_name, file=src_path, dest=dest)
# TODO: test
return isinstance(result.object_name, str)
if isinstance(result.object_name, str):
self.datasets[dataset_name].data_domain = updated_domain
self.persist_serialized(dataset_name)
return True
else:
return False

def combine_partials_into_composite(self, dataset_name: str, item_name: str, combined_list: List[str]) -> bool:
try:
Expand Down Expand Up @@ -404,14 +425,16 @@ def delete(self, dataset: Dataset, **kwargs) -> bool:
else:
return False

# TODO: update to also make adjustments to the domain appropriately when data changes (deleting data also)
def delete_data(self, dataset_name: str, **kwargs) -> bool:
def delete_data(self, dataset_name: str, removed_domain: DataDomain, **kwargs) -> bool:
"""

Parameters
----------
dataset_name : str
The name of the dataset.
removed_domain : DataDomain
The portion of the dataset's domain corresponding to the deleted data, which should be subtracted from the
dataset's domain.
kwargs
Keyword args (see below).

Expand All @@ -427,15 +450,25 @@ def delete_data(self, dataset_name: str, **kwargs) -> bool:
bool
Whether the delete was successful.
"""
# TODO: (later) account for automated domain rechecking
item_names = kwargs.get('item_names', kwargs.get('file_names', None))
if item_names is None:
return False
# Make sure all the files we are asked to delete are actually in the dataset bucket
elif 0 < len([fn for fn in item_names if fn not in self.list_files(dataset_name)]):
return False

# Account for the removed domain param
try:
updated_domain = DataDomain.subtract_domains(self.datasets[dataset_name].data_domain, removed_domain)
except:
# TODO: (later) log and/or return result indicator explaining why there was a failure
return False

errors = self._client.remove_objects(bucket_name=dataset_name,
delete_object_list=[DeleteObject(fn) for fn in item_names])
self.datasets[dataset_name].data_domain = updated_domain
self.persist_serialized(dataset_name)
error_list = []
for error in errors:
# TODO: later on, probably need to log this somewhere
Expand Down
2 changes: 1 addition & 1 deletion python/lib/modeldata/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"geopandas",
"ngen-config@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf",
"dmod-communication>=0.4.2",
"dmod-core>=0.15.2",
"dmod-core>=0.16.0",
"minio",
"aiohttp~=3.8",
"shapely>=2.0.0",
Expand Down
2 changes: 1 addition & 1 deletion python/services/dataservice/dmod/dataservice/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.9.1'
__version__ = '0.10.0'
4 changes: 2 additions & 2 deletions python/services/dataservice/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
author_email='',
url='',
license='',
install_requires=['dmod-core>=0.10.0', 'dmod-communication>=0.14.0', 'dmod-scheduler>=0.10.0',
'dmod-modeldata>=0.9.0', 'redis', "pydantic[dotenv]>=1.10.8,~=1.10", "fastapi", "uvicorn[standard]",
install_requires=['dmod-core>=0.16.0', 'dmod-communication>=0.14.0', 'dmod-scheduler>=0.10.0',
'dmod-modeldata>=0.12.0', 'redis', "pydantic[dotenv]>=1.10.8,~=1.10", "fastapi", "uvicorn[standard]",
'ngen-config@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf',
'ngen-cal@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_cal'],
packages=find_namespace_packages(exclude=['dmod.test', 'deprecated', 'conf', 'schemas', 'ssl', 'src'])
Expand Down
Loading