Skip to content

Commit c595892

Browse files
committed
chore: first working version of sqlite data store
1 parent 2462c2b commit c595892

File tree

4 files changed

+311
-81
lines changed

4 files changed

+311
-81
lines changed

src/kiara/defaults.py

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
kiara_app_dirs.user_data_dir, "context_locks"
4444
)
4545

46+
4647
KIARA_DEFAULT_STAGES_EXTRACTION_TYPE = "early"
4748

4849
INIT_EXAMPLE_NAME = "init"

src/kiara/registries/data/data_store/__init__.py

+74-7
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,22 @@
77

88
import abc
99
import uuid
10+
from io import BytesIO
1011
from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Set, Union
1112

1213
import structlog
1314
from rich.console import RenderableType
1415

1516
from kiara.models.runtime_environment import RuntimeEnvironment
1617
from kiara.models.values.matchers import ValueMatcher
17-
from kiara.models.values.value import PersistedData, Value, ValuePedigree
18+
from kiara.models.values.value import (
19+
SERIALIZE_TYPES,
20+
PersistedData,
21+
SerializedChunkIDs,
22+
SerializedData,
23+
Value,
24+
ValuePedigree,
25+
)
1826
from kiara.models.values.value_schema import ValueSchema
1927
from kiara.registries import ARCHIVE_CONFIG_CLS, BaseArchive
2028

@@ -316,10 +324,10 @@ def _persist_stored_value_info(self, value: Value, persisted_value: PersistedDat
316324
def _persist_value_details(self, value: Value):
    """Hook for persisting additional details of a value.

    This implementation intentionally does nothing and returns `None`.
    """
    return None
318326

319-
@abc.abstractmethod
320-
def _persist_value_data(self, value: Value) -> PersistedData:
321-
"""Persist the actual value data."""
322-
pass
327+
# @abc.abstractmethod
328+
# def _persist_value_data(self, value: Value) -> PersistedData:
329+
# """Persist the actual value data."""
330+
# pass
323331

324332
@abc.abstractmethod
325333
def _persist_value_pedigree(self, value: Value):
@@ -376,6 +384,64 @@ def store_value(self, value: Value) -> PersistedData:
376384

377385
return persisted_value
378386

387+
@abc.abstractmethod
def _persist_chunk(self, chunk_id: str, chunk: Union[str, BytesIO]) -> None:
    """Persist the specified chunk under the provided chunk id.

    If the chunk is a string, it represents a local file path, otherwise it is a BytesIO instance representing the actual data of the chunk.

    The return value is not used by the caller (`_persist_value_data` records the chunk id it passed in),
    so implementations are not expected to return anything.

    Arguments:
        chunk_id: the id the chunk should be stored under
        chunk: a local file path, or a BytesIO instance holding the chunk data
    """
    pass
394+
395+
def _persist_value_data(self, value: Value) -> PersistedData:
    """Persist the serialized data of a value, chunk by chunk.

    For every key of the value's serialized data, the data is resolved into a
    sequence of chunks, each chunk is handed to `_persist_chunk` together with
    its content id, and the resulting list of chunk ids is recorded for that key.

    Arguments:
        value: the value whose `serialized_data` should be persisted

    Returns:
        a `PersistedData` instance that references this archive and maps every key to its persisted chunk ids

    Raises:
        Exception: if the serialized data of a key has an unsupported `type`
    """

    serialized_value: SerializedData = value.serialized_data

    chunk_id_map = {}
    for key in serialized_value.get_keys():

        data_model = serialized_value.get_serialized_data(key)

        # A string chunk is a local file path, a BytesIO instance holds the chunk data itself.
        if data_model.type == "chunk":  # type: ignore
            chunks: Iterable[Union[str, BytesIO]] = [BytesIO(data_model.chunk)]  # type: ignore
        elif data_model.type == "chunks":  # type: ignore
            chunks = (BytesIO(c) for c in data_model.chunks)  # type: ignore
        elif data_model.type == "file":  # type: ignore
            chunks = [data_model.file]  # type: ignore
        elif data_model.type == "files":  # type: ignore
            chunks = data_model.files  # type: ignore
        elif data_model.type == "inline-json":  # type: ignore
            chunks = [BytesIO(data_model.as_json())]  # type: ignore
        else:
            # Report the offending type tag (that is what the branches above and
            # SERIALIZE_TYPES are about), plus the model class for debugging.
            raise Exception(
                f"Invalid serialized data type: '{data_model.type}' ({type(data_model).__name__}). Available types: {', '.join(SERIALIZE_TYPES)}"  # type: ignore
            )

        chunk_ids = []
        # NOTE(review): zip() stops at the shorter input, so this assumes exactly
        # one content id per chunk -- confirm against `get_cids_for_key`.
        for cid, _chunk in zip(serialized_value.get_cids_for_key(key), chunks):
            chunk_id = str(cid)
            self._persist_chunk(chunk_id, _chunk)
            chunk_ids.append(chunk_id)

        scids = SerializedChunkIDs(
            chunk_id_list=chunk_ids,
            archive_id=self.archive_id,
            size=data_model.get_size(),
        )
        scids._data_registry = self.kiara_context.data_registry
        chunk_id_map[key] = scids

    pers_value = PersistedData(
        archive_id=self.archive_id,
        chunk_id_map=chunk_id_map,
        data_type=serialized_value.data_type,
        data_type_config=serialized_value.data_type_config,
        serialization_profile=serialized_value.serialization_profile,
        metadata=serialized_value.metadata,
    )

    return pers_value
444+
379445
def _persist_value(self, value: Value) -> PersistedData:
380446

381447
# TODO: check if value id is already persisted?
@@ -402,8 +468,9 @@ def _persist_value(self, value: Value) -> PersistedData:
402468
value=value, persisted_value=persisted_value_info
403469
)
404470
self._persist_value_details(value=value)
405-
if value.destiny_backlinks:
406-
self._persist_destiny_backlinks(value=value)
471+
# TODO: re-enable?
472+
# if value.destiny_backlinks:
473+
# self._persist_destiny_backlinks(value=value)
407474

