diff --git a/pyproject.toml b/pyproject.toml index 767c150b3..3835d298e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -106,8 +106,6 @@ filesystem_job_archive = "kiara.registries.jobs.job_store.filesystem_store:FileS filesystem_job_store = "kiara.registries.jobs.job_store.filesystem_store:FileSystemJobStore" filesystem_alias_archive = "kiara.registries.aliases.archives:FileSystemAliasArchive" filesystem_alias_store = "kiara.registries.aliases.archives:FileSystemAliasStore" -filesystem_destiny_archive = "kiara.registries.destinies.filesystem_store:FileSystemDestinyArchive" -filesystem_destiny_store = "kiara.registries.destinies.filesystem_store:FileSystemDestinyStore" filesystem_workflow_archive = "kiara.registries.workflows.archives:FileSystemWorkflowArchive" filesystem_workflow_store = "kiara.registries.workflows.archives:FileSystemWorkflowStore" sqlite_data_archive = "kiara.registries.data.data_store.sqlite_store:SqliteDataArchive" diff --git a/src/kiara/context/__init__.py b/src/kiara/context/__init__.py index 878c8c482..3647a6b57 100644 --- a/src/kiara/context/__init__.py +++ b/src/kiara/context/__init__.py @@ -160,7 +160,12 @@ def __init__( for archive_alias, archive in self._config.archives.items(): + # this is just to make old context that still had that not error out + if "_destiny_" in archive.archive_type: + continue + archive_cls = self._archive_types.get(archive.archive_type, None) + if archive_cls is None: raise Exception( f"Can't create context: no archive type '{archive.archive_type}' available. Available types: {', '.join(self._archive_types.keys())}" diff --git a/src/kiara/context/config.py b/src/kiara/context/config.py index 6c59c68d5..f415e383c 100644 --- a/src/kiara/context/config.py +++ b/src/kiara/context/config.py @@ -26,7 +26,6 @@ KIARA_CONFIG_FILE_NAME, KIARA_MAIN_CONFIG_FILE, KIARA_MAIN_CONTEXTS_PATH, - METADATA_DESTINY_STORE_MARKER, kiara_app_dirs, ) from kiara.exceptions import KiaraException @@ -528,16 +527,16 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool: context_config.archives[DEFAULT_WORKFLOW_STORE_MARKER] = workflow_store changed = True - if METADATA_DESTINY_STORE_MARKER not in context_config.archives.keys(): - - destiny_store_type = "filesystem_destiny_store" - - destiny_store = create_default_store_config( - store_type=destiny_store_type, - stores_base_path=os.path.join(filesystem_base_path, "destinies"), - ) - context_config.archives[METADATA_DESTINY_STORE_MARKER] = destiny_store - changed = True + # if METADATA_DESTINY_STORE_MARKER not in context_config.archives.keys(): + # + # destiny_store_type = "filesystem_destiny_store" + # + # destiny_store = create_default_store_config( + # store_type=destiny_store_type, + # stores_base_path=os.path.join(filesystem_base_path, "destinies"), + # ) + # context_config.archives[METADATA_DESTINY_STORE_MARKER] = destiny_store + # changed = True return changed diff --git a/src/kiara/defaults.py b/src/kiara/defaults.py index f9c2a03eb..e726bc9fa 100644 --- a/src/kiara/defaults.py +++ b/src/kiara/defaults.py @@ -95,7 +95,7 @@ DEFAULT_WORKFLOW_STORE_MARKER = "default_workflow_store" """Name for the default context workflow store.""" -METADATA_DESTINY_STORE_MARKER = "metadata" +METADATA_PROPERTY_MARKER = "metadata" """Name for the default context destiny store.""" PIPELINE_PARENT_MARKER = "__pipeline__" diff --git a/src/kiara/registries/destinies/__init__.py b/src/kiara/registries/destinies/__init__.py deleted file mode 100644 index bebec7a5b..000000000 --- a/src/kiara/registries/destinies/__init__.py +++ /dev/null @@ -1,50 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (c) 2021, Markus Binsteiner -# -# Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/) - -import abc -import uuid -from typing import Iterable, Set, Union - -from kiara.models.module.destiny import Destiny -from kiara.registries import ARCHIVE_CONFIG_CLS, BaseArchive - - -class DestinyArchive(BaseArchive): - @classmethod - def supported_item_types(cls) -> Iterable[str]: - return ["destiny"] - - def __init__(self, archive_alias: str, archive_config: ARCHIVE_CONFIG_CLS): - - super().__init__(archive_alias=archive_alias, archive_config=archive_config) - - @abc.abstractmethod - def get_all_value_ids(self) -> Set[uuid.UUID]: - """Retrun a list of all value ids that have destinies stored in this archive.""" - - @abc.abstractmethod - def get_destiny_aliases_for_value( - self, value_id: uuid.UUID - ) -> Union[Set[str], None]: - """ - Retrieve all the destinies for the specified value within this archive. - - In case this archive discovers its value destinies dynamically, this can return 'None'. - """ - - @abc.abstractmethod - def get_destiny(self, value_id: uuid.UUID, destiny: str) -> Destiny: - pass - - -class DestinyStore(DestinyArchive): - @abc.abstractmethod - def persist_destiny(self, destiny: Destiny): - pass - - @classmethod - def _is_writeable(cls) -> bool: - return True diff --git a/src/kiara/registries/destinies/filesystem_store.py b/src/kiara/registries/destinies/filesystem_store.py deleted file mode 100644 index d7fbbc0cf..000000000 --- a/src/kiara/registries/destinies/filesystem_store.py +++ /dev/null @@ -1,187 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (c) 2021, University of Luxembourg / DHARPA project -# Copyright (c) 2021, Markus Binsteiner -# -# Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/) - -import os -import uuid -from pathlib import Path -from typing import Set, Tuple, Union - -import orjson -import structlog - -from kiara.models.module.destiny import Destiny -from kiara.registries import ArchiveDetails, FileSystemArchiveConfig -from kiara.registries.destinies import DestinyArchive, DestinyStore -from kiara.utils.windows import fix_windows_longpath, fix_windows_symlink - -logger = structlog.getLogger() - - -class FileSystemDestinyArchive(DestinyArchive): - - _archive_type_name = "filesystem_destiny_archive" - _config_cls = FileSystemArchiveConfig # type: ignore - - # @classmethod - # def create_from_kiara_context(cls, kiara: "Kiara"): - # - # TODO = kiara_app_dirs.user_data_dir - # base_path = Path(TODO) / "destiny_store" - # base_path.mkdir(parents=True, exist_ok=True) - # result = cls(base_path=base_path, store_id=kiara.id) - # ID_REGISTRY.update_metadata( - # result.get_destiny_archive_id(), kiara_id=kiara.id, obj=result - # ) - # return result - - def __init__(self, archive_alias: str, archive_config: FileSystemArchiveConfig): - - super().__init__(archive_alias=archive_alias, archive_config=archive_config) - self._base_path: Union[Path, None] = None - - # base_path = config.archive_path - # if not base_path.is_dir(): - # raise Exception( - # f"Can't create file system archive instance, base path does not exist or is not a folder: {base_path.as_posix()}." - # ) - - # self._store_id: uuid.UUID = store_id - # self._base_path: Path = base_path - # self._destinies_path: Path = self._base_path / "destinies" - # self._value_id_path: Path = self._base_path / "value_ids" - - def _retrieve_archive_id(self) -> uuid.UUID: - - return uuid.UUID(self.destiny_store_path.name) - - @property - def destiny_store_path(self) -> Path: - - if self._base_path is not None: - return self._base_path - - self._base_path = Path(self.config.archive_path).absolute() # type: ignore - self._base_path = fix_windows_longpath(self._base_path) - self._base_path.mkdir(parents=True, exist_ok=True) - return self._base_path - - def get_archive_details(self) -> ArchiveDetails: - - size = sum( - f.stat().st_size - for f in self.destiny_store_path.glob("**/*") - if f.is_file() - ) - return ArchiveDetails(size=size) - - @property - def destinies_path(self) -> Path: - return self.destiny_store_path / "destinies" - - @property - def value_id_path(self) -> Path: - return self.destiny_store_path / "value_ids" - - def _translate_destiny_id_to_path(self, destiny_id: uuid.UUID) -> Path: - - tokens = str(destiny_id).split("-") - destiny_path = ( - self.destinies_path.joinpath(*tokens[0:-1]) / f"{tokens[-1]}.json" - ) - return destiny_path - - def _translate_destinies_path_to_id(self, destinies_path: Path) -> uuid.UUID: - - relative = destinies_path.relative_to(self.destinies_path).as_posix()[:-5] - - destninies_id = "-".join(relative.split(os.path.sep)) - - return uuid.UUID(destninies_id) - - def _translate_value_id(self, value_id: uuid.UUID, destiny_alias: str) -> Path: - - tokens = str(value_id).split("-") - value_id_path = self.value_id_path.joinpath(*tokens) - - full_path = value_id_path / f"{destiny_alias}.json" - return full_path - - def _translate_value_id_path(self, value_path: Path) -> uuid.UUID: - - relative = value_path.relative_to(self.value_id_path) - - value_id_str = "-".join(relative.as_posix().split(os.path.sep)) - return uuid.UUID(value_id_str) - - def _translate_alias_path(self, alias_path: Path) -> Tuple[uuid.UUID, str]: - - value_id = self._translate_value_id_path(alias_path.parent) - - alias = alias_path.name[0:-5] - - return value_id, alias - - def get_all_value_ids(self) -> Set[uuid.UUID]: - - all_root_folders = self.value_id_path.glob("*/*/*/*/*") - - result = set() - for folder in all_root_folders: - if not folder.is_dir(): - continue - - value_id = self._translate_value_id_path(folder) - result.add(value_id) - - return result - - def get_destiny_aliases_for_value(self, value_id: uuid.UUID) -> Set[str]: - - tokens = str(value_id).split("-") - value_id_path = self.value_id_path.joinpath(*tokens) - - aliases = value_id_path.glob("*.json") - - return {a.name[0:-5] for a in aliases} - - def get_destiny(self, value_id: uuid.UUID, destiny_alias: str) -> Destiny: - - tokens = str(value_id).split("-") - value_id_path = self.value_id_path.joinpath(*tokens) - - destiny_path = value_id_path / f"{destiny_alias}.json" - - destiny_data = orjson.loads(destiny_path.read_text()) - - destiny = Destiny(**destiny_data) - return destiny - - -class FileSystemDestinyStore(FileSystemDestinyArchive, DestinyStore): - - _archive_type_name = "filesystem_destiny_store" - - def persist_destiny(self, destiny: Destiny): - - destiny_path = self._translate_destiny_id_to_path(destiny_id=destiny.destiny_id) - destiny_path.parent.mkdir(parents=True, exist_ok=True) - destiny_path.write_text(destiny.model_dump_json()) - - for value_id in destiny.fixed_inputs.values(): - - path = self._translate_value_id( - value_id=value_id, destiny_alias=destiny.destiny_alias - ) - if path.exists(): - logger.debug("replace.destiny.file", path=path.as_posix()) - path.unlink() - # raise Exception( - # f"Can't persist destiny '{destiny.destiny_id}': already persisted." - # ) - - path.parent.mkdir(parents=True, exist_ok=True) - fix_windows_symlink(destiny_path, path) diff --git a/src/kiara/registries/destinies/registry.py b/src/kiara/registries/destinies/registry.py deleted file mode 100644 index 854ee9f96..000000000 --- a/src/kiara/registries/destinies/registry.py +++ /dev/null @@ -1,310 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (c) 2021, University of Luxembourg / DHARPA project -# Copyright (c) 2021, Markus Binsteiner -# -# Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/) - -import uuid -from typing import ( - TYPE_CHECKING, - Callable, - Dict, - Iterable, - List, - Mapping, - Set, - Tuple, - Union, -) - -from kiara.models.events.destiny_registry import DestinyArchiveAddedEvent -from kiara.models.module.destiny import Destiny -from kiara.models.module.manifest import Manifest -from kiara.models.values.value import Value -from kiara.registries.destinies import DestinyArchive, DestinyStore - -if TYPE_CHECKING: - from kiara.context import Kiara - - -class DestinyRegistry(object): - def __init__(self, kiara: "Kiara"): - - self._kiara: Kiara = kiara - - self._event_callback: Callable = self._kiara.event_registry.add_producer(self) - - self._destiny_archives: Dict[str, DestinyArchive] = {} - self._default_destiny_store: Union[str, None] = None - # default_metadata_archive = FileSystemDestinyStore.create_from_kiara_context( - # self._kiara - # ) - # self.register_destiny_archive("metadata", default_metadata_archive) - - self._all_values: Union[Dict[uuid.UUID, Set[str]], None] = None - self._cached_value_aliases: Dict[ - uuid.UUID, Dict[str, Union[Destiny, None]] - ] = {} - - self._destinies: Dict[uuid.UUID, Destiny] = {} - self._destinies_by_value: Dict[uuid.UUID, Dict[str, Destiny]] = {} - self._destiny_store_map: Dict[uuid.UUID, str] = {} - - @property - def default_destiny_store(self) -> DestinyStore: - - if self._default_destiny_store is None: - raise Exception("No default destiny store set (yet).") - - return self._destiny_archives[self._default_destiny_store] # type: ignore - - @property - def destiny_archives(self) -> Mapping[str, DestinyArchive]: - return self._destiny_archives - - def register_destiny_archive( - self, - archive: DestinyArchive, - set_as_default_store: Union[bool, None] = None, - ): - - alias = archive.archive_alias - archive.register_archive(kiara=self._kiara) - - if not alias: - raise Exception("Invalid destiny archive alias: can't be empty.") - - if alias in self._destiny_archives.keys(): - raise Exception( - f"Can't add destiny archive, alias '{alias}' already registered." - ) - - self._destiny_archives[alias] = archive - - is_store = False - is_default_store = False - - if isinstance(archive, DestinyStore): - is_store = True - - if set_as_default_store and self._default_destiny_store is not None: - raise Exception( - f"Can't set data store '{alias}' as default store: default store already set." - ) - - if self._default_destiny_store is None or set_as_default_store: - is_default_store = True - self._default_destiny_store = alias - - event = DestinyArchiveAddedEvent( - kiara_id=self._kiara.id, - destiny_archive_id=archive.archive_id, - destiny_archive_alias=alias, - is_store=is_store, - is_default_store=is_default_store, - ) - self._event_callback(event) - - # if not registered_name.isalnum(): - # raise Exception( - # f"Can't register destiny archive with name '{registered_name}: name must only contain alphanumeric characters.'" - # ) - # - # if registered_name in self._destiny_archives.keys(): - # raise Exception( - # f"Can't register alias store, store id already registered: {registered_name}." - # ) - # - # self._destiny_archives[registered_name] = alias_store - # - # if self._default_destiny_store is None and isinstance( - # alias_store, DestinyStore - # ): - # self._default_destiny_store = registered_name - - def _extract_archive(self, alias: str) -> Tuple[str, str]: - - print(alias) - if "." not in alias: - assert self._default_destiny_store is not None - return (self._default_destiny_store, alias) - - store_id, rest = alias.split(".", maxsplit=1) - - if store_id not in self._destiny_archives.keys(): - assert self._default_destiny_store is not None - return (self._default_destiny_store, alias) - else: - return (store_id, rest) - - def add_destiny( - self, - destiny_alias: str, - values: Dict[str, uuid.UUID], - manifest: Manifest, - result_field_name: Union[str, None] = None, - ) -> Destiny: - """ - Add a destiny for one (or in some rare cases several) values. - - A destiny alias must be unique for every one of the involved input values. - """ - if not values: - raise Exception("Can't add destiny, no values provided.") - - store_id, alias = self._extract_archive(destiny_alias) - - destiny = Destiny.create_from_values( - kiara=self._kiara, - destiny_alias=alias, - manifest=manifest, - result_field_name=result_field_name, - values=values, - ) - - for value_id in destiny.fixed_inputs.values(): - self._destinies[destiny.destiny_id] = destiny - # TODO: store history? - self._destinies_by_value.setdefault(value_id, {})[destiny_alias] = destiny - self._cached_value_aliases.setdefault(value_id, {})[destiny_alias] = destiny - - self._destiny_store_map[destiny.destiny_id] = store_id - - return destiny - - def get_destiny(self, value_id: uuid.UUID, destiny_alias: str) -> Destiny: - - destiny = self._destinies_by_value.get(value_id, {}).get(destiny_alias, None) - if destiny is None: - raise Exception( - f"No destiny '{destiny_alias}' available for value '{value_id}'." - ) - - return destiny - - @property - def _all_values_store_map(self) -> Dict[uuid.UUID, Set[str]]: - - if self._all_values is not None: - return self._all_values - - all_values: Dict[uuid.UUID, Set[str]] = {} - for archive_id, archive in self._destiny_archives.items(): - - all_value_ids = archive.get_all_value_ids() - for v_id in all_value_ids: - all_values.setdefault(v_id, set()).add(archive_id) - - self._all_values = all_values - return self._all_values - - @property - def all_values(self) -> Iterable[uuid.UUID]: - - all_stored_values = set(self._all_values_store_map.keys()) - all_stored_values.update(self._destinies_by_value.keys()) - return all_stored_values - - def get_destiny_aliases_for_value( - self, value_id: uuid.UUID, alias_filter: Union[str, None] = None - ) -> Iterable[str]: - - # TODO: cache the result of this - - if alias_filter is not None: - raise NotImplementedError() - - all_stores = self._all_values_store_map.get(value_id) - aliases: Set[str] = set() - if all_stores: - for prefix in all_stores: - all_aliases = self._destiny_archives[ - prefix - ].get_destiny_aliases_for_value(value_id=value_id) - if all_aliases is not None: - aliases.update((f"{prefix}.{a}" for a in all_aliases)) - - current = self._destinies_by_value.get(value_id, None) - if current: - aliases.update(current.keys()) - - return sorted(aliases) - - # def get_destinies_for_value( - # self, - # value_id: uuid.UUID, - # destiny_alias_filter: Optional[str] = None - # ) -> Mapping[str, Destiny]: - # - # - # - # return self._destinies_by_value.get(value_id, {}) - - def resolve_destiny(self, destiny: Destiny) -> Value: - - results = self._kiara.job_registry.execute_and_retrieve( - manifest=destiny, inputs=destiny.merged_inputs - ) - value = results.get_value_obj(field_name=destiny.result_field_name) - - destiny.result_value_id = value.value_id - - return value - - def attach_as_property( - self, - destiny: Union[uuid.UUID, Destiny], - field_names: Union[Iterable[str], None] = None, - ): - - if field_names: - raise NotImplementedError() - - if isinstance(destiny, uuid.UUID): - destiny = self._destinies[destiny] - - values = self._kiara.data_registry.load_values(destiny.fixed_inputs) - - already_stored: List[uuid.UUID] = [] - for v in values.values(): - if v.is_stored: - already_stored.append(v.value_id) - - if already_stored: - stored = (str(v) for v in already_stored) - raise Exception( - f"Can't attach destiny as property, value(s) already stored: {', '.join(stored)}" - ) - - store_id = self._destiny_store_map[destiny.destiny_id] - - full_path = f"{store_id}.{destiny.destiny_alias}" - - for v in values.values(): - assert destiny.result_value_id is not None - v.add_property( - value_id=destiny.result_value_id, - property_path=full_path, - add_origin_to_property_value=True, - ) - - # def store_destiny(self, destiny_id: Union[Destiny, uuid.UUID]): - # - # try: - # _destiny_id: uuid.UUID = destiny_id.destiny_id # type: ignore - # except Exception: - # # just in case this is a 'Destiny' object - # _destiny_id = destiny_id # type: ignore - # - # store_id = self._destiny_store_map[_destiny_id] - # destiny = self._destinies[_destiny_id] - # store: DestinyStore = self._destiny_archives[store_id] # type: ignore - # - # if not isinstance(store, DestinyStore): - # full_alias = f"{store_id}.{destiny.destiny_alias}" - # raise Exception( - # f"Can't store destiny '{full_alias}': prefix '{store_id}' not writable in this kiara context." - # ) - # - # store.persist_destiny(destiny=destiny) diff --git a/src/kiara/registries/destinies/sqlite_store.py b/src/kiara/registries/destinies/sqlite_store.py deleted file mode 100644 index 16d7b7381..000000000 --- a/src/kiara/registries/destinies/sqlite_store.py +++ /dev/null @@ -1,116 +0,0 @@ -# -*- coding: utf-8 -*- -import uuid -from pathlib import Path -from typing import Set, Union - -from sqlalchemy import Engine, create_engine, text - -from kiara.models.module.destiny import Destiny -from kiara.registries import SqliteArchiveConfig -from kiara.registries.destinies import DestinyArchive, DestinyStore -from kiara.utils.windows import fix_windows_longpath - - -class SqliteDestinyArchive(DestinyArchive): - - _archive_type_name = "sqlite_destiny_archive" - _config_cls = SqliteArchiveConfig - - def __init__(self, archive_alias: str, archive_config: SqliteArchiveConfig): - - DestinyArchive.__init__( - self, archive_alias=archive_alias, archive_config=archive_config - ) - self._db_path: Union[Path, None] = None - self._cached_engine: Union[Engine, None] = None - # self._lock: bool = True - - def _retrieve_archive_id(self) -> uuid.UUID: - sql = text("SELECT value FROM archive_metadata WHERE key='archive_id'") - - with self.sqlite_engine.connect() as connection: - result = connection.execute(sql) - row = result.fetchone() - if row is None: - raise Exception("No archive ID found in metadata") - return uuid.UUID(row[0]) - - @property - def sqlite_path(self): - - if self._db_path is not None: - return self._db_path - - db_path = Path(self.config.sqlite_db_path).resolve() - self._db_path = fix_windows_longpath(db_path) - - if self._db_path.exists(): - return self._db_path - - self._db_path.parent.mkdir(parents=True, exist_ok=True) - return self._db_path - - @property - def db_url(self) -> str: - return f"sqlite:///{self.sqlite_path}" - - @property - def sqlite_engine(self) -> "Engine": - - if self._cached_engine is not None: - return self._cached_engine - - # def _pragma_on_connect(dbapi_con, con_record): - # dbapi_con.execute("PRAGMA query_only = ON") - - self._cached_engine = create_engine(self.db_url, future=True) - create_table_sql = """ -CREATE TABLE IF NOT EXISTS destiny_details ( - job_hash TEXT PRIMARY KEY, - manifest_hash TEXT NOT NULL, - inputs_hash TEXT NOT NULL, - job_metadata TEXT NOT NULL -); -""" - - with self._cached_engine.begin() as connection: - for statement in create_table_sql.split(";"): - if statement.strip(): - connection.execute(text(statement)) - - # if self._lock: - # event.listen(self._cached_engine, "connect", _pragma_on_connect) - return self._cached_engine - - def get_all_value_ids(self) -> Set[uuid.UUID]: - pass - - def get_destiny_aliases_for_value( - self, value_id: uuid.UUID - ) -> Union[Set[str], None]: - pass - - def get_destiny(self, value_id: uuid.UUID, destiny: str) -> Destiny: - pass - - -class SqliteJobStore(SqliteDestinyArchive, DestinyStore): - - _archive_type_name = "sqlite_destiny_store" - - def persist_destiny(self, destiny: Destiny): - - for value_id in destiny.fixed_inputs.values(): - - path = self._translate_value_id( - value_id=value_id, destiny_alias=destiny.destiny_alias - ) - if path.exists(): - logger.debug("replace.destiny.file", path=path.as_posix()) - path.unlink() - # raise Exception( - # f"Can't persist destiny '{destiny.destiny_id}': already persisted." - # ) - - path.parent.mkdir(parents=True, exist_ok=True) - fix_windows_symlink(destiny_path, path)