
feat: data compression support for sqlite store
makkus committed Jan 30, 2024
1 parent 7745b7b commit 6a5ea9e
Showing 4 changed files with 99 additions and 13 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -71,6 +71,7 @@ dependencies = [
"humanfriendly>=10.0",
"importlib-metadata>=3.0.0.0,<8.0.0",
"jinja2>=3.0.1",
"lz4>=4.3.0",
"patool>=1.12",
"mistune>=3.0.0",
"mmhash3>=3.0.1",
@@ -92,6 +93,7 @@ dependencies = [
"stevedore>=5.0.0,<6.0.0",
"structlog>=21.5.0",
"tzlocal>=2.1,<6.0",
"zstandard>=0.22.0"
]
dynamic = ["version"]

20 changes: 20 additions & 0 deletions src/kiara/registries/__init__.py
@@ -8,6 +8,7 @@
import abc
import os
import uuid
from enum import Enum
from pathlib import Path
from typing import (
TYPE_CHECKING,
@@ -26,6 +27,10 @@

from kiara.utils import log_message

try:
from typing import Literal # type: ignore
except ImportError:
from typing_extensions import Literal # type: ignore
try:
from typing import Self # type: ignore
except ImportError:
@@ -372,3 +377,18 @@ def create_new_store_config(
sqlite_db_path: str = Field(
description="The path where the data for this archive is stored."
)


class CHUNK_COMPRESSION_TYPE(Enum):
NONE = 0
ZSTD = 1
LZMA = 2
LZ4 = 3


class SqliteDataStoreConfig(SqliteArchiveConfig):

default_compression_type: Literal["none", "lz4", "zstd"] = Field(
description="The default compression type to use for data in this store.",
default="zstd",
)
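
A minimal usage sketch for the new config and enum (illustrative only; it assumes sqlite_db_path is the only required field on the config). The same upper()-based lookup is what SqliteDataStore._persist_chunk uses further down to resolve the configured name to an enum member:

# hypothetical example, not part of the commit
config = SqliteDataStoreConfig(sqlite_db_path="/tmp/example.db")
compression = CHUNK_COMPRESSION_TYPE[config.default_compression_type.upper()]
assert compression is CHUNK_COMPRESSION_TYPE.ZSTD  # "zstd" is the default
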
2 changes: 1 addition & 1 deletion src/kiara/registries/data/data_store/__init__.py
@@ -453,7 +453,7 @@ def _persist_value_data(self, value: Value) -> PersistedData:
scids._data_registry = self.kiara_context.data_registry
chunk_id_map[key] = scids

print("chunks_to_persist")
# print("chunks_to_persist")
# print(chunks_to_persist)

pers_value = PersistedData(
88 changes: 76 additions & 12 deletions src/kiara/registries/data/data_store/sqlite_store.py
@@ -3,22 +3,27 @@
import uuid
from io import BytesIO
from pathlib import Path
from typing import Any, Iterable, Iterator, Mapping, Set, Tuple, Union
from typing import Any, Generic, Iterable, Iterator, Mapping, Set, Tuple, Union

from orjson import orjson
from sqlalchemy import Engine, create_engine, text

from kiara.defaults import kiara_app_dirs
from kiara.models.values.value import PersistedData, Value
from kiara.registries import SqliteArchiveConfig
from kiara.registries import (
ARCHIVE_CONFIG_CLS,
CHUNK_COMPRESSION_TYPE,
SqliteArchiveConfig,
SqliteDataStoreConfig,
)
from kiara.registries.data import DataArchive
from kiara.registries.data.data_store import BaseDataStore
from kiara.utils.hashfs import shard
from kiara.utils.json import orjson_dumps
from kiara.utils.windows import fix_windows_longpath


class SqliteDataArchive(DataArchive[SqliteArchiveConfig]):
class SqliteDataArchive(DataArchive[SqliteArchiveConfig], Generic[ARCHIVE_CONFIG_CLS]):

_archive_type_name = "sqlite_data_archive"
_config_cls = SqliteArchiveConfig
@@ -62,7 +67,7 @@ def _load_store_config(
def __init__(
self,
archive_alias: str,
archive_config: SqliteArchiveConfig,
archive_config: ARCHIVE_CONFIG_CLS,
force_read_only: bool = False,
):

@@ -159,7 +164,7 @@ def sqlite_engine(self) -> "Engine":
CREATE TABLE IF NOT EXISTS values_data (
chunk_id TEXT PRIMARY KEY,
chunk_data BLOB NOT NULL,
compression_type TEXT NULL
compression_type INTEGER NULL
);
CREATE TABLE IF NOT EXISTS values_pedigree (
value_id TEXT NOT NULL PRIMARY KEY,
@@ -279,34 +284,58 @@ def retrieve_chunk(
symlink_ok: bool = True,
) -> Union[bytes, str]:

import lzma

import lz4.frame
from zstandard import ZstdDecompressor

dctx = ZstdDecompressor()

if as_file:
chunk_path = self.get_chunk_path(chunk_id)

if chunk_path.exists():
return chunk_path.as_posix()

sql = text("SELECT chunk_data FROM values_data WHERE chunk_id = :chunk_id")
sql = text(
"SELECT chunk_data, compression_type FROM values_data WHERE chunk_id = :chunk_id"
)
params = {"chunk_id": chunk_id}
with self.sqlite_engine.connect() as conn:
cursor = conn.execute(sql, params)
result_bytes = cursor.fetchone()

chunk_data = result_bytes[0]
compression_type = result_bytes[1]
if compression_type not in (None, 0):
if CHUNK_COMPRESSION_TYPE(compression_type) == CHUNK_COMPRESSION_TYPE.ZSTD:
chunk_data = dctx.decompress(chunk_data)
elif (
CHUNK_COMPRESSION_TYPE(compression_type) == CHUNK_COMPRESSION_TYPE.LZMA
):
chunk_data = lzma.decompress(chunk_data)
elif CHUNK_COMPRESSION_TYPE(compression_type) == CHUNK_COMPRESSION_TYPE.LZ4:
chunk_data = lz4.frame.decompress(chunk_data)
else:
raise ValueError(f"Unsupported compression type: {compression_type}")

if not as_file:
return result_bytes[0]
return chunk_data

chunk_path.parent.mkdir(parents=True, exist_ok=True, mode=0o700)
with open(chunk_path, "wb") as file:
file.write(result_bytes[0])
file.write(chunk_data)

return chunk_path.as_posix()

def _delete_archive(self):
os.unlink(self.sqlite_path)


class SqliteDataStore(SqliteDataArchive, BaseDataStore):
class SqliteDataStore(SqliteDataArchive[SqliteDataStoreConfig], BaseDataStore):

_archive_type_name = "sqlite_data_store"
_config_cls = SqliteDataStoreConfig

@classmethod
def _load_store_config(
@@ -378,7 +407,12 @@ def _persist_chunks(self, chunks: Iterator[Tuple[str, Union[str, BytesIO]]]):

def _persist_chunk(self, chunk_id: str, chunk: Union[str, BytesIO]):

# print(f"sqlite: persisting chunk {chunk_id}")
import lzma

import lz4.frame
from zstandard import ZstdCompressor

cctx = ZstdCompressor()

sql = text(
"SELECT EXISTS(SELECT 1 FROM values_data WHERE chunk_id = :chunk_id)"
@@ -395,11 +429,41 @@ def _persist_chunk(self, chunk_id: str, chunk: Union[str, BytesIO]):
else:
bytes_io = chunk

compression_type = CHUNK_COMPRESSION_TYPE[
self.config.default_compression_type.upper()
]

if compression_type == CHUNK_COMPRESSION_TYPE.NONE:
final_bytes = bytes_io.getvalue()
elif compression_type == CHUNK_COMPRESSION_TYPE.ZSTD:
bytes_io.seek(0)
data = bytes_io.read()
final_bytes = cctx.compress(data)
elif compression_type == CHUNK_COMPRESSION_TYPE.LZMA:
final_bytes = lzma.compress(bytes_io.getvalue())
elif compression_type == CHUNK_COMPRESSION_TYPE.LZ4:
bytes_io.seek(0)
data = bytes_io.read()
final_bytes = lz4.frame.compress(data)
else:
raise ValueError(
f"Unsupported compression type: {self.config.default_compression_type}"
)

compression_type_value = (
compression_type.value
if compression_type is not CHUNK_COMPRESSION_TYPE.NONE
else None
)
sql = text(
"INSERT INTO values_data (chunk_id, chunk_data) VALUES (:chunk_id, :chunk_data)"
"INSERT INTO values_data (chunk_id, chunk_data, compression_type) VALUES (:chunk_id, :chunk_data, :compression_type)"
)
with self.sqlite_engine.connect() as conn:
params = {"chunk_id": chunk_id, "chunk_data": bytes_io.getvalue()}
params = {
"chunk_id": chunk_id,
"chunk_data": final_bytes,
"compression_type": compression_type_value,
}

conn.execute(sql, params)
conn.commit()
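
A standalone sketch of the compress/decompress round trip the store now performs, using the same libraries the diff imports (zstandard, lzma, lz4.frame); the payload is made up for illustration and the snippet is not part of the commit:

# hypothetical round-trip check, mirroring _persist_chunk / retrieve_chunk
import lzma

import lz4.frame
from zstandard import ZstdCompressor, ZstdDecompressor

payload = b"some chunk bytes" * 1000

zstd_blob = ZstdCompressor().compress(payload)                        # ZSTD, the default
assert ZstdDecompressor().decompress(zstd_blob) == payload

assert lzma.decompress(lzma.compress(payload)) == payload             # LZMA
assert lz4.frame.decompress(lz4.frame.compress(payload)) == payload   # LZ4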
