Skip to content

Commit

Permalink
Optimize object store manager using archiving.
Browse files Browse the repository at this point in the history
Take advantage of minio archiving feature when adding data to empty,
read-only dataset (i.e., adding initial data that will not be changed),
since lots of files in minio bucket has significant overhead.
  • Loading branch information
robertbartel committed Jul 2, 2024
1 parent 158808f commit 054581c
Showing 1 changed file with 27 additions and 3 deletions.
30 changes: 27 additions & 3 deletions python/lib/modeldata/dmod/modeldata/data/object_store_manager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import io
import json
import logging
import tempfile

import minio.retention

from dmod.core.meta_data import DataCategory, DataDomain
from dmod.core.dataset import Dataset, DatasetManager, DatasetType, InitialDataAdder
from dmod.core.dataset import DataArchiving, Dataset, DatasetManager, DatasetType, InitialDataAdder
from dmod.core.common.reader import Reader
from datetime import datetime, timedelta
from minio import Minio
Expand All @@ -14,6 +15,7 @@
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple, Union
from uuid import UUID
from zipfile import ZipFile


class ObjectStoreDatasetManager(DatasetManager):
Expand Down Expand Up @@ -245,6 +247,14 @@ def add_data(self, dataset_name: str, dest: str, domain: DataDomain, data: Optio
::method:`_push_file`
::method:`_push_files`
"""
# Prevent adding to read-only dataset except when first setting it up
if self.datasets[dataset_name].is_read_only:
ds_files = [f for f in self.list_files(dataset_name) if f != self.get_serial_dataset_filename(dataset_name)]
if len(ds_files) != 0:
logging.error(f"{self.__class__.__name__} can't add data to read-only dataset except when it is empty "
f"and initializing")
return False

if dataset_name not in self.datasets:
return False

Expand Down Expand Up @@ -292,8 +302,22 @@ def add_data(self, dataset_name: str, dest: str, domain: DataDomain, data: Optio
msg = "{}.{} source path '{}' does not exist."
raise ValueError(msg.format(self.__class__.__name__, _getframe(0).f_code.co_name, source))
elif src_path.is_dir():
bucket_root = kwargs.get('bucket_root', src_path)
self._push_files(bucket_name=dataset_name, dir_path=src_path, bucket_root=bucket_root)
# Recognized scenario when we may want to take advantage of small file archives
# (see https://blog.min.io/small-file-archives/)
# Also, we already know from above that, if read-only, dataset must also be empty
# TODO: (later) consider whether there is some minimum file count (perhaps configurable) to also consider
if self.datasets[dataset_name].is_read_only:
self.datasets[dataset_name].data_archiving = DataArchiving.ZIP_0
# Combine all the files in that directory into an uncompressed zip archive
with tempfile.TemporaryDirectory() as zip_dest_dir_name:
archive_path = Path(f"{zip_dest_dir_name}/{self.datasets[dataset_name].archive_name}")
with ZipFile(archive_path, "w") as archive:
for f in src_path.glob("*"):
archive.write(filename=str(f), arcname=f.name)
self._push_file(bucket_name=dataset_name, file=archive_path)
else:
self._push_files(bucket_name=dataset_name, dir_path=src_path,
bucket_root=kwargs.get('bucket_root', src_path))
self.datasets[dataset_name].data_domain = updated_domain
self.persist_serialized(dataset_name)
return True
Expand Down

0 comments on commit 054581c

Please sign in to comment.