From f56a26850274589282105014745f81a50927e2bf Mon Sep 17 00:00:00 2001 From: Markus Binsteiner Date: Tue, 23 Jan 2024 10:46:32 +0100 Subject: [PATCH] chore: before destiny refactoring --- src/kiara/context/config.py | 2 +- src/kiara/registries/__init__.py | 1 - src/kiara/registries/aliases/__init__.py | 10 +- src/kiara/registries/data/__init__.py | 12 +- src/kiara/registries/destinies/registry.py | 46 +++---- .../registries/destinies/sqlite_store.py | 116 ++++++++++++++++++ src/kiara/registries/jobs/__init__.py | 9 +- src/kiara/registries/workflows/__init__.py | 12 +- 8 files changed, 163 insertions(+), 45 deletions(-) create mode 100644 src/kiara/registries/destinies/sqlite_store.py diff --git a/src/kiara/context/config.py b/src/kiara/context/config.py index 473d2d998..6c59c68d5 100644 --- a/src/kiara/context/config.py +++ b/src/kiara/context/config.py @@ -477,7 +477,6 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool: changed = False - sqlite_base_path = os.path.join(self.stores_base_path, "sqlite_stores") filesystem_base_path = os.path.join(self.stores_base_path, "filesystem_stores") @@ -521,6 +520,7 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool: if DEFAULT_WORKFLOW_STORE_MARKER not in context_config.archives.keys(): workflow_store_type = "filesystem_workflow_store" + workflow_store = create_default_store_config( store_type=workflow_store_type, stores_base_path=os.path.join(filesystem_base_path, "workflows"), diff --git a/src/kiara/registries/__init__.py b/src/kiara/registries/__init__.py index 42fe3f2af..6587512a9 100644 --- a/src/kiara/registries/__init__.py +++ b/src/kiara/registries/__init__.py @@ -310,7 +310,6 @@ def create_new_store_config( # Connect to the SQLite database (or create it if it doesn't exist) import sqlite3 - print(archive_path) conn = sqlite3.connect(archive_path) # Create a cursor object diff --git a/src/kiara/registries/aliases/__init__.py b/src/kiara/registries/aliases/__init__.py index 5b7af38c9..d817be10f 100644 --- a/src/kiara/registries/aliases/__init__.py +++ b/src/kiara/registries/aliases/__init__.py @@ -95,15 +95,13 @@ def __init__(self, kiara: "Kiara"): def register_archive( self, archive: AliasArchive, - alias: Union[str, None] = None, set_as_default_store: Union[bool, None] = None, ): - alias_archive_id = archive.archive_id - archive.register_archive(kiara=self._kiara) + alias = archive.archive_alias - if alias is None: - alias = str(alias_archive_id) + if not alias: + raise Exception("Invalid alias archive alias: can't be empty.") if "." in alias: raise Exception( @@ -113,6 +111,8 @@ def register_archive( if alias in self._alias_archives.keys(): raise Exception(f"Can't add store, alias '{alias}' already registered.") + archive.register_archive(kiara=self._kiara) + self._alias_archives[alias] = archive is_store = False is_default_store = False diff --git a/src/kiara/registries/data/__init__.py b/src/kiara/registries/data/__init__.py index 34b7f4663..7aaed3c53 100644 --- a/src/kiara/registries/data/__init__.py +++ b/src/kiara/registries/data/__init__.py @@ -258,19 +258,21 @@ def retrieve_all_available_value_ids(self) -> Set[uuid.UUID]: def register_data_archive( self, archive: DataArchive, - alias: Union[str, None] = None, set_as_default_store: Union[bool, None] = None, ) -> str: - data_store_id = archive.archive_id - archive.register_archive(kiara=self._kiara) - if alias is None: - alias = str(data_store_id) + alias = archive.archive_alias + + if not alias: + raise Exception("Invalid data archive alias: can't be empty.") if alias in self._data_archives.keys(): raise Exception( f"Can't add data archive, alias '{alias}' already registered." ) + + archive.register_archive(kiara=self._kiara) + self._data_archives[alias] = archive is_store = False is_default_store = False diff --git a/src/kiara/registries/destinies/registry.py b/src/kiara/registries/destinies/registry.py index 2125bdd2d..b14a8091c 100644 --- a/src/kiara/registries/destinies/registry.py +++ b/src/kiara/registries/destinies/registry.py @@ -66,14 +66,14 @@ def destiny_archives(self) -> Mapping[str, DestinyArchive]: def register_destiny_archive( self, archive: DestinyArchive, - alias: Union[str, None] = None, set_as_default_store: Union[bool, None] = None, ): - destiny_store_id = archive.archive_id + alias = archive.archive_alias archive.register_archive(kiara=self._kiara) - if alias is None: - alias = str(destiny_store_id) + + if not alias: + raise Exception("Invalid destiny archive alias: can't be empty.") if alias in self._destiny_archives.keys(): raise Exception( @@ -288,22 +288,22 @@ def attach_as_property( add_origin_to_property_value=True, ) - def store_destiny(self, destiny_id: Union[Destiny, uuid.UUID]): - - try: - _destiny_id: uuid.UUID = destiny_id.destiny_id # type: ignore - except Exception: - # just in case this is a 'Destiny' object - _destiny_id = destiny_id # type: ignore - - store_id = self._destiny_store_map[_destiny_id] - destiny = self._destinies[_destiny_id] - store: DestinyStore = self._destiny_archives[store_id] # type: ignore - - if not isinstance(store, DestinyStore): - full_alias = f"{store_id}.{destiny.destiny_alias}" - raise Exception( - f"Can't store destiny '{full_alias}': prefix '{store_id}' not writable in this kiara context." - ) - - store.persist_destiny(destiny=destiny) + # def store_destiny(self, destiny_id: Union[Destiny, uuid.UUID]): + # + # try: + # _destiny_id: uuid.UUID = destiny_id.destiny_id # type: ignore + # except Exception: + # # just in case this is a 'Destiny' object + # _destiny_id = destiny_id # type: ignore + # + # store_id = self._destiny_store_map[_destiny_id] + # destiny = self._destinies[_destiny_id] + # store: DestinyStore = self._destiny_archives[store_id] # type: ignore + # + # if not isinstance(store, DestinyStore): + # full_alias = f"{store_id}.{destiny.destiny_alias}" + # raise Exception( + # f"Can't store destiny '{full_alias}': prefix '{store_id}' not writable in this kiara context." + # ) + # + # store.persist_destiny(destiny=destiny) diff --git a/src/kiara/registries/destinies/sqlite_store.py b/src/kiara/registries/destinies/sqlite_store.py new file mode 100644 index 000000000..16d7b7381 --- /dev/null +++ b/src/kiara/registries/destinies/sqlite_store.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +import uuid +from pathlib import Path +from typing import Set, Union + +from sqlalchemy import Engine, create_engine, text + +from kiara.models.module.destiny import Destiny +from kiara.registries import SqliteArchiveConfig +from kiara.registries.destinies import DestinyArchive, DestinyStore +from kiara.utils.windows import fix_windows_longpath + + +class SqliteDestinyArchive(DestinyArchive): + + _archive_type_name = "sqlite_destiny_archive" + _config_cls = SqliteArchiveConfig + + def __init__(self, archive_alias: str, archive_config: SqliteArchiveConfig): + + DestinyArchive.__init__( + self, archive_alias=archive_alias, archive_config=archive_config + ) + self._db_path: Union[Path, None] = None + self._cached_engine: Union[Engine, None] = None + # self._lock: bool = True + + def _retrieve_archive_id(self) -> uuid.UUID: + sql = text("SELECT value FROM archive_metadata WHERE key='archive_id'") + + with self.sqlite_engine.connect() as connection: + result = connection.execute(sql) + row = result.fetchone() + if row is None: + raise Exception("No archive ID found in metadata") + return uuid.UUID(row[0]) + + @property + def sqlite_path(self): + + if self._db_path is not None: + return self._db_path + + db_path = Path(self.config.sqlite_db_path).resolve() + self._db_path = fix_windows_longpath(db_path) + + if self._db_path.exists(): + return self._db_path + + self._db_path.parent.mkdir(parents=True, exist_ok=True) + return self._db_path + + @property + def db_url(self) -> str: + return f"sqlite:///{self.sqlite_path}" + + @property + def sqlite_engine(self) -> "Engine": + + if self._cached_engine is not None: + return self._cached_engine + + # def _pragma_on_connect(dbapi_con, con_record): + # dbapi_con.execute("PRAGMA query_only = ON") + + self._cached_engine = create_engine(self.db_url, future=True) + create_table_sql = """ +CREATE TABLE IF NOT EXISTS destiny_details ( + job_hash TEXT PRIMARY KEY, + manifest_hash TEXT NOT NULL, + inputs_hash TEXT NOT NULL, + job_metadata TEXT NOT NULL +); +""" + + with self._cached_engine.begin() as connection: + for statement in create_table_sql.split(";"): + if statement.strip(): + connection.execute(text(statement)) + + # if self._lock: + # event.listen(self._cached_engine, "connect", _pragma_on_connect) + return self._cached_engine + + def get_all_value_ids(self) -> Set[uuid.UUID]: + pass + + def get_destiny_aliases_for_value( + self, value_id: uuid.UUID + ) -> Union[Set[str], None]: + pass + + def get_destiny(self, value_id: uuid.UUID, destiny: str) -> Destiny: + pass + + +class SqliteJobStore(SqliteDestinyArchive, DestinyStore): + + _archive_type_name = "sqlite_destiny_store" + + def persist_destiny(self, destiny: Destiny): + + for value_id in destiny.fixed_inputs.values(): + + path = self._translate_value_id( + value_id=value_id, destiny_alias=destiny.destiny_alias + ) + if path.exists(): + logger.debug("replace.destiny.file", path=path.as_posix()) + path.unlink() + # raise Exception( + # f"Can't persist destiny '{destiny.destiny_id}': already persisted." + # ) + + path.parent.mkdir(parents=True, exist_ok=True) + fix_windows_symlink(destiny_path, path) diff --git a/src/kiara/registries/jobs/__init__.py b/src/kiara/registries/jobs/__init__.py index d009f0a79..43213f370 100644 --- a/src/kiara/registries/jobs/__init__.py +++ b/src/kiara/registries/jobs/__init__.py @@ -242,16 +242,19 @@ def suppoerted_event_types(self) -> Iterable[Type[KiaraEvent]]: return [JobArchiveAddedEvent, JobRecordPreStoreEvent, JobRecordStoredEvent] - def register_job_archive(self, archive: JobArchive, alias: Union[str, None] = None): + def register_job_archive(self, archive: JobArchive): - if alias is None: - alias = str(archive.archive_id) + alias = archive.archive_alias + + if not alias: + raise Exception("Invalid job archive alias: can't be empty.") if alias in self._job_archives.keys(): raise Exception( f"Can't register job store, store id already registered: {alias}." ) + archive.register_archive(self._kiara) self._job_archives[alias] = archive is_store = False diff --git a/src/kiara/registries/workflows/__init__.py b/src/kiara/registries/workflows/__init__.py index abf22a385..b03d669fa 100644 --- a/src/kiara/registries/workflows/__init__.py +++ b/src/kiara/registries/workflows/__init__.py @@ -147,15 +147,11 @@ def __init__(self, kiara: "Kiara"): def register_archive( self, archive: WorkflowArchive, - alias: Union[str, None] = None, set_as_default_store: Union[bool, None] = None, ): - - workflow_archive_id = archive.archive_id - archive.register_archive(kiara=self._kiara) - - if alias is None: - alias = str(workflow_archive_id) + alias = archive.archive_alias + if not alias: + raise Exception("Invalid workflows archive alias: can't be empty.") if "." in alias: raise Exception( @@ -167,6 +163,8 @@ def register_archive( f"Can't add store, workflow archive alias '{alias}' already registered." ) + archive.register_archive(kiara=self._kiara) + self._workflow_archives[alias] = archive is_store = False is_default_store = False