Skip to content

Commit

Permalink
chore: misc optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Feb 5, 2024
1 parent e552b5a commit 8f77030
Show file tree
Hide file tree
Showing 12 changed files with 298 additions and 68 deletions.
5 changes: 4 additions & 1 deletion src/kiara/data_types/included_core_types/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ def serialize(self, data: KiaraFileBundle) -> "SerializedData":
file_data[rel_path] = {"type": "file", "codec": "raw", "file": file.path}
file_metadata[rel_path] = {
"file_name": file.file_name,
# "import_time": file.import_time,
"size": file.size,
"mime_type": file.mime_type,
"metadata": file.metadata,
"metadata_schemas": file.metadata_schemas,
}

# bundle_metadata = orjson_dumps(data.metadata)
Expand Down
7 changes: 5 additions & 2 deletions src/kiara/interfaces/cli/data/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from kiara.exceptions import InvalidCommandLineInvocation
from kiara.utils import log_exception, log_message
from kiara.utils.cli import output_format_option, terminal_print, terminal_print_model
from kiara.utils.cli.exceptions import handle_exception

# from kiara.interfaces.python_api.models.info import RENDER_FIELDS, ValueInfo, ValuesInfo
# from kiara.interfaces.tui.pager import PagerApp
Expand All @@ -38,6 +39,7 @@
if TYPE_CHECKING:
from kiara.api import Kiara, KiaraAPI
from kiara.operations.included_core_operations.filter import FilterOperationType
from kiara.registries.data import DataArchive