408475
return persisted_value_info
409476

src/kiara/registries/data/data_store/filesystem_store.py

+56-51
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
from kiara.exceptions import KiaraException
1818
from kiara.models.module.jobs import JobRecord
1919
from kiara.models.values.value import (
20-
SERIALIZE_TYPES,
2120
PersistedData,
22-
SerializedChunkIDs,
23-
SerializedData,
2421
Value,
2522
)
2623
from kiara.registries import ArchiveDetails, FileSystemArchiveConfig
@@ -398,57 +395,65 @@ def _persist_destiny_backlinks(self, value: Value):
398395

399396
fix_windows_symlink(value_file, destiny_file)
400397

401-
def _persist_value_data(self, value: Value) -> PersistedData:
398+
def _persist_chunk(self, chunk_id: str, chunk: Union[str, BytesIO]):
    """Store a single chunk in the hashfs store of this archive, using the pre-computed chunk id.

    Arguments:
        chunk_id: the pre-computed id the chunk should be stored under
        chunk: a local file path (string), or a BytesIO instance holding the chunk data

    Raises:
        KiaraException: if the id of the stored address does not match `chunk_id`
    """

    addr: HashAddress = self.hashfs.put_with_precomputed_hash(chunk, chunk_id)

    # Explicit check instead of `assert`, so the integrity check is not
    # stripped when Python runs with optimizations enabled (`-O`).
    if addr.id != chunk_id:
        raise KiaraException(
            f"Stored chunk id '{addr.id}' does not match expected chunk id '{chunk_id}'."
        )
407405

