From 2462c2b3494f28df1554503ce589de2f83d16096 Mon Sep 17 00:00:00 2001 From: Markus Binsteiner Date: Tue, 9 Jan 2024 17:23:46 +0100 Subject: [PATCH] chore: start working on data export --- docs/development/stores.md | 26 +++ pyproject.toml | 2 + src/kiara/context/config.py | 4 +- src/kiara/interfaces/python_api/__init__.py | 4 +- src/kiara/registries/__init__.py | 7 + .../registries/data/data_store/__init__.py | 90 ++++++-- .../data/data_store/filesystem_store.py | 12 +- .../data/data_store/sqlite_store.py | 195 ++++++++++++++++++ src/kiara/utils/cli/run.py | 11 + 9 files changed, 323 insertions(+), 28 deletions(-) create mode 100644 docs/development/stores.md create mode 100644 src/kiara/registries/data/data_store/sqlite_store.py diff --git a/docs/development/stores.md b/docs/development/stores.md new file mode 100644 index 000000000..6580828eb --- /dev/null +++ b/docs/development/stores.md @@ -0,0 +1,26 @@ +# *kiara* stores + +This page contains some information about how *kiara* stores work. + +Practically, there are two types of stores in *kiara*: + +- *archives*: stores that can only be read from, but not written to +- *stores*: actual 'stores', which can be both read from and written to + +*kiara* has different store types, depending on what exactly is stored: + +- *data stores*: stores that hold the actual data; these are the most important ones +- *alias stores*: stores that keep human-readable references (aliases), and link them to actual data (using their value_id) +- *job stores*: stores that keep details and records about past jobs that were run in a *kiara* instance + +## Base store + +All archives & stores inherit from the base class 'kiara.registries.BaseArchive', which manages basic attributes like the store's id and its configuration, and holds a reference to the current *kiara* context. + +As a developer, you probably won't use this class directly; instead, you will inherit from one of the higher-level abstract base classes. In the case of data stores, those are: + +- `kiara.registries.data.DataArchive` +- `kiara.registries.data.DataStore` + +Depending on what you want to store, it's a good idea to check out the source code of those base classes and look up which methods you need to implement. Also, you can check out the default implementation of such a store/archive ('filesystem'-based in all cases) to get an idea of what needs to happen in each of those methods.
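To make that concrete, here is a minimal sketch of a custom read-only data archive. The `ExampleDataArchive` name and its in-memory dict backing are purely illustrative; the method signatures follow the abstract method set that `kiara.registries.data.data_store.DataArchive` defines further down in this patch:

```python
# -*- coding: utf-8 -*-
import uuid
from typing import Any, Iterable, Mapping, Set, Union

from kiara.models.values.value import PersistedData, Value
from kiara.registries import FileSystemArchiveConfig
from kiara.registries.data.data_store import DataArchive


class ExampleDataArchive(DataArchive):
    """Hypothetical read-only archive, backed by a plain dict for illustration."""

    _archive_type_name = "example_data_archive"
    _config_cls = FileSystemArchiveConfig

    def __init__(self, archive_id: uuid.UUID, config: FileSystemArchiveConfig):
        DataArchive.__init__(self, archive_id=archive_id, config=config)
        # keyed by value_id (as string); a real archive would read from disk, a db, ...
        self._records: Mapping[str, Mapping[str, Any]] = {}

    def _retrieve_value_details(self, value_id: uuid.UUID) -> Mapping[str, Any]:
        # store-specific lookup of the metadata that describes the value
        return self._records[str(value_id)]["details"]

    def _retrieve_serialized_value(self, value: Value) -> PersistedData:
        return self._records[str(value.value_id)]["persisted_data"]

    def _retrieve_all_value_ids(
        self, data_type_name: Union[str, None] = None
    ) -> Union[None, Iterable[uuid.UUID]]:
        return (uuid.UUID(k) for k in self._records.keys())

    def _retrieve_environment_details(
        self, env_type: str, env_hash: str
    ) -> Mapping[str, Any]:
        raise NotImplementedError()

    def _find_values_with_hash(
        self,
        value_hash: str,
        value_size: Union[int, None] = None,
        data_type_name: Union[str, None] = None,
    ) -> Union[Set[uuid.UUID], None]:
        raise NotImplementedError()

    def _find_destinies_for_value(
        self, value_id: uuid.UUID, alias_filter: Union[str, None] = None
    ) -> Union[Mapping[str, uuid.UUID], None]:
        # destinies are not supported by this illustrative archive
        return None

    def retrieve_chunk(
        self,
        chunk_id: str,
        as_file: Union[bool, str, None] = None,
        symlink_ok: bool = True,
    ) -> Union[bytes, str]:
        raise NotImplementedError()
```

A new archive type like this is made discoverable by registering it under the appropriate entry point group, just as the pyproject.toml hunk below does for the new sqlite archive/store.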
diff --git a/pyproject.toml b/pyproject.toml index acb428b24..39c2bcb96 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,6 +110,8 @@ filesystem_destiny_archive = "kiara.registries.destinies.filesystem_store:FileSy filesystem_destiny_store = "kiara.registries.destinies.filesystem_store:FileSystemDestinyStore" filesystem_workflow_archive = "kiara.registries.workflows.archives:FileSystemWorkflowArchive" filesystem_workflow_store = "kiara.registries.workflows.archives:FileSystemWorkflowStore" +sqlite_data_archive = "kiara.registries.data.data_store.sqlite_store:SqliteDataArchive" +sqlite_data_store = "kiara.registries.data.data_store.sqlite_store:SqliteDataStore" [project.entry-points."kiara.cli_subcommands"] diff --git a/src/kiara/context/config.py b/src/kiara/context/config.py index e49e9d9ce..3c704daf7 100644 --- a/src/kiara/context/config.py +++ b/src/kiara/context/config.py @@ -317,7 +317,9 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool: changed = False if DEFAULT_DATA_STORE_MARKER not in context_config.archives.keys(): - data_store_type = "filesystem_data_store" + # data_store_type = "filesystem_data_store" + # TODO: change that back + data_store_type = "sqlite_data_store" assert data_store_type in available_archives.item_infos.keys() data_store_id = ID_REGISTRY.generate(comment="default data store id") diff --git a/src/kiara/interfaces/python_api/__init__.py b/src/kiara/interfaces/python_api/__init__.py index 9e53d61ba..bddca9423 100644 --- a/src/kiara/interfaces/python_api/__init__.py +++ b/src/kiara/interfaces/python_api/__init__.py @@ -1665,7 +1665,9 @@ def store_value( result = StoreValueResult( value=value_obj, aliases=sorted(alias) if alias else [], - error=str(e), + error=str(e) + if str(e) + else f"Unknown error (type '{type(e).__name__}').", persisted_data=persisted_data, ) diff --git a/src/kiara/registries/__init__.py b/src/kiara/registries/__init__.py index ccc367fc2..82348f91d 100644 --- a/src/kiara/registries/__init__.py +++ b/src/kiara/registries/__init__.py @@ -147,3 +147,10 @@ class FileSystemArchiveConfig(ArchiveConfig): archive_path: str = Field( description="The path where the data for this archive is stored." ) + + +class SqliteArchiveConfig(ArchiveConfig): + + archive_path: str = Field( + description="The path where the data for this archive is stored." + ) diff --git a/src/kiara/registries/data/data_store/__init__.py b/src/kiara/registries/data/data_store/__init__.py index 19c26184c..8aab50007 100644 --- a/src/kiara/registries/data/data_store/__init__.py +++ b/src/kiara/registries/data/data_store/__init__.py @@ -25,8 +25,16 @@ class DataArchive(BaseArchive): + """Base class for data archive implementations.""" + + @classmethod + def is_writeable(cls) -> bool: + """Archives are never writable.""" + return False + @classmethod def supported_item_types(cls) -> Iterable[str]: + """This archive type only supports storing data.""" return ["data"] @@ -42,6 +50,7 @@ def __init__(self, archive_id: uuid.UUID, config: ARCHIVE_CONFIG_CLS): def retrieve_serialized_value( self, value: Union[uuid.UUID, Value] ) -> PersistedData: + """Retrieve a 'PersistedData' instance from a value id or value instance.""" if isinstance(value, Value): value_id: uuid.UUID = value.value_id @@ -64,9 +73,21 @@ def retrieve_serialized_value( @abc.abstractmethod def _retrieve_serialized_value(self, value: Value) -> PersistedData: + """Retrieve a 'PersistedData' instance from a value instance. 
+ + This method basically implements the store-specific logic to retrieve the serialized value data from disk. + + Raise an exception if the value is not persisted in this archive. + """ pass def retrieve_value(self, value_id: uuid.UUID) -> Value: + """Retrieve the value for the specified value_id. + + Looks up the value in the cache first, and if not found, calls the '_retrieve_value_details' method to retrieve it from the archive. + + Raises an exception if the value is not persisted in this archive. + """ cached = self._value_cache.get(value_id, None) if cached is not None: @@ -100,12 +121,18 @@ def retrieve_value(self, value_id: uuid.UUID) -> Value: @abc.abstractmethod def _retrieve_value_details(self, value_id: uuid.UUID) -> Mapping[str, Any]: + """Retrieve the value details for the specified value_id from disk. + + This method basically implements the store-specific logic to retrieve the value details from disk. + """ pass @property def value_ids(self) -> Union[None, Iterable[uuid.UUID]]: return self._retrieve_all_value_ids() + @abc.abstractmethod def _retrieve_all_value_ids( self, data_type_name: Union[str, None] = None ) -> Union[None, Iterable[uuid.UUID]]: @@ -153,6 +180,11 @@ def retrieve_environment_details( def _retrieve_environment_details( self, env_type: str, env_hash: str ) -> Mapping[str, Any]: + """Retrieve the environment details with the specified type and hash. + + Each store needs to implement this so environment details related to a value can be retrieved later on. Since in most cases the environment details will not change, a lookup is more efficient than having to store the full information with each value. + """ + pass def find_values(self, matcher: ValueMatcher) -> Iterable[Value]: @@ -164,12 +196,16 @@ def find_values_with_hash( value_size: Union[int, None] = None, data_type_name: Union[str, None] = None, ) -> Set[uuid.UUID]: + """Find all values that have data matching the specified hash. - if data_type_name is not None: - raise NotImplementedError() + If the data type name is specified, only values of that type are considered, which should speed up the search. The same goes for 'value_size'. Neither filter is implemented yet, though. + """ - if value_size is not None: - raise NotImplementedError() + # if data_type_name is not None: + # raise NotImplementedError() + # + # if value_size is not None: + # raise NotImplementedError() if value_hash in self._value_hash_index.keys(): value_ids: Union[Set[uuid.UUID], None] = self._value_hash_index[value_hash] @@ -182,6 +218,9 @@ def find_values_with_hash( self._value_hash_index[value_hash] = value_ids assert value_ids is not None + + # TODO: if data_type_name or value_size are specified, validate the results? + return value_ids @abc.abstractmethod @@ -191,11 +230,23 @@ def _find_values_with_hash( value_size: Union[int, None] = None, data_type_name: Union[str, None] = None, ) -> Union[Set[uuid.UUID], None]: + """Find all values that have data matching the specified hash. + + If the data type name is specified, only values of that type are considered, which should speed up the search. The same goes for 'value_size'. This needs to be implemented by each store implementation, though, and may or may not be used. + """ + pass def find_destinies_for_value( self, value_id: uuid.UUID, alias_filter: Union[str, None] = None ) -> Union[Mapping[str, uuid.UUID], None]: + """Find all destinies for the specified value id. + + TODO: explain destinies, and when they would be used. + + For now, you can just return 'None' in your implementation. 
+ """ return self._find_destinies_for_value( value_id=value_id, alias_filter=alias_filter @@ -205,6 +256,13 @@ def find_destinies_for_value( def _find_destinies_for_value( self, value_id: uuid.UUID, alias_filter: Union[str, None] = None ) -> Union[Mapping[str, uuid.UUID], None]: + """Find all destinies for the specified value id. + + TODO: explain destinies, and when they would be used. + + For now, you can just return 'None' in your implementation. + """ + pass @abc.abstractmethod @@ -214,18 +272,11 @@ def retrieve_chunk( as_file: Union[bool, str, None] = None, symlink_ok: bool = True, ) -> Union[bytes, str]: - pass + """Retrieve the chunk with the specified id. - # def retrieve_job_record(self, inputs_manifest: InputsManifest) -> Optional[JobRecord]: - # return self._retrieve_job_record( - # manifest_hash=inputs_manifest.manifest_hash, jobs_hash=inputs_manifest.jobs_hash - # ) - # - # @abc.abstractmethod - # def _retrieve_job_record( - # self, manifest_hash: int, jobs_hash: int - # ) -> Optional[JobRecord]: - # pass + If 'as_file' is specified, the chunk is written to a file, and the file path is returned. Otherwise, the chunk is returned as 'bytes'. + """ + pass class DataStore(DataArchive): @@ -255,6 +306,10 @@ class BaseDataStore(DataStore): @abc.abstractmethod def _persist_stored_value_info(self, value: Value, persisted_value: PersistedData): + """Store the details about the persisted data. + + This is used so an archive of this type can load the value data again later on. Value metadata is stored separately, later, using the '_persist_value_details' method. + """ pass @abc.abstractmethod @@ -263,6 +318,7 @@ def _persist_value_details(self, value: Value): @abc.abstractmethod def _persist_value_data(self, value: Value) -> PersistedData: + """Persist the actual value data.""" pass @abc.abstractmethod @@ -278,6 +334,10 @@ def _persist_value_pedigree(self, value: Value): def _persist_environment_details( self, env_type: str, env_hash: str, env_data: Mapping[str, Any] ): + """Persist the environment details. + + Each store type needs to store this for lookup purposes. 
+ """ pass @abc.abstractmethod diff --git a/src/kiara/registries/data/data_store/filesystem_store.py b/src/kiara/registries/data/data_store/filesystem_store.py index ff5ec9021..ffdeb9b1d 100644 --- a/src/kiara/registries/data/data_store/filesystem_store.py +++ b/src/kiara/registries/data/data_store/filesystem_store.py @@ -25,7 +25,6 @@ ) from kiara.registries import ArchiveDetails, FileSystemArchiveConfig from kiara.registries.data.data_store import BaseDataStore, DataArchive -from kiara.registries.jobs import JobArchive from kiara.utils import log_message from kiara.utils.hashfs import HashAddress, HashFS from kiara.utils.json import orjson_dumps @@ -53,22 +52,13 @@ class EntityType(Enum): DEFAULT_HASH_FS_ALGORITHM = "sha256" -class FileSystemDataArchive(DataArchive, JobArchive): +class FileSystemDataArchive(DataArchive): """Data store that loads data from the local filesystem.""" _archive_type_name = "filesystem_data_archive" _config_cls = FileSystemArchiveConfig # type: ignore - # @classmethod - # def supported_item_types(cls) -> Iterable[str]: - # - # return ["data", "job_record"] - - @classmethod - def is_writeable(cls) -> bool: - return False - def __init__(self, archive_id: uuid.UUID, config: FileSystemArchiveConfig): DataArchive.__init__(self, archive_id=archive_id, config=config) diff --git a/src/kiara/registries/data/data_store/sqlite_store.py b/src/kiara/registries/data/data_store/sqlite_store.py new file mode 100644 index 000000000..f19962910 --- /dev/null +++ b/src/kiara/registries/data/data_store/sqlite_store.py @@ -0,0 +1,195 @@ +# -*- coding: utf-8 -*- +import uuid +from pathlib import Path +from typing import Any, Iterable, Mapping, Set, Union + +from sqlalchemy import Engine, create_engine, text + +from kiara.models.values.value import PersistedData, SerializedData, Value +from kiara.registries import SqliteArchiveConfig +from kiara.registries.data import DataArchive +from kiara.registries.data.data_store import BaseDataStore +from kiara.utils.windows import fix_windows_longpath + + +class SqliteDataArchive(DataArchive): + + _archive_type_name = "sqlite_data_archive" + _config_cls = SqliteArchiveConfig + + def __init__(self, archive_id: uuid.UUID, config: SqliteArchiveConfig): + + DataArchive.__init__(self, archive_id=archive_id, config=config) + self._db_path: Union[Path, None] = None + self._cached_engine: Union[Engine, None] = None + # self._lock: bool = True + + @property + def sqlite_path(self): + + if self._db_path is not None: + return self._db_path + + db_path = Path(self.config.archive_path).resolve() + self._db_path = fix_windows_longpath(db_path) + + if self._db_path.exists(): + return self._db_path + + self._db_path.parent.mkdir(parents=True, exist_ok=True) + return self._db_path + + @property + def db_url(self) -> str: + return f"sqlite:///{self.sqlite_path}" + + @property + def sqlite_engine(self) -> "Engine": + + if self._cached_engine is not None: + return self._cached_engine + + # def _pragma_on_connect(dbapi_con, con_record): + # dbapi_con.execute("PRAGMA query_only = ON") + + self._cached_engine = create_engine(self.db_url, future=True) + create_table_sql = """ +CREATE TABLE IF NOT EXISTS values_metadata ( + value_id TEXT PRIMARY KEY, + value_hash TEXT NOT NULL, + value_size INTEGER NOT NULL, + data_type_name TEXT NOT NULL, + metadata TEXT NOT NULL +); +CREATE TABLE IF NOT EXISTS values_data ( + chunk_id TEXT PRIMARY KEY, + chunk_type TEXT NOT NULL, + chunk_data BLOB NOT NULL, + compression_type TEXT NULL, +); +CREATE TABLE IF NOT EXISTS environments ( 
+ environment_type TEXT NOT NULL, + environment_hash TEXT NOT NULL, + environment_data TEXT NOT NULL, + PRIMARY KEY (environment_type, environment_hash) +); +""" + + with self._cached_engine.begin() as connection: + for statement in create_table_sql.split(";"): + if statement.strip(): + connection.execute(text(statement)) + + # if self._lock: + # event.listen(self._cached_engine, "connect", _pragma_on_connect) + return self._cached_engine + + def _retrieve_serialized_value(self, value: Value) -> PersistedData: + raise NotImplementedError() + + def _retrieve_value_details(self, value_id: uuid.UUID) -> Mapping[str, Any]: + + # 'text()' statements use named (':param') bind parameters, passed as a dict + sql = text("SELECT metadata FROM values_metadata WHERE value_id = :value_id") + with self.sqlite_engine.connect() as conn: + cursor = conn.execute(sql, {"value_id": str(value_id)}) + result = cursor.fetchone() + return result[0] + + def _retrieve_environment_details( self, env_type: str, env_hash: str ) -> Mapping[str, Any]: + + sql = text( + "SELECT environment_data FROM environments WHERE environment_type = :env_type AND environment_hash = :env_hash" + ) + with self.sqlite_engine.connect() as conn: + cursor = conn.execute(sql, {"env_type": env_type, "env_hash": env_hash}) + result = cursor.fetchone() + return result[0] + + # def find_values(self, matcher: ValueMatcher) -> Iterable[Value]: + # raise NotImplementedError() + + def _retrieve_all_value_ids( self, data_type_name: Union[str, None] = None ) -> Union[None, Iterable[uuid.UUID]]: + + sql = text("SELECT value_id FROM values_metadata") + with self.sqlite_engine.connect() as conn: + cursor = conn.execute(sql) + result = cursor.fetchall() + return {uuid.UUID(x[0]) for x in result} + + def _find_values_with_hash( self, value_hash: str, value_size: Union[int, None] = None, data_type_name: Union[str, None] = None, ) -> Union[Set[uuid.UUID], None]: + + if value_size is not None: + raise NotImplementedError() + if data_type_name is not None: + raise NotImplementedError() + + sql = text("SELECT value_id FROM values_metadata WHERE value_hash = :value_hash") + with self.sqlite_engine.connect() as conn: + cursor = conn.execute(sql, {"value_hash": value_hash}) + result = cursor.fetchall() + return {uuid.UUID(x[0]) for x in result} + + def _find_destinies_for_value( self, value_id: uuid.UUID, alias_filter: Union[str, None] = None ) -> Union[Mapping[str, uuid.UUID], None]: + + # TODO: implement this + return None + + def retrieve_chunk( self, chunk_id: str, as_file: Union[bool, str, None] = None, symlink_ok: bool = True, ) -> Union[bytes, str]: + + if as_file: + raise NotImplementedError() + + sql = text("SELECT chunk_data FROM values_data WHERE chunk_id = :chunk_id") + with self.sqlite_engine.connect() as conn: + cursor = conn.execute(sql, {"chunk_id": chunk_id}) + result_bytes = cursor.fetchone() + return result_bytes[0] + + +class SqliteDataStore(SqliteDataArchive, BaseDataStore): + + _archive_type_name = "sqlite_data_store" + + def _persist_environment_details( self, env_type: str, env_hash: str, env_data: Mapping[str, Any] ): + raise NotImplementedError() + + def _persist_value_data(self, value: Value) -> PersistedData: + + # TODO: write the chunks of 'serialized_value' into the 'values_data' table + serialized_value: SerializedData = value.serialized_data + raise NotImplementedError() + + def _persist_stored_value_info(self, value: Value, persisted_value: PersistedData): + raise NotImplementedError() + + def _persist_value_details(self, value: Value): + raise NotImplementedError() + + def _persist_destiny_backlinks(self, value: Value): + raise NotImplementedError() + + def _persist_value_pedigree(self, value: Value): + raise NotImplementedError() diff --git a/src/kiara/utils/cli/run.py b/src/kiara/utils/cli/run.py index feb985987..354b7c214 100644 --- a/src/kiara/utils/cli/run.py +++ b/src/kiara/utils/cli/run.py @@ -407,6 +407,17 @@ def execute_job( saved_results = api.store_values(outputs, alias_map=alias_map) + error = False + for field, v in saved_results.root.items(): + if v.error: + error = True + terminal_print() + terminal_print( + f"[red]Error saving result for field '{field}'[/red]: {v.error}" + ) + if error: + sys.exit(1) + api.context.job_registry.store_job_record(job_id=job_id) if len(saved_results) == 1:
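For reference, a self-contained sketch of the SQLAlchemy access pattern the sqlite store above relies on, run against an in-memory database: `text()` statements use named `:param` bind parameters passed as a dict (DBAPI-style `?` placeholders don't combine with `text()`). The table layout mirrors the `values_metadata` schema above; the inserted row is dummy data.

```python
import uuid

from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:", future=True)

# same layout as the 'values_metadata' table created by SqliteDataArchive
with engine.begin() as conn:
    conn.execute(
        text(
            "CREATE TABLE IF NOT EXISTS values_metadata ("
            "value_id TEXT PRIMARY KEY, value_hash TEXT NOT NULL, "
            "value_size INTEGER NOT NULL, data_type_name TEXT NOT NULL, "
            "metadata TEXT NOT NULL)"
        )
    )
    conn.execute(
        text(
            "INSERT INTO values_metadata "
            "(value_id, value_hash, value_size, data_type_name, metadata) "
            "VALUES (:value_id, :value_hash, :value_size, :data_type_name, :metadata)"
        ),
        {
            "value_id": str(uuid.uuid4()),
            "value_hash": "dummy_hash",
            "value_size": 0,
            "data_type_name": "string",
            "metadata": "{}",
        },
    )

# the lookup pattern used by '_find_values_with_hash'
with engine.connect() as conn:
    rows = conn.execute(
        text("SELECT value_id FROM values_metadata WHERE value_hash = :value_hash"),
        {"value_hash": "dummy_hash"},
    ).fetchall()
    print({uuid.UUID(row[0]) for row in rows})
```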