From 091a14463bdec6e3af27661514e38bc77cae7922 Mon Sep 17 00:00:00 2001 From: Markus Binsteiner Date: Mon, 29 Jan 2024 11:22:58 +0100 Subject: [PATCH] chore: ongoing work stores --- pyproject.toml | 2 + src/kiara/context/__init__.py | 45 ++++--- src/kiara/context/config.py | 111 ++++++++++----- src/kiara/models/events/alias_registry.py | 5 +- src/kiara/registries/__init__.py | 10 +- src/kiara/registries/aliases/__init__.py | 33 ++++- src/kiara/registries/aliases/sqlite_store.py | 62 ++++++++- src/kiara/registries/data/__init__.py | 14 +- .../data/data_store/sqlite_store.py | 12 +- src/kiara/registries/jobs/__init__.py | 4 +- .../registries/jobs/job_store/sqlite_store.py | 62 ++++++++- .../registries/workflows/sqlite_store.py | 126 ++++++++++++++++++ src/kiara/utils/stores.py | 32 ++++- 13 files changed, 439 insertions(+), 79 deletions(-) create mode 100644 src/kiara/registries/workflows/sqlite_store.py diff --git a/pyproject.toml b/pyproject.toml index 3835d298e..864dccb5b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -114,6 +114,8 @@ sqlite_alias_archive = "kiara.registries.aliases.sqlite_store:SqliteAliasArchive sqlite_alias_store = "kiara.registries.aliases.sqlite_store:SqliteAliasStore" sqlite_job_archive = "kiara.registries.jobs.job_store.sqlite_store:SqliteJobArchive" sqlite_job_store = "kiara.registries.jobs.job_store.sqlite_store:SqliteJobStore" +sqlite_workflow_archive = "kiara.registries.workflows.sqlite_store:SqliteWorkflowArchive" +sqlite_workflow_store = "kiara.registries.workflows.sqlite_store:SqliteWorkflowStore" [project.entry-points."kiara.cli_subcommands"] diff --git a/src/kiara/context/__init__.py b/src/kiara/context/__init__.py index 3647a6b57..c6ef6db83 100644 --- a/src/kiara/context/__init__.py +++ b/src/kiara/context/__init__.py @@ -9,7 +9,7 @@ # from alembic import command # type: ignore from pydantic import Field -from kiara.context.config import KiaraArchiveReference, KiaraConfig, KiaraContextConfig +from kiara.context.config import KiaraConfig, KiaraContextConfig from kiara.context.runtime_config import KiaraRuntimeConfig from kiara.data_types import DataType from kiara.exceptions import KiaraContextException @@ -47,6 +47,7 @@ from kiara.utils import log_exception, log_message from kiara.utils.class_loading import find_all_archive_types from kiara.utils.operations import filter_operations +from kiara.utils.stores import check_external_archive # Copyright (c) 2021, University of Luxembourg / DHARPA project # Copyright (c) 2021, Markus Binsteiner @@ -322,35 +323,35 @@ def register_external_archive( self, archive: Union[str, KiaraArchive, List[KiaraArchive], List[str]], allow_write_access: bool = False, - ): - - if isinstance(archive, (KiaraArchive, str)): - _archives = [archive] - else: - _archives = archive - - archive_instances = set() - for _archive in _archives: - - if isinstance(_archive, KiaraArchive): - archive_instances.add(_archive) - # TODO: handle write access - continue + ) -> Dict[str, str]: + """Register one or several external archives with the context. - loaded = KiaraArchiveReference.load_existing_archive( - archive_uri=_archive, allow_write_access=allow_write_access - ) + In case you provide KiaraArchive instances, they will be modified in case the provided 'allow_write_access' is different from the 'is_force_read_only' attribute of the archive. + """ - archive_instances.update(loaded) + archive_instances = check_external_archive( + archive=archive, allow_write_access=allow_write_access + ) - for _archve_inst in archive_instances: + result = {} + for _archive_inst in archive_instances: log_message( "register.external.archive", - archive=_archve_inst.archive_alias, + archive=_archive_inst.archive_alias, allow_write_access=allow_write_access, ) - return list(archive_instances) + _archive_inst.set_force_read_only(not allow_write_access) + + supported_item_types = _archive_inst.supported_item_types() + if "data" in supported_item_types: + result["data"] = self.data_registry.register_data_archive(_archive_inst) # type: ignore + if "alias" in supported_item_types: + result["alias"] = self.alias_registry.register_archive(_archive_inst) # type: ignore + if "job_record" in supported_item_types: + result["job_record"] = self.job_registry.register_job_archive(_archive_inst) # type: ignore + + return result def create_manifest( self, module_or_operation: str, config: Union[Mapping[str, Any], None] = None diff --git a/src/kiara/context/config.py b/src/kiara/context/config.py index f415e383c..ca9f44638 100644 --- a/src/kiara/context/config.py +++ b/src/kiara/context/config.py @@ -100,10 +100,14 @@ def load_existing_archive( allow_write_access=allow_write_access, **kwargs, ) + archive_config = archive_cls._config_cls(**data) archive: KiaraArchive = archive_cls( - archive_alias=archive_alias, archive_config=data + archive_alias=archive_alias, archive_config=archive_config ) - archive_configs.append(data) + wrapped_archive_config = KiaraArchiveConfig( + archive_type=store_type, config=data + ) + archive_configs.append(wrapped_archive_config) archives.append(archive) else: for st in store_type: @@ -117,25 +121,33 @@ def load_existing_archive( allow_write_access=allow_write_access, **kwargs, ) + archive_config = archive_cls._config_cls(**data) archive: KiaraArchive = archive_cls( - archive_alias=archive_alias, archive_config=data + archive_alias=archive_alias, archive_config=archive_config + ) + wrapped_archive_config = KiaraArchiveConfig( + archive_type=st, config=data ) - archive_configs.append(data) + archive_configs.append(wrapped_archive_config) archives.append(archive) else: for archive_type, archive_cls in archive_types.items(): - data = archive_cls.load_store_config( store_uri=archive_uri, allow_write_access=allow_write_access, **kwargs, ) + if data is None: continue + archive_config = archive_cls._config_cls(**data) archive: KiaraArchive = archive_cls( - archive_alias=archive_alias, archive_config=data + archive_alias=archive_alias, archive_config=archive_config + ) + wrapped_archive_config = KiaraArchiveConfig( + archive_type=archive_type, config=data ) - archive_configs.append(data) + archive_configs.append(wrapped_archive_config) archives.append(archive) if archives is None: @@ -183,10 +195,14 @@ def archives(self) -> List["KiaraArchive"]: ) archive_cls = archive_types[config.archive_type] - archive = archive_cls.load_store_config( + archive_config_data = archive_cls.load_store_config( archive_uri=self.archive_uri, allow_write_access=self.allow_write_access, ) + archive_config = archive_cls._config_cls(**archive_config_data) + archive = archive_cls( + archive_alias=self.archive_alias, archive_config=archive_config + ) result.append(archive) self._archives = result @@ -479,26 +495,61 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool: sqlite_base_path = os.path.join(self.stores_base_path, "sqlite_stores") filesystem_base_path = os.path.join(self.stores_base_path, "filesystem_stores") + def create_default_sqlite_archive_config() -> Dict[str, Any]: + + store_id = str(uuid.uuid4()) + file_name = f"{store_id}.sqlite" + archive_path = Path( + os.path.abspath(os.path.join(sqlite_base_path, file_name)) + ) + + if archive_path.exists(): + raise Exception( + f"Archive path '{archive_path.as_posix()}' already exists." + ) + + archive_path.parent.mkdir(exist_ok=True, parents=True) + + # Connect to the SQLite database (or create it if it doesn't exist) + import sqlite3 + + conn = sqlite3.connect(archive_path) + + # Create a cursor object + c = conn.cursor() + # Create table + c.execute( + """CREATE TABLE archive_metadata + (key text PRIMARY KEY , value text NOT NULL)""" + ) + c.execute( + f"INSERT INTO archive_metadata VALUES ('archive_id','{store_id}')" + ) + conn.commit() + conn.close() + + return {"sqlite_db_path": archive_path.as_posix()} + + default_sqlite_config: Union[Dict[str, Any], None] = None + if DEFAULT_DATA_STORE_MARKER not in context_config.archives.keys(): - # data_store_type = "filesystem_data_store" - # TODO: change that back - data_store_type = "sqlite_data_store" - data_store = create_default_store_config( - store_type=data_store_type, - stores_base_path=sqlite_base_path, + default_sqlite_config = create_default_sqlite_archive_config() + + data_store = KiaraArchiveConfig( + archive_type="sqlite_data_store", config=default_sqlite_config ) + context_config.archives[DEFAULT_DATA_STORE_MARKER] = data_store changed = True if DEFAULT_JOB_STORE_MARKER not in context_config.archives.keys(): - # job_store_type = "filesystem_job_store" - job_store_type = "sqlite_job_store" + if default_sqlite_config is None: + default_sqlite_config = create_default_sqlite_archive_config() - job_store = create_default_store_config( - store_type=job_store_type, - stores_base_path=sqlite_base_path, + job_store = KiaraArchiveConfig( + archive_type="sqlite_job_store", config=default_sqlite_config ) context_config.archives[DEFAULT_JOB_STORE_MARKER] = job_store @@ -506,19 +557,20 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool: if DEFAULT_ALIAS_STORE_MARKER not in context_config.archives.keys(): - alias_store_type = "filesystem_alias_store" - alias_store_type = "sqlite_alias_store" + if default_sqlite_config is None: + default_sqlite_config = create_default_sqlite_archive_config() - alias_store = create_default_store_config( - store_type=alias_store_type, - stores_base_path=self.stores_base_path, + alias_store = KiaraArchiveConfig( + archive_type="sqlite_alias_store", config=default_sqlite_config ) + context_config.archives[DEFAULT_ALIAS_STORE_MARKER] = alias_store changed = True if DEFAULT_WORKFLOW_STORE_MARKER not in context_config.archives.keys(): workflow_store_type = "filesystem_workflow_store" + # workflow_store_type = "sqlite_workflow_store" workflow_store = create_default_store_config( store_type=workflow_store_type, @@ -527,17 +579,6 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool: context_config.archives[DEFAULT_WORKFLOW_STORE_MARKER] = workflow_store changed = True - # if METADATA_DESTINY_STORE_MARKER not in context_config.archives.keys(): - # - # destiny_store_type = "filesystem_destiny_store" - # - # destiny_store = create_default_store_config( - # store_type=destiny_store_type, - # stores_base_path=os.path.join(filesystem_base_path, "destinies"), - # ) - # context_config.archives[METADATA_DESTINY_STORE_MARKER] = destiny_store - # changed = True - return changed def create_context_config( diff --git a/src/kiara/models/events/alias_registry.py b/src/kiara/models/events/alias_registry.py index fdf99d62c..a442914c6 100644 --- a/src/kiara/models/events/alias_registry.py +++ b/src/kiara/models/events/alias_registry.py @@ -5,7 +5,7 @@ # Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/) import uuid -from typing import Iterable, Literal +from typing import Iterable, Literal, Union from pydantic import Field @@ -27,6 +27,9 @@ class AliasArchiveAddedEvent(RegistryEvent): is_default_store: bool = Field( description="Whether this store acts as default store." ) + mount_point: Union[str, None] = Field( + description="The mountpoint of this alias archive (if specified)." + ) class AliasPreStoreEvent(RegistryEvent): diff --git a/src/kiara/registries/__init__.py b/src/kiara/registries/__init__.py index 6587512a9..058465083 100644 --- a/src/kiara/registries/__init__.py +++ b/src/kiara/registries/__init__.py @@ -89,7 +89,7 @@ class KiaraArchive(abc.ABC, typing.Generic[ARCHIVE_CONFIG_CLS]): @classmethod def _load_store_config( cls, store_uri: str, allow_write_access: bool, **kwargs - ) -> Union[ARCHIVE_CONFIG_CLS, None]: + ) -> Union[typing.Dict[str, typing.Any], None]: """Tries to assemble an archive config from an uri (and optional paramters). If the archive type supports the archive at the uri, then a valid config will be returned, @@ -101,7 +101,7 @@ def _load_store_config( @classmethod def load_store_config( cls, store_uri: str, allow_write_access: bool, **kwargs - ) -> Union[ARCHIVE_CONFIG_CLS, None]: + ) -> Union[typing.Dict[str, typing.Any], None]: log_message( "attempt_loading_existing_store", @@ -160,6 +160,12 @@ def register_archive(self, kiara: "Kiara"): def archive_alias(self) -> str: return self._archive_alias + def is_force_read_only(self) -> bool: + return self._force_read_only + + def set_force_read_only(self, force_read_only: bool): + self._force_read_only = force_read_only + def is_writeable(self) -> bool: if self._force_read_only: return False diff --git a/src/kiara/registries/aliases/__init__.py b/src/kiara/registries/aliases/__init__.py index d817be10f..7f20c5e01 100644 --- a/src/kiara/registries/aliases/__init__.py +++ b/src/kiara/registries/aliases/__init__.py @@ -85,6 +85,8 @@ def __init__(self, kiara: "Kiara"): self._alias_archives: Dict[str, AliasArchive] = {} """All registered archives/stores.""" + self._mountpoints: Dict[str, str] = {} + """All registered mountpoints (key: mountpoint, value: archive_alias).""" self._default_alias_store: Union[str, None] = None """The alias of the store where new aliases are stored by default.""" @@ -96,24 +98,34 @@ def register_archive( self, archive: AliasArchive, set_as_default_store: Union[bool, None] = None, - ): + mount_point: Union[str, None] = None, + ) -> str: alias = archive.archive_alias if not alias: raise Exception("Invalid alias archive alias: can't be empty.") - if "." in alias: + if mount_point and "." in mount_point: raise Exception( - f"Can't register alias archive with as '{alias}': registered name is not allowed to contain a '.' character (yet)." + f"Can't register alias archive with mountpoint '{alias}': mountpoint is not allowed to contain a '.' character (yet, anyway)." ) if alias in self._alias_archives.keys(): raise Exception(f"Can't add store, alias '{alias}' already registered.") - archive.register_archive(kiara=self._kiara) + if mount_point: + if mount_point in self.aliases: + raise Exception( + f"Can't mount alias archive: mountpoint '{mount_point}' already in use as alias." + ) + if mount_point in self._mountpoints.keys(): + raise Exception(f"Mountpoint '{mount_point}' already registered.") + self._mountpoints[mount_point] = alias + archive.register_archive(kiara=self._kiara) self._alias_archives[alias] = archive + is_store = False is_default_store = False if isinstance(archive, AliasStore): @@ -133,9 +145,12 @@ def register_archive( alias_archive_alias=alias, is_store=is_store, is_default_store=is_default_store, + mount_point=mount_point, ) self._event_callback(event) + return alias + @property def default_alias_store(self) -> str: @@ -216,7 +231,10 @@ def find_value_id_for_alias(self, alias: str) -> Union[uuid.UUID, None]: if "." not in alias: return None - archive_id, rest = alias.split(".", maxsplit=2) + mountpoint, rest = alias.split(".", maxsplit=2) + if mountpoint not in self._mountpoints.keys(): + return None + archive_id = self._mountpoints[mountpoint] archive = self.get_archive(archive_id=archive_id) if archive is None: @@ -282,6 +300,11 @@ def register_aliases( msg=f"Invalid alias name: {alias}.", details=f"The following names can't be used as alias: {', '.join(INVALID_ALIAS_NAMES)}.", ) + if alias in self._mountpoints.keys(): + raise KiaraException( + msg=f"Invalid alias name: {alias}.", + details="Alias is used as mountpoint in this context.", + ) value_id = self._get_value_id(value_id=value_id) store_name = self.default_alias_store diff --git a/src/kiara/registries/aliases/sqlite_store.py b/src/kiara/registries/aliases/sqlite_store.py index 25d594a23..6e6ad9805 100644 --- a/src/kiara/registries/aliases/sqlite_store.py +++ b/src/kiara/registries/aliases/sqlite_store.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- import uuid from pathlib import Path -from typing import Mapping, Set, Union +from typing import Any, Mapping, Set, Union from sqlalchemy import Engine, create_engine, text @@ -15,6 +15,36 @@ class SqliteAliasArchive(AliasArchive): _archive_type_name = "sqlite_alias_archive" _config_cls = SqliteArchiveConfig + @classmethod + def _load_store_config( + cls, store_uri: str, allow_write_access: bool, **kwargs + ) -> Union[Mapping[str, Any], None]: + + if allow_write_access: + return None + + if not Path(store_uri).is_file(): + return None + + import sqlite3 + + con = sqlite3.connect(store_uri) + + cursor = con.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") + tables = {x[0] for x in cursor.fetchall()} + con.close() + + required_tables = { + "aliases", + } + + if not required_tables.issubset(tables): + return None + + # config = SqliteArchiveConfig(sqlite_db_path=store_uri) + return {"sqlite_db_path": store_uri} + def __init__(self, archive_alias: str, archive_config: SqliteArchiveConfig): AliasArchive.__init__( @@ -108,6 +138,36 @@ class SqliteAliasStore(SqliteAliasArchive, AliasStore): _archive_type_name = "sqlite_alias_store" + @classmethod + def _load_store_config( + cls, store_uri: str, allow_write_access: bool, **kwargs + ) -> Union[Mapping[str, Any], None]: + + if not allow_write_access: + return None + + if not Path(store_uri).is_file(): + return None + + import sqlite3 + + con = sqlite3.connect(store_uri) + + cursor = con.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") + tables = {x[0] for x in cursor.fetchall()} + con.close() + + required_tables = { + "aliases", + } + + if not required_tables.issubset(tables): + return None + + # config = SqliteArchiveConfig(sqlite_db_path=store_uri) + return {"sqlite_db_path": store_uri} + def register_aliases(self, value_id: uuid.UUID, *aliases: str): sql = text( diff --git a/src/kiara/registries/data/__init__.py b/src/kiara/registries/data/__init__.py index e5620302e..a1e5512da 100644 --- a/src/kiara/registries/data/__init__.py +++ b/src/kiara/registries/data/__init__.py @@ -72,6 +72,7 @@ from kiara.utils import log_exception, log_message from kiara.utils.data import pretty_print_data from kiara.utils.hashing import NONE_CID +from kiara.utils.stores import check_external_archive if TYPE_CHECKING: from kiara.context import Kiara @@ -136,13 +137,16 @@ def resolve_alias(self, alias: str) -> uuid.UUID: if "#" in rest: raise NotImplementedError() - archives = load_existing_archives(store=rest) + archives = check_external_archive( + archive=rest, allow_write_access=False + ) if archives: for archive in archives: - archive_ref = self._kiara.data_registry.register_data_archive( - archive - ) - print(archive_ref) + print(archive) + # archive_ref = self._kiara.data_registry.register_data_archive( + # archive + # ) + # print(archive_ref) raise NotImplementedError("x") else: diff --git a/src/kiara/registries/data/data_store/sqlite_store.py b/src/kiara/registries/data/data_store/sqlite_store.py index f8ec06920..d17c38aed 100644 --- a/src/kiara/registries/data/data_store/sqlite_store.py +++ b/src/kiara/registries/data/data_store/sqlite_store.py @@ -26,7 +26,7 @@ class SqliteDataArchive(DataArchive[SqliteArchiveConfig]): @classmethod def _load_store_config( cls, store_uri: str, allow_write_access: bool, **kwargs - ) -> Union[SqliteArchiveConfig, None]: + ) -> Union[Mapping[str, Any], None]: if allow_write_access: return None @@ -43,7 +43,7 @@ def _load_store_config( tables = {x[0] for x in cursor.fetchall()} con.close() - if tables != { + required_tables = { "values_pedigree", "values_destinies", "archive_metadata", @@ -51,11 +51,13 @@ def _load_store_config( "values_metadata", "values_data", "environments", - }: + } + + if not required_tables.issubset(tables): return None - config = SqliteArchiveConfig(sqlite_db_path=store_uri) - return config + # config = SqliteArchiveConfig(sqlite_db_path=store_uri) + return {"sqlite_db_path": store_uri} def __init__( self, diff --git a/src/kiara/registries/jobs/__init__.py b/src/kiara/registries/jobs/__init__.py index 43213f370..bbe52f2f7 100644 --- a/src/kiara/registries/jobs/__init__.py +++ b/src/kiara/registries/jobs/__init__.py @@ -242,7 +242,7 @@ def suppoerted_event_types(self) -> Iterable[Type[KiaraEvent]]: return [JobArchiveAddedEvent, JobRecordPreStoreEvent, JobRecordStoredEvent] - def register_job_archive(self, archive: JobArchive): + def register_job_archive(self, archive: JobArchive) -> str: alias = archive.archive_alias @@ -273,6 +273,8 @@ def register_job_archive(self, archive: JobArchive): ) self._event_callback(event) + return alias + @property def default_job_store(self) -> str: diff --git a/src/kiara/registries/jobs/job_store/sqlite_store.py b/src/kiara/registries/jobs/job_store/sqlite_store.py index 1269a2152..867cbf6eb 100644 --- a/src/kiara/registries/jobs/job_store/sqlite_store.py +++ b/src/kiara/registries/jobs/job_store/sqlite_store.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- import uuid from pathlib import Path -from typing import Iterable, Union +from typing import Any, Iterable, Mapping, Union from orjson import orjson from sqlalchemy import Engine, create_engine, text @@ -17,6 +17,36 @@ class SqliteJobArchive(JobArchive): _archive_type_name = "sqlite_job_archive" _config_cls = SqliteArchiveConfig + @classmethod + def _load_store_config( + cls, store_uri: str, allow_write_access: bool, **kwargs + ) -> Union[Mapping[str, Any], None]: + + if allow_write_access: + return None + + if not Path(store_uri).is_file(): + return None + + import sqlite3 + + con = sqlite3.connect(store_uri) + + cursor = con.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") + tables = {x[0] for x in cursor.fetchall()} + con.close() + + required_tables = { + "job_records", + } + + if not required_tables.issubset(tables): + return None + + # config = SqliteArchiveConfig(sqlite_db_path=store_uri) + return {"sqlite_db_path": store_uri} + def __init__(self, archive_alias: str, archive_config: SqliteArchiveConfig): JobArchive.__init__( @@ -135,6 +165,36 @@ class SqliteJobStore(SqliteJobArchive, JobStore): _archive_type_name = "sqlite_job_store" + @classmethod + def _load_store_config( + cls, store_uri: str, allow_write_access: bool, **kwargs + ) -> Union[Mapping[str, Any], None]: + + if not allow_write_access: + return None + + if not Path(store_uri).is_file(): + return None + + import sqlite3 + + con = sqlite3.connect(store_uri) + + cursor = con.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") + tables = {x[0] for x in cursor.fetchall()} + con.close() + + required_tables = { + "job_records", + } + + if not required_tables.issubset(tables): + return None + + # config = SqliteArchiveConfig(sqlite_db_path=store_uri) + return {"sqlite_db_path": store_uri} + def store_job_record(self, job_record: JobRecord): manifest_hash = str(job_record.manifest_cid) diff --git a/src/kiara/registries/workflows/sqlite_store.py b/src/kiara/registries/workflows/sqlite_store.py new file mode 100644 index 000000000..c881d2e77 --- /dev/null +++ b/src/kiara/registries/workflows/sqlite_store.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +import uuid +from pathlib import Path +from typing import Iterable, Mapping, Union + +from sqlalchemy import Engine, create_engine, text + +from kiara.models.workflow import WorkflowMetadata, WorkflowState +from kiara.registries import SqliteArchiveConfig +from kiara.registries.workflows import WorkflowArchive, WorkflowStore +from kiara.utils.windows import fix_windows_longpath + + +class SqliteWorkflowArchive(WorkflowArchive): + + _archive_type_name = "sqlite_workflow_archive" + _config_cls = SqliteArchiveConfig + + def __init__(self, archive_alias: str, archive_config: SqliteArchiveConfig): + + WorkflowArchive.__init__( + self, archive_alias=archive_alias, archive_config=archive_config + ) + self._db_path: Union[Path, None] = None + self._cached_engine: Union[Engine, None] = None + # self._lock: bool = True + + def _retrieve_archive_id(self) -> uuid.UUID: + sql = text("SELECT value FROM archive_metadata WHERE key='archive_id'") + + with self.sqlite_engine.connect() as connection: + result = connection.execute(sql) + row = result.fetchone() + if row is None: + raise Exception("No archive ID found in metadata") + return uuid.UUID(row[0]) + + @property + def sqlite_path(self): + + if self._db_path is not None: + return self._db_path + + db_path = Path(self.config.sqlite_db_path).resolve() + self._db_path = fix_windows_longpath(db_path) + + if self._db_path.exists(): + return self._db_path + + self._db_path.parent.mkdir(parents=True, exist_ok=True) + return self._db_path + + @property + def db_url(self) -> str: + return f"sqlite:///{self.sqlite_path}" + + @property + def sqlite_engine(self) -> "Engine": + + if self._cached_engine is not None: + return self._cached_engine + + # def _pragma_on_connect(dbapi_con, con_record): + # dbapi_con.execute("PRAGMA query_only = ON") + + self._cached_engine = create_engine(self.db_url, future=True) + create_table_sql = """ +CREATE TABLE IF NOT EXISTS job_records ( + job_hash TEXT PRIMARY KEY, + manifest_hash TEXT NOT NULL, + inputs_hash TEXT NOT NULL, + job_metadata TEXT NOT NULL +); +""" + + with self._cached_engine.begin() as connection: + for statement in create_table_sql.split(";"): + if statement.strip(): + connection.execute(text(statement)) + + # if self._lock: + # event.listen(self._cached_engine, "connect", _pragma_on_connect) + return self._cached_engine + + def retrieve_all_workflow_aliases(self) -> Mapping[str, uuid.UUID]: + + raise NotImplementedError() + + def retrieve_all_workflow_ids(self) -> Iterable[uuid.UUID]: + + raise NotImplementedError() + + def retrieve_workflow_metadata(self, workflow_id: uuid.UUID) -> WorkflowMetadata: + + raise NotImplementedError() + + def retrieve_workflow_state(self, workflow_state_id: str) -> WorkflowState: + + raise NotImplementedError() + + def retrieve_all_states_for_workflow( + self, workflow_id: uuid.UUID + ) -> Mapping[str, WorkflowState]: + + raise NotImplementedError() + + +class SqliteWorkflowStore(SqliteWorkflowArchive, WorkflowStore): + + _archive_type_name = "sqlite_workflow_store" + + def _register_workflow_metadata(self, workflow_metadata: WorkflowMetadata) -> None: + + raise NotImplementedError() + + def _update_workflow_metadata(self, workflow_metadata: WorkflowMetadata): + + raise NotImplementedError() + + def add_workflow_state(self, workflow_state: WorkflowState): + + raise NotImplementedError() + + def register_alias(self, workflow_id: uuid.UUID, alias: str): + + raise NotImplementedError() diff --git a/src/kiara/utils/stores.py b/src/kiara/utils/stores.py index e8471f73f..cfa8a5ec1 100644 --- a/src/kiara/utils/stores.py +++ b/src/kiara/utils/stores.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -from typing import TYPE_CHECKING, Any, Type +from typing import TYPE_CHECKING, Any, List, Type, Union if TYPE_CHECKING: from kiara.registries import KiaraArchive @@ -31,3 +31,33 @@ def create_new_store( archive_instance = archive_cls(archive_alias=archive_alias, archive_config=config, force_read_only=force_read_only) # type: ignore return archive_instance + + +def check_external_archive( + archive: Union[str, "KiaraArchive"], allow_write_access: bool = False +) -> List["KiaraArchive"]: + + from kiara.context import KiaraArchiveReference + from kiara.registries import KiaraArchive + + if isinstance(archive, (KiaraArchive, str)): + _archives = [archive] + else: + _archives = archive + + archive_instances: List[KiaraArchive] = [] + for _archive in _archives: + + if isinstance(_archive, KiaraArchive): + archive_instances.append(_archive) + # TODO: handle write access + continue + + loaded = KiaraArchiveReference.load_existing_archive( + archive_uri=_archive, allow_write_access=allow_write_access + ) + + for _archive_inst in loaded.archives: + archive_instances.append(_archive_inst) + + return archive_instances