408-
data_model = serialized_value.get_serialized_data(key)
409-
410-
if data_model.type == "chunk": # type: ignore
411-
chunks: Iterable[Union[str, BytesIO]] = [BytesIO(data_model.chunk)] # type: ignore
412-
elif data_model.type == "chunks": # type: ignore
413-
chunks = (BytesIO(c) for c in data_model.chunks) # type: ignore
414-
elif data_model.type == "file": # type: ignore
415-
chunks = [data_model.file] # type: ignore
416-
elif data_model.type == "files": # type: ignore
417-
chunks = data_model.files # type: ignore
418-
elif data_model.type == "inline-json": # type: ignore
419-
chunks = [BytesIO(data_model.as_json())] # type: ignore
420-
else:
421-
raise Exception(
422-
f"Invalid serialized data type: {type(data_model)}. Available types: {', '.join(SERIALIZE_TYPES)}"
423-
)
424-
425-
chunk_ids = []
426-
for item in zip(serialized_value.get_cids_for_key(key), chunks):
427-
cid = item[0]
428-
_chunk = item[1]
429-
addr: HashAddress = self.hashfs.put_with_precomputed_hash(
430-
_chunk, str(cid)
431-
)
432-
chunk_ids.append(addr.id)
433-
434-
scids = SerializedChunkIDs(
435-
chunk_id_list=chunk_ids,
436-
archive_id=self.archive_id,
437-
size=data_model.get_size(),
438-
)
439-
scids._data_registry = self.kiara_context.data_registry
440-
chunk_id_map[key] = scids
441-
442-
pers_value = PersistedData(
443-
archive_id=self.archive_id,
444-
chunk_id_map=chunk_id_map,
445-
data_type=serialized_value.data_type,
446-
data_type_config=serialized_value.data_type_config,
447-
serialization_profile=serialized_value.serialization_profile,
448-
metadata=serialized_value.metadata,
449-
)
450-
451-
return pers_value
406+
# def _persist_value_data(self, value: Value) -> PersistedData:
407+
#
408+
# serialized_value: SerializedData = value.serialized_data
409+
#
410+
# chunk_id_map = {}
411+
# for key in serialized_value.get_keys():
412+
#
413+
# data_model = serialized_value.get_serialized_data(key)
414+
#
415+
# if data_model.type == "chunk": # type: ignore
416+
# chunks: Iterable[Union[str, BytesIO]] = [BytesIO(data_model.chunk)] # type: ignore
417+
# elif data_model.type == "chunks": # type: ignore
418+
# chunks = (BytesIO(c) for c in data_model.chunks) # type: ignore
419+
# elif data_model.type == "file": # type: ignore
420+
# chunks = [data_model.file] # type: ignore
421+
# elif data_model.type == "files": # type: ignore
422+
# chunks = data_model.files # type: ignore
423+
# elif data_model.type == "inline-json": # type: ignore
424+
# chunks = [BytesIO(data_model.as_json())] # type: ignore
425+
# else:
426+
# raise Exception(
427+
# f"Invalid serialized data type: {type(data_model)}. Available types: {', '.join(SERIALIZE_TYPES)}"
428+
# )
429+
#
430+
# chunk_ids = []
431+
# for item in zip(serialized_value.get_cids_for_key(key), chunks):
432+
# cid = item[0]
433+
# _chunk = item[1]
434+
# addr: HashAddress = self.hashfs.put_with_precomputed_hash(
435+
# _chunk, str(cid)
436+
# )
437+
# chunk_ids.append(addr.id)
438+
#
439+
# scids = SerializedChunkIDs(
440+
# chunk_id_list=chunk_ids,
441+
# archive_id=self.archive_id,
442+
# size=data_model.get_size(),
443+
# )
444+
# scids._data_registry = self.kiara_context.data_registry
445+
# chunk_id_map[key] = scids
446+
#
447+
# pers_value = PersistedData(
448+
# archive_id=self.archive_id,
449+
# chunk_id_map=chunk_id_map,
450+
# data_type=serialized_value.data_type,
451+
# data_type_config=serialized_value.data_type_config,
452+
# serialization_profile=serialized_value.serialization_profile,
453+
# metadata=serialized_value.metadata,
454+
# )
455+
#
456+
# return pers_value
452457

453458
def _persist_value_pedigree(self, value: Value):
454459

0 commit comments

Comments
 (0)