Skip to content

Commit

Permalink
refactor: store management
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Jan 22, 2024
1 parent 8d8d061 commit 0cf7385
Show file tree
Hide file tree
Showing 15 changed files with 576 additions and 166 deletions.
50 changes: 42 additions & 8 deletions src/kiara/context/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# from alembic import command # type: ignore
from pydantic import Field

from kiara.context.config import KiaraConfig, KiaraContextConfig
from kiara.context.config import KiaraArchiveReference, KiaraConfig, KiaraContextConfig
from kiara.context.runtime_config import KiaraRuntimeConfig
from kiara.data_types import DataType
from kiara.exceptions import KiaraContextException
Expand Down Expand Up @@ -45,7 +45,7 @@
from kiara.registries.rendering import RenderRegistry
from kiara.registries.types import TypeRegistry
from kiara.registries.workflows import WorkflowRegistry
from kiara.utils import log_exception
from kiara.utils import log_exception, log_message
from kiara.utils.class_loading import find_all_archive_types
from kiara.utils.operations import filter_operations

Expand Down Expand Up @@ -169,23 +169,23 @@ def __init__(

config_cls = archive_cls._config_cls
archive_config = config_cls(**archive.config)
archive_obj = archive_cls(archive_id=archive.archive_uuid, config=archive_config) # type: ignore
archive_obj = archive_cls(archive_alias=archive_alias, archive_config=archive_config) # type: ignore
for supported_type in archive_obj.supported_item_types():
if supported_type == "data":
self.data_registry.register_data_archive(
archive_obj, alias=archive_alias # type: ignore
archive_obj, # type: ignore
)
if supported_type == "job_record":
self.job_registry.register_job_archive(archive_obj, alias=archive_alias) # type: ignore
self.job_registry.register_job_archive(archive_obj) # type: ignore

if supported_type == "alias":
self.alias_registry.register_archive(archive_obj, alias=archive_alias) # type: ignore
self.alias_registry.register_archive(archive_obj) # type: ignore

if supported_type == "destiny":
self.destiny_registry.register_destiny_archive(archive_obj, alias=archive_alias) # type: ignore
self.destiny_registry.register_destiny_archive(archive_obj) # type: ignore

if supported_type == "workflow":
self.workflow_registry.register_archive(archive_obj, alias=archive_alias) # type: ignore
self.workflow_registry.register_archive(archive_obj) # type: ignore

if self._runtime_config.lock_context:
self.lock_context()
Expand Down Expand Up @@ -314,6 +314,40 @@ def module_type_names(self) -> Iterable[str]:
# ===================================================================================================
# kiara session API methods

def register_external_archive(
self,
archive: Union[str, KiaraArchive, List[KiaraArchive], List[str]],
allow_write_access: bool = False,
):

if isinstance(archive, (KiaraArchive, str)):
_archives = [archive]
else:
_archives = archive

archive_instances = set()
for _archive in _archives:

if isinstance(_archive, KiaraArchive):
archive_instances.add(_archive)
# TODO: handle write access
continue

loaded = KiaraArchiveReference.load_existing_archive(
archive_uri=_archive, allow_write_access=allow_write_access
)

archive_instances.update(loaded)

for _archve_inst in archive_instances:
log_message(
"register.external.archive",
archive=_archve_inst.archive_alias,
allow_write_access=allow_write_access,
)

return list(archive_instances)

def create_manifest(
self, module_or_operation: str, config: Union[Mapping[str, Any], None] = None
) -> Manifest:
Expand Down
203 changes: 164 additions & 39 deletions src/kiara/context/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
from kiara.registries.ids import ID_REGISTRY
from kiara.utils import log_message
from kiara.utils.files import get_data_from_file
from kiara.utils.stores import create_store

if TYPE_CHECKING:
from kiara.context import Kiara
Expand Down Expand Up @@ -62,16 +61,137 @@ def config_file_settings_source(settings: BaseSettings) -> Dict[str, Any]:


class KiaraArchiveConfig(BaseModel):
"""Configuration data that can be used to load an existing kiara archive."""

archive_id: str = Field(description="The unique archive id.")
# archive_alias: str = Field(description="The unique archive id.")
archive_type: str = Field(description="The archive type.")
config: Mapping[str, Any] = Field(
description="Archive type specific config.", default_factory=dict
)


class KiaraArchiveReference(BaseModel):
@classmethod
def load_existing_archive(
cls,
archive_uri: str,
store_type: Union[str, None, Iterable[str]] = None,
allow_write_access: bool = False,
**kwargs: Any,
) -> "KiaraArchiveReference":

from kiara.utils.class_loading import find_all_archive_types

archive_types = find_all_archive_types()

archive_configs: List[KiaraArchiveConfig] = []
archives: List[KiaraArchive] = []

archive_alias = archive_uri

if store_type:
if isinstance(store_type, str):
archive_cls: Type[KiaraArchive] = archive_types.get(store_type, None)
if archive_cls is None:
raise Exception(
f"Can't create context: no archive type '{store_type}' available. Available types: {', '.join(archive_types.keys())}"
)
data = archive_cls.load_store_config(
store_uri=archive_uri,
allow_write_access=allow_write_access,
**kwargs,
)
archive: KiaraArchive = archive_cls(
archive_alias=archive_alias, archive_config=data
)
archive_configs.append(data)
archives.append(archive)
else:
for st in store_type:
archive_cls = archive_types.get(st, None)
if archive_cls is None:
raise Exception(
f"Can't create context: no archive type '{store_type}' available. Available types: {', '.join(archive_types.keys())}"
)
data = archive_cls.load_store_config(
store_uri=archive_uri,
allow_write_access=allow_write_access,
**kwargs,
)
archive: KiaraArchive = archive_cls(
archive_alias=archive_alias, archive_config=data
)
archive_configs.append(data)
archives.append(archive)
else:
for archive_type, archive_cls in archive_types.items():

data = archive_cls.load_store_config(
store_uri=archive_uri,
allow_write_access=allow_write_access,
**kwargs,
)
if data is None:
continue
archive: KiaraArchive = archive_cls(
archive_alias=archive_alias, archive_config=data
)
archive_configs.append(data)
archives.append(archive)

if archives is None:
raise Exception(
f"Can't create context: no valid archive found at '{archive_uri}'"
)

result = cls(
archive_uri=archive_uri,
archive_alias=archive_alias,
allow_write_access=allow_write_access,
archive_configs=archive_configs,
)
result._archives = archives
return result

archive_uri: str = Field(description="The uri that points to the archive.")
archive_alias: str = Field(
description="The alias that is used for the archives contained in here."
)
allow_write_access: bool = Field(
description="Whether to allow write access to the archives contained here.",
default=False,
)
archive_configs: List[KiaraArchiveConfig] = Field(
description="All the archives this kiara context can use and the aliases they are registered with."
)
_archives: Union[None, List["KiaraArchive"]] = PrivateAttr(default=None)

@property
def archive_uuid(self) -> uuid.UUID:
return uuid.UUID(self.archive_id)
def archives(self) -> List["KiaraArchive"]:

if self._archives is not None:
return self._archives

from kiara.utils.class_loading import find_all_archive_types

archive_types = find_all_archive_types()

result = []
for config in self.archive_configs:
if config.archive_type not in archive_types.keys():
raise Exception(
f"Can't create context: no archive type '{config.archive_type}' available. Available types: {', '.join(archive_types.keys())}"
)

archive_cls = archive_types[config.archive_type]
archive = archive_cls.load_store_config(
archive_uri=self.archive_uri,
allow_write_access=self.allow_write_access,
)
result.append(archive)

self._archives = result
return self._archives


class KiaraContextConfig(BaseModel):
Expand All @@ -98,22 +218,22 @@ def add_pipelines(self, *pipelines: str):
"ignore.pipeline", reason="path does not exist", path=pipeline
)