logger = structlog.getLogger()
Expand Down Expand Up @@ -236,6 +238,7 @@ def list_values(
)
@output_format_option()
@click.pass_context
@handle_exception()
def explain_value(
ctx,
value_id: Tuple[str],
Expand Down Expand Up @@ -303,6 +306,7 @@ def explain_value(
# is_flag=True,
# )
@click.pass_context
@handle_exception()
def load_value(ctx, value: str):
"""Load a stored value and print it in a format suitable for the terminal."""
# kiara_obj: Kiara = ctx.obj["kiara"]
Expand Down Expand Up @@ -659,7 +663,6 @@ def export_data_store(
@click.pass_context
def import_data_store(ctx, archive: str):

from kiara.registries.data import DataArchive
from kiara.utils.stores import check_external_archive

kiara_api: KiaraAPI = ctx.obj.kiara_api
Expand All @@ -668,7 +671,7 @@ def import_data_store(ctx, archive: str):

archives = check_external_archive(archive)

data_archive: DataArchive = archives.get("data", None) # type: ignore
data_archive: "DataArchive" = archives.get("data", None) # type: ignore

if not data_archive:
terminal_print(f"[red]Error[/red]: No data archives found in '{archive}'")
Expand Down
19 changes: 14 additions & 5 deletions src/kiara/interfaces/python_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@
ValuesInfo,
)
from kiara.interfaces.python_api.models.job import JobDesc
from kiara.interfaces.python_api.models.workflow import WorkflowMatcher
from kiara.interfaces.python_api.value import StoreValueResult, StoreValuesResult
from kiara.interfaces.python_api.workflow import Workflow
from kiara.models.context import ContextInfo, ContextInfos
from kiara.models.module.jobs import ActiveJob
from kiara.models.module.manifest import Manifest
Expand Down Expand Up @@ -94,6 +92,7 @@
PipelinesMap,
WorkflowsMap,
)
from kiara.interfaces.python_api.workflow import Workflow
from kiara.models.module.pipeline import PipelineConfig, PipelineStructure
from kiara.models.module.pipeline.pipeline import PipelineGroupInfo, PipelineInfo

Expand Down Expand Up @@ -1583,6 +1582,7 @@ def assemble_value_map(
if register_data:
temp: Dict[str, Union[str, Value, uuid.UUID, None]] = {}
for k, v in values.items():

if isinstance(v, (Value, uuid.UUID)):
temp[k] = v
continue
Expand All @@ -1604,6 +1604,9 @@ def assemble_value_map(
if v.startswith("alias:"):
temp[k] = v
continue
elif v.startswith("archive:"):
temp[k] = v
continue

try:
v = uuid.UUID(v)
Expand Down Expand Up @@ -2330,7 +2333,7 @@ def list_workflow_alias_names(self) -> List[str]:

def get_workflow(
self, workflow: Union[str, uuid.UUID], create_if_necessary: bool = True
) -> Workflow:
) -> "Workflow":
"""Retrieve the workflow instance with the specified id or alias.
NOTE: this is a provisional endpoint, don't use in anger yet
Expand Down Expand Up @@ -2380,13 +2383,15 @@ def get_workflow(
return workflow_obj

def retrieve_workflow_info(
self, workflow: Union[str, uuid.UUID, Workflow]
self, workflow: Union[str, uuid.UUID, "Workflow"]
) -> WorkflowInfo:
"""Retrieve information about the specified workflow.
NOTE: this is a provisional endpoint, don't use in anger yet
"""

from kiara.interfaces.python_api.workflow import Workflow

if isinstance(workflow, Workflow):
_workflow: Workflow = workflow
else:
Expand All @@ -2398,6 +2403,7 @@ def list_workflows(self, **matcher_params) -> "WorkflowsMap":
"""List all available workflow sessions, indexed by their unique id."""

from kiara.interfaces.python_api.models.doc import WorkflowsMap
from kiara.interfaces.python_api.models.workflow import WorkflowMatcher

workflows = {}

Expand All @@ -2424,6 +2430,7 @@ def list_workflow_aliases(self, **matcher_params) -> "WorkflowsMap":
"""

from kiara.interfaces.python_api.models.doc import WorkflowsMap
from kiara.interfaces.python_api.workflow import Workflow

if matcher_params:
matcher_params["has_alias"] = True
Expand Down Expand Up @@ -2487,12 +2494,14 @@ def create_workflow(
documentation: Union[Any, None] = None,
save: bool = False,
force_alias: bool = False,
) -> Workflow:
) -> "Workflow":
"""Create a workflow instance.
NOTE: this is a provisional endpoint, don't use in anger yet
"""

from kiara.interfaces.python_api.workflow import Workflow

if workflow_alias is not None:
try:
uuid.UUID(workflow_alias)
Expand Down
8 changes: 6 additions & 2 deletions src/kiara/models/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import shutil
import tempfile
from pathlib import Path
from typing import Any, ClassVar, Dict, List, Mapping, Union
from typing import Any, Callable, ClassVar, Dict, List, Mapping, Union

import structlog
from deepdiff import DeepHash
Expand Down Expand Up @@ -116,6 +116,7 @@ def load_file(
)

_path: Union[str, None] = PrivateAttr(default=None)
_path_resolver: Union[Callable, None] = PrivateAttr(default=None)
_file_hash: Union[str, None] = PrivateAttr(default=None)
_file_cid: Union[CID, None] = PrivateAttr(default=None)

Expand All @@ -126,7 +127,10 @@ def load_file(
@property
def path(self) -> str:
    """Return the local filesystem path for this file, resolving it lazily.

    If no path has been set yet, a registered path-resolver callable is
    invoked once and its result cached on the instance; without a resolver
    there is no way to locate the file, so an exception is raised.

    Raises:
        Exception: if neither a path nor a path resolver is set.
    """
    if self._path is None:
        if self._path_resolver is not None:
            # lazy resolution: materialize/locate the file only on first access
            self._path = self._path_resolver()
        else:
            raise Exception("File path not set for file model.")
    return self._path

def _retrieve_data_to_hash(self) -> Any:
Expand Down
53 changes: 28 additions & 25 deletions src/kiara/models/values/value.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
Any,
ClassVar,
Dict,
Generator,
Iterable,
List,
Literal,
Expand Down Expand Up @@ -79,7 +80,7 @@ class SerializedChunks(BaseModel, abc.ABC):
@abc.abstractmethod
def get_chunks(
self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
) -> Iterable[Union[str, BytesLike]]:
) -> Generator[Union[str, "BytesLike"], None, None]:
"""
Retrieve the chunks belonging to this data instance.
Expand Down Expand Up @@ -365,31 +366,33 @@ class SerializedChunkIDs(SerializedChunks):
_data_registry: "DataRegistry" = PrivateAttr(default=None)

def get_chunks(
    self, as_files: bool = True, symlink_ok: bool = True
) -> Generator[Union[str, "BytesLike"], None, None]:
    """Retrieve the chunks of this value data.

    If 'as_files' is 'True', it will return strings representing paths to
    files containing the chunk data. If 'symlink_ok' is also set to 'True',
    the returned path could potentially be a symlink, which means the
    underlying function might not need to copy the file. In this case, you
    are responsible to not change the contents of the path, ever.

    If 'as_files' is 'False', 'BytesLike' objects will be returned,
    containing the chunk data bytes directly.
    """

    chunk_ids = self.chunk_id_list
    # delegate batch retrieval to the data registry, which can fetch all
    # chunks from the backing archive in one pass instead of per-chunk calls
    return self._data_registry.retrieve_chunks(
        chunk_ids=chunk_ids,
        as_files=as_files,
        symlink_ok=symlink_ok,
        archive_id=self.archive_id,
    )

def get_number_of_chunks(self) -> int:
    """Return how many chunks make up this serialized data."""
    chunk_ids = self.chunk_id_list
    return len(chunk_ids)
Expand Down
33 changes: 24 additions & 9 deletions src/kiara/modules/included_core_modules/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,16 +196,31 @@ def to__python_object(self, data: SerializedData, **config: Any):
included_files = {}
for rel_path in keys:

chunks = data.get_serialized_data(rel_path)
assert chunks.get_number_of_chunks() == 1
if "size" not in file_metadata[rel_path].keys():
# old style, can be removed at some point
# file metadata was added feb 2024

chunks = data.get_serialized_data(rel_path)
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1

file: str = files[0] # type: ignore
file_name = file_metadata[rel_path]["file_name"]
# import_time = file_metadata[rel_path]["import_time"]
fm = KiaraFile.load_file(source=file, file_name=file_name)
else:
fm = KiaraFile(**file_metadata[rel_path])

def _load_file():
chunks = data.get_serialized_data(rel_path)
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
return files[0]

fm._path_resolver = _load_file

files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1

file: str = files[0] # type: ignore
file_name = file_metadata[rel_path]["file_name"]
# import_time = file_metadata[rel_path]["import_time"]
fm = KiaraFile.load_file(source=file, file_name=file_name)
included_files[rel_path] = fm

fb_metadata = metadata.pop("metadata")
Expand Down
3 changes: 2 additions & 1 deletion src/kiara/registries/aliases/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ def aliases(self) -> Mapping[str, AliasItem]:
return self._cached_aliases

# TODO: multithreading lock

all_aliases: Dict[str, AliasItem] = {}
all_aliases_by_id: Dict[uuid.UUID, Set[AliasItem]] = {}
dynamic_stores = []

for archive_alias, archive in self._alias_archives.items():
alias_map = archive.retrieve_all_aliases()
if alias_map is None:
Expand Down Expand Up @@ -261,6 +261,7 @@ def aliases(self) -> Mapping[str, AliasItem]:
self._cached_aliases = {k: all_aliases[k] for k in sorted(all_aliases.keys())}
self._cached_aliases_by_id = all_aliases_by_id
self._dynamic_stores = dynamic_stores
self._cached_dynamic_aliases = {}

return self._cached_aliases

Expand Down
Loading

0 comments on commit 8f77030

Please sign in to comment.