Skip to content

Commit

Permalink
chore: before destiny refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Jan 23, 2024
1 parent 0cf7385 commit f56a268
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/kiara/context/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,6 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool:

changed = False


sqlite_base_path = os.path.join(self.stores_base_path, "sqlite_stores")
filesystem_base_path = os.path.join(self.stores_base_path, "filesystem_stores")

Expand Down Expand Up @@ -521,6 +520,7 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool:
if DEFAULT_WORKFLOW_STORE_MARKER not in context_config.archives.keys():

workflow_store_type = "filesystem_workflow_store"

workflow_store = create_default_store_config(
store_type=workflow_store_type,
stores_base_path=os.path.join(filesystem_base_path, "workflows"),
Expand Down
1 change: 0 additions & 1 deletion src/kiara/registries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ def create_new_store_config(
# Connect to the SQLite database (or create it if it doesn't exist)
import sqlite3

print(archive_path)
conn = sqlite3.connect(archive_path)

# Create a cursor object
Expand Down
10 changes: 5 additions & 5 deletions src/kiara/registries/aliases/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,13 @@ def __init__(self, kiara: "Kiara"):
def register_archive(
self,
archive: AliasArchive,
alias: Union[str, None] = None,
set_as_default_store: Union[bool, None] = None,
):

alias_archive_id = archive.archive_id
archive.register_archive(kiara=self._kiara)
alias = archive.archive_alias

if alias is None:
alias = str(alias_archive_id)
if not alias:
raise Exception("Invalid alias archive alias: can't be empty.")

if "." in alias:
raise Exception(
Expand All @@ -113,6 +111,8 @@ def register_archive(
if alias in self._alias_archives.keys():
raise Exception(f"Can't add store, alias '{alias}' already registered.")

archive.register_archive(kiara=self._kiara)

self._alias_archives[alias] = archive
is_store = False
is_default_store = False
Expand Down
12 changes: 7 additions & 5 deletions src/kiara/registries/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,19 +258,21 @@ def retrieve_all_available_value_ids(self) -> Set[uuid.UUID]:
def register_data_archive(
self,
archive: DataArchive,
alias: Union[str, None] = None,
set_as_default_store: Union[bool, None] = None,
) -> str:

data_store_id = archive.archive_id
archive.register_archive(kiara=self._kiara)
if alias is None:
alias = str(data_store_id)
alias = archive.archive_alias

if not alias:
raise Exception("Invalid data archive alias: can't be empty.")

if alias in self._data_archives.keys():
raise Exception(
f"Can't add data archive, alias '{alias}' already registered."
)

archive.register_archive(kiara=self._kiara)

self._data_archives[alias] = archive
is_store = False
is_default_store = False
Expand Down
46 changes: 23 additions & 23 deletions src/kiara/registries/destinies/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ def destiny_archives(self) -> Mapping[str, DestinyArchive]:
def register_destiny_archive(
self,
archive: DestinyArchive,
alias: Union[str, None] = None,
set_as_default_store: Union[bool, None] = None,
):

destiny_store_id = archive.archive_id
alias = archive.archive_alias
archive.register_archive(kiara=self._kiara)
if alias is None:
alias = str(destiny_store_id)

if not alias:
raise Exception("Invalid destiny archive alias: can't be empty.")

if alias in self._destiny_archives.keys():
raise Exception(
Expand Down Expand Up @@ -288,22 +288,22 @@ def attach_as_property(
add_origin_to_property_value=True,
)

def store_destiny(self, destiny_id: Union[Destiny, uuid.UUID]):

try:
_destiny_id: uuid.UUID = destiny_id.destiny_id # type: ignore
except Exception:
# just in case this is a 'Destiny' object
_destiny_id = destiny_id # type: ignore

store_id = self._destiny_store_map[_destiny_id]
destiny = self._destinies[_destiny_id]
store: DestinyStore = self._destiny_archives[store_id] # type: ignore

if not isinstance(store, DestinyStore):
full_alias = f"{store_id}.{destiny.destiny_alias}"
raise Exception(
f"Can't store destiny '{full_alias}': prefix '{store_id}' not writable in this kiara context."
)

store.persist_destiny(destiny=destiny)
# def store_destiny(self, destiny_id: Union[Destiny, uuid.UUID]):
#
# try:
# _destiny_id: uuid.UUID = destiny_id.destiny_id # type: ignore
# except Exception:
# # just in case this is a 'Destiny' object
# _destiny_id = destiny_id # type: ignore
#
# store_id = self._destiny_store_map[_destiny_id]
# destiny = self._destinies[_destiny_id]
# store: DestinyStore = self._destiny_archives[store_id] # type: ignore
#
# if not isinstance(store, DestinyStore):
# full_alias = f"{store_id}.{destiny.destiny_alias}"
# raise Exception(
# f"Can't store destiny '{full_alias}': prefix '{store_id}' not writable in this kiara context."
# )
#
# store.persist_destiny(destiny=destiny)
116 changes: 116 additions & 0 deletions src/kiara/registries/destinies/sqlite_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
import uuid
from pathlib import Path
from typing import Set, Union

from sqlalchemy import Engine, create_engine, text

from kiara.models.module.destiny import Destiny
from kiara.registries import SqliteArchiveConfig
from kiara.registries.destinies import DestinyArchive, DestinyStore
from kiara.utils.windows import fix_windows_longpath


class SqliteDestinyArchive(DestinyArchive):

_archive_type_name = "sqlite_destiny_archive"
_config_cls = SqliteArchiveConfig

def __init__(self, archive_alias: str, archive_config: SqliteArchiveConfig):

DestinyArchive.__init__(
self, archive_alias=archive_alias, archive_config=archive_config
)
self._db_path: Union[Path, None] = None
self._cached_engine: Union[Engine, None] = None
# self._lock: bool = True

def _retrieve_archive_id(self) -> uuid.UUID:
sql = text("SELECT value FROM archive_metadata WHERE key='archive_id'")

with self.sqlite_engine.connect() as connection:
result = connection.execute(sql)
row = result.fetchone()
if row is None:
raise Exception("No archive ID found in metadata")
return uuid.UUID(row[0])

@property
def sqlite_path(self):

if self._db_path is not None:
return self._db_path

db_path = Path(self.config.sqlite_db_path).resolve()
self._db_path = fix_windows_longpath(db_path)

if self._db_path.exists():
return self._db_path

self._db_path.parent.mkdir(parents=True, exist_ok=True)
return self._db_path

@property
def db_url(self) -> str:
return f"sqlite:///{self.sqlite_path}"

@property
def sqlite_engine(self) -> "Engine":

if self._cached_engine is not None:
return self._cached_engine

# def _pragma_on_connect(dbapi_con, con_record):
# dbapi_con.execute("PRAGMA query_only = ON")

self._cached_engine = create_engine(self.db_url, future=True)
create_table_sql = """
CREATE TABLE IF NOT EXISTS destiny_details (
job_hash TEXT PRIMARY KEY,
manifest_hash TEXT NOT NULL,
inputs_hash TEXT NOT NULL,
job_metadata TEXT NOT NULL
);
"""

with self._cached_engine.begin() as connection:
for statement in create_table_sql.split(";"):
if statement.strip():
connection.execute(text(statement))

# if self._lock:
# event.listen(self._cached_engine, "connect", _pragma_on_connect)
return self._cached_engine

def get_all_value_ids(self) -> Set[uuid.UUID]:
pass

def get_destiny_aliases_for_value(
self, value_id: uuid.UUID
) -> Union[Set[str], None]:
pass

def get_destiny(self, value_id: uuid.UUID, destiny: str) -> Destiny:
pass


class SqliteJobStore(SqliteDestinyArchive, DestinyStore):

_archive_type_name = "sqlite_destiny_store"

def persist_destiny(self, destiny: Destiny):

for value_id in destiny.fixed_inputs.values():

path = self._translate_value_id(
value_id=value_id, destiny_alias=destiny.destiny_alias
)
if path.exists():
logger.debug("replace.destiny.file", path=path.as_posix())
path.unlink()
# raise Exception(
# f"Can't persist destiny '{destiny.destiny_id}': already persisted."
# )

path.parent.mkdir(parents=True, exist_ok=True)
fix_windows_symlink(destiny_path, path)
9 changes: 6 additions & 3 deletions src/kiara/registries/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,19 @@ def suppoerted_event_types(self) -> Iterable[Type[KiaraEvent]]:

return [JobArchiveAddedEvent, JobRecordPreStoreEvent, JobRecordStoredEvent]

def register_job_archive(self, archive: JobArchive, alias: Union[str, None] = None):
def register_job_archive(self, archive: JobArchive):

if alias is None:
alias = str(archive.archive_id)
alias = archive.archive_alias

if not alias:
raise Exception("Invalid job archive alias: can't be empty.")

if alias in self._job_archives.keys():
raise Exception(
f"Can't register job store, store id already registered: {alias}."
)

archive.register_archive(self._kiara)
self._job_archives[alias] = archive

is_store = False
Expand Down
12 changes: 5 additions & 7 deletions src/kiara/registries/workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,11 @@ def __init__(self, kiara: "Kiara"):
def register_archive(
self,
archive: WorkflowArchive,
alias: Union[str, None] = None,
set_as_default_store: Union[bool, None] = None,
):

workflow_archive_id = archive.archive_id
archive.register_archive(kiara=self._kiara)

if alias is None:
alias = str(workflow_archive_id)
alias = archive.archive_alias
if not alias:
raise Exception("Invalid workflows archive alias: can't be empty.")

if "." in alias:
raise Exception(
Expand All @@ -167,6 +163,8 @@ def register_archive(
f"Can't add store, workflow archive alias '{alias}' already registered."
)

archive.register_archive(kiara=self._kiara)

self._workflow_archives[alias] = archive
is_store = False
is_default_store = False
Expand Down

0 comments on commit f56a268

Please sign in to comment.