def create_archive(
self, archive_alias: str, allow_write_access: bool = False
) -> "KiaraArchive":
"""Create the kiara archive with the specified alias.
Make sure you know what you are doing when setting 'allow_write_access' to True.
"""

store_config = self.archives[archive_alias]
store = create_store(
archive_id=store_config.archive_uuid,
store_type=store_config.archive_type,
store_config=store_config.config,
allow_write_access=allow_write_access,
)
return store
# def create_archive(
# self, archive_alias: str, allow_write_access: bool = False
# ) -> "KiaraArchive":
# """Create the kiara archive with the specified alias.
#
# Make sure you know what you are doing when setting 'allow_write_access' to True.
# """
#
# store_config = self.archives[archive_alias]
# store = create_store(
# archive_id=store_config.archive_uuid,
# store_type=store_config.archive_type,
# store_config=store_config.config,
# allow_write_access=allow_write_access,
# )
# return store


class KiaraSettings(BaseSettings):
Expand All @@ -130,8 +250,8 @@ class KiaraSettings(BaseSettings):
KIARA_SETTINGS = KiaraSettings()


def create_default_store(
store_id: uuid.UUID, store_type: str, stores_base_path: str
def create_default_store_config(
store_type: str, stores_base_path: str
) -> KiaraArchiveConfig:

env_registry = EnvironmentRegistry.instance()
Expand All @@ -144,10 +264,18 @@ def create_default_store(

archive_info: ArchiveTypeInfo = available_archives.item_infos[store_type]
cls: Type[BaseArchive] = archive_info.python_class.get_class() # type: ignore

Check failure on line 266 in src/kiara/context/config.py

View workflow job for this annotation

GitHub Actions / linting-linux

Ruff (F821)

src/kiara/context/config.py:266:15: F821 Undefined name `BaseArchive`
config = cls.create_new_config(store_id=store_id, stores_base_path=stores_base_path)

log_message(
"create_new_store",
stores_base_path=stores_base_path,
store_type=cls.__name__,
)

config = cls._config_cls.create_new_store_config(store_base_path=stores_base_path)

# store_id: uuid.UUID = config.get_archive_id()

data_store = KiaraArchiveConfig(
archive_id=store_id,
archive_type=store_type,
config=config.model_dump(),
)
Expand Down Expand Up @@ -349,17 +477,18 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool:

changed = False

default_store_id = context_config.context_id

sqlite_base_path = os.path.join(self.stores_base_path, "sqlite_stores")
filesystem_base_path = os.path.join(self.stores_base_path, "filesystem_stores")

if DEFAULT_DATA_STORE_MARKER not in context_config.archives.keys():
# data_store_type = "filesystem_data_store"
# TODO: change that back
data_store_type = "sqlite_data_store"

data_store = create_default_store(
store_id=default_store_id,
data_store = create_default_store_config(
store_type=data_store_type,
stores_base_path=self.stores_base_path,
stores_base_path=sqlite_base_path,
)
context_config.archives[DEFAULT_DATA_STORE_MARKER] = data_store
changed = True
Expand All @@ -369,10 +498,9 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool:
# job_store_type = "filesystem_job_store"
job_store_type = "sqlite_job_store"

job_store = create_default_store(
store_id=default_store_id,
job_store = create_default_store_config(
store_type=job_store_type,
stores_base_path=self.stores_base_path,
stores_base_path=sqlite_base_path,
)

context_config.archives[DEFAULT_JOB_STORE_MARKER] = job_store
Expand All @@ -383,8 +511,7 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool:
alias_store_type = "filesystem_alias_store"
alias_store_type = "sqlite_alias_store"

alias_store = create_default_store(
store_id=default_store_id,
alias_store = create_default_store_config(
store_type=alias_store_type,
stores_base_path=self.stores_base_path,
)
Expand All @@ -394,10 +521,9 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool:
if DEFAULT_WORKFLOW_STORE_MARKER not in context_config.archives.keys():

workflow_store_type = "filesystem_workflow_store"
workflow_store = create_default_store(
store_id=default_store_id,
workflow_store = create_default_store_config(
store_type=workflow_store_type,
stores_base_path=self.stores_base_path,
stores_base_path=os.path.join(filesystem_base_path, "workflows"),
)
context_config.archives[DEFAULT_WORKFLOW_STORE_MARKER] = workflow_store
changed = True
Expand All @@ -406,10 +532,9 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool:

destiny_store_type = "filesystem_destiny_store"

destiny_store = create_default_store(
store_id=default_store_id,
destiny_store = create_default_store_config(
store_type=destiny_store_type,
stores_base_path=self.stores_base_path,
stores_base_path=os.path.join(filesystem_base_path, "destinies"),
)
context_config.archives[METADATA_DESTINY_STORE_MARKER] = destiny_store
changed = True
Expand Down
2 changes: 2 additions & 0 deletions src/kiara/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@

KIARA_DEFAULT_ROOT_NODE_ID = "__self__"

KIARA_SQLITE_STORE_EXTENSION = "kiara"

VALUE_ATTR_DELIMITER = "::"
VALID_VALUE_QUERY_CATEGORIES = ["data", "properties"]

Expand Down
Loading

0 comments on commit 0cf7385

Please sign in to comment.