Skip to content

Commit

Permalink
refactor: moved destiny stuff into data_registry
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Jan 23, 2024
1 parent f56a268 commit bbe5040
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 19 deletions.
17 changes: 8 additions & 9 deletions src/kiara/context/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from kiara.registries import KiaraArchive
from kiara.registries.aliases import AliasRegistry
from kiara.registries.data import DataRegistry
from kiara.registries.destinies.registry import DestinyRegistry
from kiara.registries.environment import EnvironmentRegistry
from kiara.registries.events.metadata import CreateMetadataDestinies
from kiara.registries.events.registry import EventRegistry
Expand Down Expand Up @@ -140,7 +139,7 @@ def __init__(
self._kiara_model_registry: ModelRegistry = ModelRegistry.instance()

self._alias_registry: AliasRegistry = AliasRegistry(kiara=self)
self._destiny_registry: DestinyRegistry = DestinyRegistry(kiara=self)
# self._destiny_registry: DestinyRegistry = DestinyRegistry(kiara=self)

self._workflow_registry: WorkflowRegistry = WorkflowRegistry(kiara=self)

Expand Down Expand Up @@ -181,8 +180,8 @@ def __init__(
if supported_type == "alias":
self.alias_registry.register_archive(archive_obj) # type: ignore

if supported_type == "destiny":
self.destiny_registry.register_destiny_archive(archive_obj) # type: ignore
# if supported_type == "destiny":
# self.destiny_registry.register_destiny_archive(archive_obj) # type: ignore

if supported_type == "workflow":
self.workflow_registry.register_archive(archive_obj) # type: ignore
Expand Down Expand Up @@ -259,9 +258,9 @@ def kiara_model_registry(self) -> ModelRegistry:
def alias_registry(self) -> AliasRegistry:
return self._alias_registry

@property
def destiny_registry(self) -> DestinyRegistry:
return self._destiny_registry
# @property
# def destiny_registry(self) -> DestinyRegistry:
# return self._destiny_registry

@property
def job_registry(self) -> JobRegistry:
Expand Down Expand Up @@ -468,8 +467,8 @@ def get_all_archives(self) -> Dict[KiaraArchive, Set[str]]:
result.setdefault(archive, set()).add(alias)
for alias, archive in self.alias_registry.alias_archives.items():
result.setdefault(archive, set()).add(alias)
for alias, archive in self.destiny_registry.destiny_archives.items():
result.setdefault(archive, set()).add(alias)
# for alias, archive in self.destiny_registry.destiny_archives.items():
# result.setdefault(archive, set()).add(alias)
for alias, archive in self.job_registry.job_archives.items():
result.setdefault(archive, set()).add(alias)
for alias, archive in self.workflow_registry.workflow_archives.items():
Expand Down
12 changes: 8 additions & 4 deletions src/kiara/interfaces/python_api/models/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,10 @@ def create_from_instance(
is_internal = False

if resolve_destinies:
destiny_links = kiara.data_registry.find_destinies_for_value(
value_id=instance.value_id
destiny_links = (
kiara.data_registry.retrieve_destinies_for_value_from_archives(
value_id=instance.value_id
)
)
filtered_destinies = {}
for alias, value_id in destiny_links.items():
Expand Down Expand Up @@ -602,8 +604,10 @@ def resolve_aliases(self):

def resolve_destinies(self):
if self.destiny_links is None:
destiny_links = self._value._data_registry.find_destinies_for_value(
value_id=self.value_id
destiny_links = (
self._value._data_registry.retrieve_destinies_for_value_from_archives(
value_id=self.value_id
)
)
filtered_destinies = {}
for alias, value_id in destiny_links.items():
Expand Down
14 changes: 14 additions & 0 deletions src/kiara/models/module/destiny.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

if TYPE_CHECKING:
from kiara.context import Kiara
from kiara.models.values.value import Value
from kiara.modules import KiaraModule


Expand Down Expand Up @@ -164,6 +165,19 @@ def module(self) -> "KiaraModule":
self._module = m_cls(module_config=self.module_config)
return self._module

def execute(self, kiara: "Kiara") -> "Value":

if self.result_value_id is not None:
raise Exception("Destiny already resolved.")

results = kiara.job_registry.execute_and_retrieve(
manifest=self, inputs=self.merged_inputs
)
value = results.get_value_obj(field_name=self.result_field_name)

self.result_value_id = value.value_id
return value

# def _retrieve_job_config_hash(self) -> int:
# obj = {"module_config": self.manifest_data, "inputs": self.merged_inputs}
# return compute_cid(obj)
126 changes: 125 additions & 1 deletion src/kiara/registries/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Any,
Callable,
Dict,
Iterable,
List,
Mapping,
Protocol,
Expand Down Expand Up @@ -74,6 +75,8 @@

if TYPE_CHECKING:
from kiara.context import Kiara
from kiara.models.module.destiny import Destiny
from kiara.models.module.manifest import Manifest

logger = structlog.getLogger()

Expand Down Expand Up @@ -233,6 +236,13 @@ def __init__(self, kiara: "Kiara"):
self._registered_values[NONE_VALUE_ID] = self._none_value
self._persisted_value_descs[NONE_VALUE_ID] = NONE_PERSISTED_DATA

self._cached_value_aliases: Dict[
uuid.UUID, Dict[str, Union[Destiny, None]]
] = {}

self._destinies: Dict[uuid.UUID, Destiny] = {}
self._destinies_by_value: Dict[uuid.UUID, Dict[str, Destiny]] = {}

@property
def kiara_id(self) -> uuid.UUID:
return self._kiara.id
Expand Down Expand Up @@ -582,7 +592,10 @@ def find_values_for_hash(

return {self.get_value(value=v_id) for v_id in stored}

def find_destinies_for_value(
# ==============================================================================================
# destiny stuff

def retrieve_destinies_for_value_from_archives(
self, value_id: uuid.UUID, alias_filter: Union[str, None] = None
) -> Mapping[str, uuid.UUID]:

Expand All @@ -605,6 +618,117 @@ def find_destinies_for_value(

return all_destinies

def get_destiny_aliases_for_value(
self, value_id: uuid.UUID, alias_filter: Union[str, None] = None
) -> Iterable[str]:

# TODO: cache the result of this

if alias_filter is not None:
raise NotImplementedError()

aliases: Set[str] = set()
aliases.update(
self.retrieve_destinies_for_value_from_archives(value_id=value_id).keys()
)

# all_stores = self._all_values_store_map.get(value_id)
# if all_stores:
# for prefix in all_stores:
# all_aliases = self._destiny_archives[
# prefix
# ].get_destiny_aliases_for_value(value_id=value_id)
# if all_aliases is not None:
# aliases.update((f"{prefix}.{a}" for a in all_aliases))

current = self._destinies_by_value.get(value_id, None)
if current:
aliases.update(current.keys())

return sorted(aliases)

def register_destiny(
self,
destiny_alias: str,
values: Dict[str, uuid.UUID],
manifest: "Manifest",
result_field_name: Union[str, None] = None,
) -> "Destiny":
"""
Add a destiny for one (or in some rare cases several) values.
A destiny alias must be unique for every one of the involved input values.
"""
if not values:
raise Exception("Can't add destiny, no values provided.")

from kiara.models.module.destiny import Destiny

destiny = Destiny.create_from_values(
kiara=self._kiara,
destiny_alias=destiny_alias,
manifest=manifest,
result_field_name=result_field_name,
values=values,
)

for value_id in destiny.fixed_inputs.values():

self._destinies[destiny.destiny_id] = destiny
# TODO: store history?
self._destinies_by_value.setdefault(value_id, {})[destiny_alias] = destiny
self._cached_value_aliases.setdefault(value_id, {})[destiny_alias] = destiny

return destiny

def attach_destiny_as_property(
self,
destiny: Union[uuid.UUID, "Destiny"],
field_names: Union[Iterable[str], None] = None,
):

if field_names:
raise NotImplementedError()

if isinstance(destiny, uuid.UUID):
destiny = self._destinies[destiny]

values = self.load_values(destiny.fixed_inputs)

already_stored: List[uuid.UUID] = []
for v in values.values():
if v.is_stored:
already_stored.append(v.value_id)

if already_stored:
stored = (str(v) for v in already_stored)
raise Exception(
f"Can't attach destiny as property, value(s) already stored: {', '.join(stored)}"
)

if destiny.result_value_id is None:
destiny.execute(kiara=self._kiara)

for v in values.values():
assert destiny.result_value_id is not None
v.add_property(
value_id=destiny.result_value_id,
property_path=destiny.destiny_alias,
add_origin_to_property_value=True,
)

def get_registered_destiny(
self, value_id: uuid.UUID, destiny_alias: str
) -> "Destiny":

destiny = self._destinies_by_value.get(value_id, {}).get(destiny_alias, None)
if destiny is None:
raise Exception(
f"No destiny '{destiny_alias}' available for value '{value_id}'."
)

return destiny

def register_data(
self,
data: Any,
Expand Down
1 change: 1 addition & 0 deletions src/kiara/registries/destinies/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def register_destiny_archive(

def _extract_archive(self, alias: str) -> Tuple[str, str]:

print(alias)
if "." not in alias:
assert self._default_destiny_store is not None
return (self._default_destiny_store, alias)
Expand Down
22 changes: 17 additions & 5 deletions src/kiara/registries/events/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,14 @@ def attach_metadata(self, value: Value):
op_details: ExtractMetadataDetails = op.operation_details # type: ignore
input_field_name = op_details.input_field_name
result_field_name = op_details.result_field_name
self._kiara.destiny_registry.add_destiny(

# self._kiara.destiny_registry.add_destiny(
# destiny_alias=f"metadata.{metadata_key}",
# values={input_field_name: value.value_id},
# manifest=op,
# result_field_name=result_field_name,
# )
self._kiara.data_registry.register_destiny(
destiny_alias=f"metadata.{metadata_key}",
values={input_field_name: value.value_id},
manifest=op,
Expand All @@ -77,13 +84,18 @@ def resolve_all_metadata(self, value: Value):

assert not value.is_stored

aliases = self._kiara.destiny_registry.get_destiny_aliases_for_value(
# aliases = self._kiara.destiny_registry.get_destiny_aliases_for_value(
# value_id=value.value_id
# )

aliases = self._kiara.data_registry.get_destiny_aliases_for_value(
value_id=value.value_id
)

for alias in aliases:
destiny = self._kiara.destiny_registry.get_destiny(
destiny = self._kiara.data_registry.get_registered_destiny(
value_id=value.value_id, destiny_alias=alias
)
self._kiara.destiny_registry.resolve_destiny(destiny)
self._kiara.destiny_registry.attach_as_property(destiny)
# self._kiara.destiny_registry.resolve_destiny(destiny)
# self._kiara.destiny_registry.attach_as_property(destiny)
self._kiara.data_registry.attach_destiny_as_property(destiny)

0 comments on commit bbe5040

Please sign in to comment.