diff --git a/src/kiara/context/__init__.py b/src/kiara/context/__init__.py index bf9328d8e..878c8c482 100644 --- a/src/kiara/context/__init__.py +++ b/src/kiara/context/__init__.py @@ -33,7 +33,6 @@ from kiara.registries import KiaraArchive from kiara.registries.aliases import AliasRegistry from kiara.registries.data import DataRegistry -from kiara.registries.destinies.registry import DestinyRegistry from kiara.registries.environment import EnvironmentRegistry from kiara.registries.events.metadata import CreateMetadataDestinies from kiara.registries.events.registry import EventRegistry @@ -140,7 +139,7 @@ def __init__( self._kiara_model_registry: ModelRegistry = ModelRegistry.instance() self._alias_registry: AliasRegistry = AliasRegistry(kiara=self) - self._destiny_registry: DestinyRegistry = DestinyRegistry(kiara=self) + # self._destiny_registry: DestinyRegistry = DestinyRegistry(kiara=self) self._workflow_registry: WorkflowRegistry = WorkflowRegistry(kiara=self) @@ -181,8 +180,8 @@ def __init__( if supported_type == "alias": self.alias_registry.register_archive(archive_obj) # type: ignore - if supported_type == "destiny": - self.destiny_registry.register_destiny_archive(archive_obj) # type: ignore + # if supported_type == "destiny": + # self.destiny_registry.register_destiny_archive(archive_obj) # type: ignore if supported_type == "workflow": self.workflow_registry.register_archive(archive_obj) # type: ignore @@ -259,9 +258,9 @@ def kiara_model_registry(self) -> ModelRegistry: def alias_registry(self) -> AliasRegistry: return self._alias_registry - @property - def destiny_registry(self) -> DestinyRegistry: - return self._destiny_registry + # @property + # def destiny_registry(self) -> DestinyRegistry: + # return self._destiny_registry @property def job_registry(self) -> JobRegistry: @@ -468,8 +467,8 @@ def get_all_archives(self) -> Dict[KiaraArchive, Set[str]]: result.setdefault(archive, set()).add(alias) for alias, archive in self.alias_registry.alias_archives.items(): result.setdefault(archive, set()).add(alias) - for alias, archive in self.destiny_registry.destiny_archives.items(): - result.setdefault(archive, set()).add(alias) + # for alias, archive in self.destiny_registry.destiny_archives.items(): + # result.setdefault(archive, set()).add(alias) for alias, archive in self.job_registry.job_archives.items(): result.setdefault(archive, set()).add(alias) for alias, archive in self.workflow_registry.workflow_archives.items(): diff --git a/src/kiara/interfaces/python_api/models/info.py b/src/kiara/interfaces/python_api/models/info.py index 8d633c487..abc06de80 100644 --- a/src/kiara/interfaces/python_api/models/info.py +++ b/src/kiara/interfaces/python_api/models/info.py @@ -465,8 +465,10 @@ def create_from_instance( is_internal = False if resolve_destinies: - destiny_links = kiara.data_registry.find_destinies_for_value( - value_id=instance.value_id + destiny_links = ( + kiara.data_registry.retrieve_destinies_for_value_from_archives( + value_id=instance.value_id + ) ) filtered_destinies = {} for alias, value_id in destiny_links.items(): @@ -602,8 +604,10 @@ def resolve_aliases(self): def resolve_destinies(self): if self.destiny_links is None: - destiny_links = self._value._data_registry.find_destinies_for_value( - value_id=self.value_id + destiny_links = ( + self._value._data_registry.retrieve_destinies_for_value_from_archives( + value_id=self.value_id + ) ) filtered_destinies = {} for alias, value_id in destiny_links.items(): diff --git a/src/kiara/models/module/destiny.py b/src/kiara/models/module/destiny.py index 4a6a89a94..23b4e555c 100644 --- a/src/kiara/models/module/destiny.py +++ b/src/kiara/models/module/destiny.py @@ -18,6 +18,7 @@ if TYPE_CHECKING: from kiara.context import Kiara + from kiara.models.values.value import Value from kiara.modules import KiaraModule @@ -164,6 +165,19 @@ def module(self) -> "KiaraModule": self._module = m_cls(module_config=self.module_config) return self._module + def execute(self, kiara: "Kiara") -> "Value": + + if self.result_value_id is not None: + raise Exception("Destiny already resolved.") + + results = kiara.job_registry.execute_and_retrieve( + manifest=self, inputs=self.merged_inputs + ) + value = results.get_value_obj(field_name=self.result_field_name) + + self.result_value_id = value.value_id + return value + # def _retrieve_job_config_hash(self) -> int: # obj = {"module_config": self.manifest_data, "inputs": self.merged_inputs} # return compute_cid(obj) diff --git a/src/kiara/registries/data/__init__.py b/src/kiara/registries/data/__init__.py index 7aaed3c53..e5620302e 100644 --- a/src/kiara/registries/data/__init__.py +++ b/src/kiara/registries/data/__init__.py @@ -13,6 +13,7 @@ Any, Callable, Dict, + Iterable, List, Mapping, Protocol, @@ -74,6 +75,8 @@ if TYPE_CHECKING: from kiara.context import Kiara + from kiara.models.module.destiny import Destiny + from kiara.models.module.manifest import Manifest logger = structlog.getLogger() @@ -233,6 +236,13 @@ def __init__(self, kiara: "Kiara"): self._registered_values[NONE_VALUE_ID] = self._none_value self._persisted_value_descs[NONE_VALUE_ID] = NONE_PERSISTED_DATA + self._cached_value_aliases: Dict[ + uuid.UUID, Dict[str, Union[Destiny, None]] + ] = {} + + self._destinies: Dict[uuid.UUID, Destiny] = {} + self._destinies_by_value: Dict[uuid.UUID, Dict[str, Destiny]] = {} + @property def kiara_id(self) -> uuid.UUID: return self._kiara.id @@ -582,7 +592,10 @@ def find_values_for_hash( return {self.get_value(value=v_id) for v_id in stored} - def find_destinies_for_value( + # ============================================================================================== + # destiny stuff + + def retrieve_destinies_for_value_from_archives( self, value_id: uuid.UUID, alias_filter: Union[str, None] = None ) -> Mapping[str, uuid.UUID]: @@ -605,6 +618,117 @@ def find_destinies_for_value( return all_destinies + def get_destiny_aliases_for_value( + self, value_id: uuid.UUID, alias_filter: Union[str, None] = None + ) -> Iterable[str]: + + # TODO: cache the result of this + + if alias_filter is not None: + raise NotImplementedError() + + aliases: Set[str] = set() + aliases.update( + self.retrieve_destinies_for_value_from_archives(value_id=value_id).keys() + ) + + # all_stores = self._all_values_store_map.get(value_id) + # if all_stores: + # for prefix in all_stores: + # all_aliases = self._destiny_archives[ + # prefix + # ].get_destiny_aliases_for_value(value_id=value_id) + # if all_aliases is not None: + # aliases.update((f"{prefix}.{a}" for a in all_aliases)) + + current = self._destinies_by_value.get(value_id, None) + if current: + aliases.update(current.keys()) + + return sorted(aliases) + + def register_destiny( + self, + destiny_alias: str, + values: Dict[str, uuid.UUID], + manifest: "Manifest", + result_field_name: Union[str, None] = None, + ) -> "Destiny": + """ + Add a destiny for one (or in some rare cases several) values. + + A destiny alias must be unique for every one of the involved input values. + """ + if not values: + raise Exception("Can't add destiny, no values provided.") + + from kiara.models.module.destiny import Destiny + + destiny = Destiny.create_from_values( + kiara=self._kiara, + destiny_alias=destiny_alias, + manifest=manifest, + result_field_name=result_field_name, + values=values, + ) + + for value_id in destiny.fixed_inputs.values(): + + self._destinies[destiny.destiny_id] = destiny + # TODO: store history? + self._destinies_by_value.setdefault(value_id, {})[destiny_alias] = destiny + self._cached_value_aliases.setdefault(value_id, {})[destiny_alias] = destiny + + return destiny + + def attach_destiny_as_property( + self, + destiny: Union[uuid.UUID, "Destiny"], + field_names: Union[Iterable[str], None] = None, + ): + + if field_names: + raise NotImplementedError() + + if isinstance(destiny, uuid.UUID): + destiny = self._destinies[destiny] + + values = self.load_values(destiny.fixed_inputs) + + already_stored: List[uuid.UUID] = [] + for v in values.values(): + if v.is_stored: + already_stored.append(v.value_id) + + if already_stored: + stored = (str(v) for v in already_stored) + raise Exception( + f"Can't attach destiny as property, value(s) already stored: {', '.join(stored)}" + ) + + if destiny.result_value_id is None: + destiny.execute(kiara=self._kiara) + + for v in values.values(): + assert destiny.result_value_id is not None + v.add_property( + value_id=destiny.result_value_id, + property_path=destiny.destiny_alias, + add_origin_to_property_value=True, + ) + + def get_registered_destiny( + self, value_id: uuid.UUID, destiny_alias: str + ) -> "Destiny": + + destiny = self._destinies_by_value.get(value_id, {}).get(destiny_alias, None) + if destiny is None: + raise Exception( + f"No destiny '{destiny_alias}' available for value '{value_id}'." + ) + + return destiny + def register_data( self, data: Any, diff --git a/src/kiara/registries/destinies/registry.py b/src/kiara/registries/destinies/registry.py index b14a8091c..854ee9f96 100644 --- a/src/kiara/registries/destinies/registry.py +++ b/src/kiara/registries/destinies/registry.py @@ -125,6 +125,7 @@ def register_destiny_archive( def _extract_archive(self, alias: str) -> Tuple[str, str]: + print(alias) if "." not in alias: assert self._default_destiny_store is not None return (self._default_destiny_store, alias) diff --git a/src/kiara/registries/events/metadata.py b/src/kiara/registries/events/metadata.py index 331d233fb..b80c1fb9f 100644 --- a/src/kiara/registries/events/metadata.py +++ b/src/kiara/registries/events/metadata.py @@ -58,7 +58,14 @@ def attach_metadata(self, value: Value): op_details: ExtractMetadataDetails = op.operation_details # type: ignore input_field_name = op_details.input_field_name result_field_name = op_details.result_field_name - self._kiara.destiny_registry.add_destiny( + + # self._kiara.destiny_registry.add_destiny( + # destiny_alias=f"metadata.{metadata_key}", + # values={input_field_name: value.value_id}, + # manifest=op, + # result_field_name=result_field_name, + # ) + self._kiara.data_registry.register_destiny( destiny_alias=f"metadata.{metadata_key}", values={input_field_name: value.value_id}, manifest=op, @@ -77,13 +84,18 @@ def resolve_all_metadata(self, value: Value): assert not value.is_stored - aliases = self._kiara.destiny_registry.get_destiny_aliases_for_value( + # aliases = self._kiara.destiny_registry.get_destiny_aliases_for_value( + # value_id=value.value_id + # ) + + aliases = self._kiara.data_registry.get_destiny_aliases_for_value( value_id=value.value_id ) for alias in aliases: - destiny = self._kiara.destiny_registry.get_destiny( + destiny = self._kiara.data_registry.get_registered_destiny( value_id=value.value_id, destiny_alias=alias ) - self._kiara.destiny_registry.resolve_destiny(destiny) - self._kiara.destiny_registry.attach_as_property(destiny) + # self._kiara.destiny_registry.resolve_destiny(destiny) + # self._kiara.destiny_registry.attach_as_property(destiny) + self._kiara.data_registry.attach_destiny_as_property(destiny)