
RocksDB Python API #991

Closed · wants to merge 10 commits
8 changes: 8 additions & 0 deletions cpp/arcticdb/storage/library_manager.hpp
@@ -38,6 +38,14 @@ namespace arcticdb::storage {

[[nodiscard]] bool has_library(const LibraryPath& path) const;

+    [[nodiscard]] static inline bool rocksdb_support() {
Collaborator commented:

We can just hang this off the module, like here:

    storage.attr("CONFIG_LIBRARY_NAME") = py::str(arcticdb::storage::CONFIG_LIBRARY_NAME);

can't we? No need for this to be tied to the LibraryManager.

+    #ifdef ARCTICDB_INCLUDE_ROCKSDB
+        return true;
+    #else
+        return false;
+    #endif
+    }
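A minimal sketch of the reviewer's suggestion — evaluate the #ifdef once in python_bindings.cpp and hang the result off the storage module, mirroring the CONFIG_LIBRARY_NAME line, with no LibraryManager involvement (this assumes ARCTICDB_INCLUDE_ROCKSDB is visible in that translation unit):

    // Sketch only, not part of the PR:
    #ifdef ARCTICDB_INCLUDE_ROCKSDB
        storage.attr("ROCKSDB_SUPPORT") = py::bool_(true);
    #else
        storage.attr("ROCKSDB_SUPPORT") = py::bool_(false);
    #endif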

private:
[[nodiscard]] arcticdb::proto::storage::LibraryConfig get_config_internal(const LibraryPath& path, const StorageOverride& storage_override) const;

13 changes: 1 addition & 12 deletions cpp/arcticdb/storage/lmdb/lmdb_storage.cpp
@@ -348,19 +348,8 @@ void LmdbStorage::warn_if_lmdb_already_open(const fs::path &root_path, const std
}
}

-LmdbStorage::LmdbStorage(LmdbStorage&& other) noexcept
-    : Storage(std::move(static_cast<Storage&>(other))),
-      write_mutex_(std::move(other.write_mutex_)),
-      env_(std::move(other.env_)),
-      dbi_by_key_type_(std::move(other.dbi_by_key_type_)),
-      lib_dir_(std::move(other.lib_dir_)) {
-    other.lib_dir_ = "";
-}

LmdbStorage::~LmdbStorage() {
-    if (!lib_dir_.empty()) {
-        --times_path_opened[lib_dir_.string()];
-    }
+    --times_path_opened[lib_dir_.string()];
}
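The guard being dropped here existed only for moved-from objects: the move constructor deleted above set other.lib_dir_ to the empty string precisely so that a moved-from LmdbStorage's destructor would skip the open-count decrement. With moves gone (Storage is now ARCTICDB_NO_MOVE_OR_COPY, see storage.hpp below), every destructor runs on a fully-constructed object. Roughly:

    // Before: moved-from instances had lib_dir_ == "", so the decrement was guarded:
    //     if (!lib_dir_.empty()) { --times_path_opened[lib_dir_.string()]; }
    // After: no moves means lib_dir_ is always populated, so no guard is needed:
    //     --times_path_opened[lib_dir_.string()];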

void LmdbStorage::reset_warning_counter() {
1 change: 0 additions & 1 deletion cpp/arcticdb/storage/lmdb/lmdb_storage.hpp
@@ -34,7 +34,6 @@ class LmdbStorage final : public Storage {
static void reset_warning_counter();

LmdbStorage(const LibraryPath &lib, OpenMode mode, const Config &conf);
-    LmdbStorage(LmdbStorage&& other) noexcept;
~LmdbStorage() override;

private:
8 changes: 7 additions & 1 deletion cpp/arcticdb/storage/python_bindings.cpp
@@ -35,6 +35,12 @@

void register_bindings(py::module& storage, py::exception<arcticdb::ArcticException>& base_exception) {
storage.attr("CONFIG_LIBRARY_NAME") = py::str(arcticdb::storage::CONFIG_LIBRARY_NAME);
storage.attr("ROCKSDB_SUPPORT", [](py::object /* self */){ return LibraryManager::rocksdb_support(); });

Check failure on line 38 in cpp/arcticdb/storage/python_bindings.cpp (GitHub Actions / Windows Compile):
'pybind11::detail::object_api<pybind11::handle>::attr': no overloaded function takes 2 arguments
+    #ifdef ARCTICDB_INCLUDE_ROCKSDB
+        return true;

Check failure on line 40 in cpp/arcticdb/storage/python_bindings.cpp (GitHub Actions / Windows Compile):
'arcticdb::storage::apy::register_bindings': 'void' function returning a value
+    #else
+        return false;
+    #endif
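Both failures flagged above come from the same addition: pybind11's attr() takes a single name argument — module attributes are set by assignment, not by passing a callable — and the stray #ifdef/return block makes register_bindings, a void function, return a value. A version that should compile, assuming the intent is a plain boolean module attribute like CONFIG_LIBRARY_NAME above, is:

    // Hypothetical fix, not part of the PR; rocksdb_support() already wraps the #ifdef:
    storage.attr("ROCKSDB_SUPPORT") = py::bool_(LibraryManager::rocksdb_support());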

py::enum_<KeyType>(storage, "KeyType")
.value("STREAM_GROUP", KeyType::STREAM_GROUP)
@@ -167,9 +173,9 @@
res.push_back(lp.to_delim_path());
}
return res;
-    });
+    })

py::class_<LibraryIndex, std::shared_ptr<LibraryIndex>>(storage, "LibraryIndex")

Check failure on line 178 in cpp/arcticdb/storage/python_bindings.cpp (GitHub Actions / Windows Compile):
syntax error: missing ';' before 'pybind11::class_<arcticdb::storage::LibraryIndex,std::shared_ptr<arcticdb::storage::LibraryIndex>>'
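This third failure is fallout from the semicolon dropped a few lines up: the statement that builds the library list now ends in `})` instead of `});`, so the parser trips over the py::class_ declaration that follows. Restoring the semicolon fixes it:

        return res;
    });  // <- keep the semicolon; the chained statement ends here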
.def(py::init<>([](const std::string &environment_name) {
auto resolver = std::make_shared<details::InMemoryConfigResolver>();
return std::make_unique<LibraryIndex>(EnvironmentName{environment_name}, resolver);
38 changes: 30 additions & 8 deletions cpp/arcticdb/storage/rocksdb/rocksdb_storage.cpp
@@ -20,6 +20,8 @@
#include <rocksdb/options.h>
#include <rocksdb/utilities/options_util.h>
#include <rocksdb/slice.h>
+#include <rocksdb/filter_policy.h>
+#include <rocksdb/table.h>

namespace arcticdb::storage::rocksdb {

@@ -68,6 +70,13 @@ RocksDBStorage::RocksDBStorage(const LibraryPath &library_path, OpenMode mode, c
fs::create_directories(lib_dir);
db_options.create_if_missing = true;
db_options.create_missing_column_families = true;
+        db_options.IncreaseParallelism(); // TODO: add a method to task_scheduler.hpp to return the number of IOThreads configured there, and use that rather than the default 16.
+        for (auto& desc: column_families) {
+            desc.options.OptimizeLevelStyleCompaction();
+            ::rocksdb::BlockBasedTableOptions table_options;
+            table_options.filter_policy.reset(::rocksdb::NewBloomFilterPolicy(10));
+            desc.options.table_factory.reset(::rocksdb::NewBlockBasedTableFactory(table_options));
+        }
} else {
util::raise_rte(DEFAULT_ROCKSDB_NOT_OK_ERROR + s.ToString());
}
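For the IncreaseParallelism TODO above, the eventual fix would presumably take the shape of the commented sketch below — io_thread_count() is a name invented here purely for illustration; no such accessor exists on the task scheduler yet, which is exactly what the TODO proposes adding:

    // Hypothetical, pending the accessor the TODO describes:
    // db_options.IncreaseParallelism(
    //     static_cast<int>(async::TaskScheduler::instance()->io_thread_count()));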
@@ -85,14 +94,18 @@ RocksDBStorage::RocksDBStorage(const LibraryPath &library_path, OpenMode mode, c
}

RocksDBStorage::~RocksDBStorage() {
-    for (const auto& [key_type_name, handle]: handles_by_key_type_) {
-        auto s = db_->DestroyColumnFamilyHandle(handle);
+    if (db_ == nullptr) {
+        util::check(handles_by_key_type_.empty(), "Handles not empty but db_ is nullptr");
+    } else {
+        for (const auto &[key_type_name, handle]: handles_by_key_type_) {
+            auto s = db_->DestroyColumnFamilyHandle(handle);
            util::check(s.ok(), DEFAULT_ROCKSDB_NOT_OK_ERROR + s.ToString());
        }
+        handles_by_key_type_.clear();
+        auto s = db_->Close();
+        util::check(s.ok(), DEFAULT_ROCKSDB_NOT_OK_ERROR + s.ToString());
+        delete db_;
+    }
-    handles_by_key_type_.clear();
-    auto s = db_->Close();
-    util::check(s.ok(), DEFAULT_ROCKSDB_NOT_OK_ERROR + s.ToString());
-    delete db_;
}

void RocksDBStorage::do_write(Composite<KeySegmentPair>&& kvs) {
@@ -118,17 +131,26 @@ void RocksDBStorage::do_read(Composite<VariantKey>&& ks, const ReadVisitor& visi
ARCTICDB_SAMPLE(RocksDBStorageRead, 0)
auto grouper = [](auto &&k) { return variant_key_type(k); };

+    std::vector<VariantKey> failed_reads;
(fg::from(ks.as_range()) | fg::move | fg::groupBy(grouper)).foreach([&](auto &&group) {
auto key_type_name = fmt::format("{}", group.key());
auto handle = handles_by_key_type_.at(key_type_name);
for (const auto &k : group.values()) {
std::string k_str = to_serialized_key(k);
std::string value;
+            // TODO: Once PR: 975 has been merged we can use ::rocksdb::PinnableSlice to avoid the copy in
+            // the consturction of the segment
Collaborator commented — suggested change:

-            // the consturction of the segment
+            // the construction of the segment

auto s = db_->Get(::rocksdb::ReadOptions(), handle, ::rocksdb::Slice(k_str), &value);
-            util::check(s.ok(), DEFAULT_ROCKSDB_NOT_OK_ERROR + s.ToString());
-            visitor(k, Segment::from_bytes(reinterpret_cast<uint8_t*>(value.data()), value.size()));
+            if (s.IsNotFound()) {
+                failed_reads.push_back(k);
+            } else {
+                util::check(s.ok(), DEFAULT_ROCKSDB_NOT_OK_ERROR + s.ToString());
+                visitor(k, Segment::from_bytes(reinterpret_cast<uint8_t*>(value.data()), value.size(), true));
+            }
}
});
+    if(!failed_reads.empty())
+        throw KeyNotFoundException(Composite<VariantKey>(std::move(failed_reads)));
}
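On the PinnableSlice TODO: DB::Get has an overload taking a ::rocksdb::PinnableSlice*, which pins the block-cache data rather than copying it into a std::string. A rough sketch of the zero-copy read, assuming the post-PR-975 Segment can borrow a buffer rather than own it:

    // Sketch only: avoids the std::string copy by pinning the cached block.
    ::rocksdb::PinnableSlice pinned;
    auto s = db_->Get(::rocksdb::ReadOptions(), handle, ::rocksdb::Slice(k_str), &pinned);
    // The Segment must not outlive `pinned` unless it copies or takes ownership —
    // that lifetime question is what the TODO defers to PR 975.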

bool RocksDBStorage::do_key_exists(const VariantKey& key) {
18 changes: 9 additions & 9 deletions cpp/arcticdb/storage/rocksdb/rocksdb_storage.hpp
@@ -26,25 +26,25 @@ namespace arcticdb::storage::rocksdb {
~RocksDBStorage() override;

private:
-    void do_write(Composite<KeySegmentPair>&& kvs) final override;
+    void do_write(Composite<KeySegmentPair>&& kvs) final;

-    void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final override;
+    void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;

-    void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final override;
+    void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final;

-    void do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) final override;
+    void do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) final;

-    bool do_key_exists(const VariantKey& key) final override;
+    bool do_key_exists(const VariantKey& key) final;

-    std::string do_key_path(const VariantKey&) const final override { return {}; };
+    std::string do_key_path(const VariantKey&) const final { return {}; };

-    bool do_supports_prefix_matching() const final override {
+    bool do_supports_prefix_matching() const final {
        return false;
    }

-    inline bool do_fast_delete() final override;
+    inline bool do_fast_delete() final;

-    void do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string & prefix) final override;
+    void do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string & prefix) final;

// The _internal methods remove code duplication across update, write, and read methods.
void do_write_internal(Composite<KeySegmentPair>&& kvs);
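The override → final change above is sound: applying final to a member function is ill-formed unless the function is virtual, and a derived-class function declared without the virtual keyword is only implicitly virtual if it actually overrides a base declaration. So final alone already provides the compile-time check that override does:

    struct Base { virtual void f(); };
    struct Derived : Base {
        void f() final;    // implicitly virtual: it overrides Base::f, and is now sealed
        // void g() final; // error: overrides nothing, hence not virtual, hence no 'final'
    };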
5 changes: 1 addition & 4 deletions cpp/arcticdb/storage/storage.hpp
@@ -86,10 +86,7 @@ class Storage {
mode_(mode) {}

virtual ~Storage() = default;
-    Storage(const Storage&) = delete;
-    Storage& operator=(const Storage&) = delete;
-    Storage(Storage&&) = default;
-    Storage& operator=(Storage&&) = delete;
+    ARCTICDB_NO_MOVE_OR_COPY(Storage)
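ARCTICDB_NO_MOVE_OR_COPY presumably expands to deleting all four copy/move special members — an assumption from the name, since the macro's definition is outside this diff:

    // Assumed expansion, not copied from the ArcticDB sources:
    // #define ARCTICDB_NO_MOVE_OR_COPY(cls)    \
    //     cls(const cls&) = delete;            \
    //     cls& operator=(const cls&) = delete; \
    //     cls(cls&&) = delete;                 \
    //     cls& operator=(cls&&) = delete;

If so, it is stricter than the lines it replaces — the old code defaulted Storage(Storage&&) — which is consistent with this PR also deleting LmdbStorage's move constructor above.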

void write(Composite<KeySegmentPair> &&kvs) {
ARCTICDB_SAMPLE(StorageWrite, 0)
37 changes: 18 additions & 19 deletions cpp/arcticdb/storage/storage_factory.cpp
@@ -8,6 +8,9 @@
#include <arcticdb/storage/storage_factory.hpp>
#include <arcticdb/storage/lmdb/lmdb_storage.hpp>
#include <arcticdb/storage/memory/memory_storage.hpp>
+#ifdef ARCTICDB_INCLUDE_ROCKSDB
+#include <arcticdb/storage/rocksdb/rocksdb_storage.hpp>
+#endif
#include <arcticdb/storage/mongo/mongo_storage.hpp>
#include <arcticdb/storage/azure/azure_storage.hpp>
#include <arcticdb/storage/s3/s3_storage.hpp>
@@ -27,39 +30,35 @@ std::unique_ptr<Storage> create_storage(
if (type_name == s3::S3Storage::Config::descriptor()->full_name()) {
s3::S3Storage::Config s3_config;
storage_descriptor.config().UnpackTo(&s3_config);
-        storage = std::make_unique<s3::S3Storage>(
-            s3::S3Storage(library_path, mode, s3_config)
-        );
+        storage = std::make_unique<s3::S3Storage>(library_path, mode, s3_config);
Collaborator commented:
Ah wow, nice cleanup, much better.
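The cleanup being applauded: the old form built a temporary and moved it into the heap allocation, which needs a move constructor — now deleted via ARCTICDB_NO_MOVE_OR_COPY. The new form forwards the constructor arguments so the storage is built in place:

    // Old: constructs a temporary S3Storage, then move-constructs the heap object.
    //     storage = std::make_unique<s3::S3Storage>(s3::S3Storage(library_path, mode, s3_config));
    // New: make_unique perfectly forwards the arguments; no temporary, no move.
    //     storage = std::make_unique<s3::S3Storage>(library_path, mode, s3_config);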

} else if (type_name == lmdb::LmdbStorage::Config::descriptor()->full_name()) {
lmdb::LmdbStorage::Config lmbd_config;
storage_descriptor.config().UnpackTo(&lmbd_config);
-        storage = std::make_unique<lmdb::LmdbStorage>(
-            lmdb::LmdbStorage(library_path, mode, lmbd_config)
-        );
+        storage = std::make_unique<lmdb::LmdbStorage>(library_path, mode, lmbd_config);
} else if (type_name == mongo::MongoStorage::Config::descriptor()->full_name()) {
mongo::MongoStorage::Config mongo_config;
storage_descriptor.config().UnpackTo(&mongo_config);
-        storage = std::make_unique<mongo::MongoStorage>(
-            mongo::MongoStorage(library_path, mode, mongo_config)
-        );
+        storage = std::make_unique<mongo::MongoStorage>(library_path, mode, mongo_config);
} else if (type_name == memory::MemoryStorage::Config::descriptor()->full_name()) {
memory::MemoryStorage::Config memory_config;
storage_descriptor.config().UnpackTo(&memory_config);
-        storage = std::make_unique<memory::MemoryStorage>(
-            memory::MemoryStorage(library_path, mode, memory_config)
-        );
-    } else if (type_name == nfs_backed::NfsBackedStorage::Config::descriptor()->full_name()) {
+        storage = std::make_unique<memory::MemoryStorage>(library_path, mode, memory_config);
+    }
+#ifdef ARCTICDB_INCLUDE_ROCKSDB
+    else if (type_name == rocksdb::RocksDBStorage::Config::descriptor()->full_name()) {
+        rocksdb::RocksDBStorage::Config rocksdb_config;
+        storage_descriptor.config().UnpackTo(&rocksdb_config);
+        storage = std::make_unique<rocksdb::RocksDBStorage>(library_path, mode, rocksdb_config);
+    }
+#endif
+    else if (type_name == nfs_backed::NfsBackedStorage::Config::descriptor()->full_name()) {
nfs_backed::NfsBackedStorage::Config nfs_backed_config;
storage_descriptor.config().UnpackTo(&nfs_backed_config);
-        storage = std::make_unique<nfs_backed::NfsBackedStorage>(
-            nfs_backed::NfsBackedStorage(library_path, mode, nfs_backed_config)
-        );
+        storage = std::make_unique<nfs_backed::NfsBackedStorage>(library_path, mode, nfs_backed_config);
} else if (type_name == azure::AzureStorage::Config::descriptor()->full_name()) {
azure::AzureStorage::Config azure_config;
storage_descriptor.config().UnpackTo(&azure_config);
-        storage = std::make_unique<azure::AzureStorage >(
-            azure::AzureStorage(library_path, mode, azure_config)
-        );
+        storage = std::make_unique<azure::AzureStorage>(library_path, mode, azure_config);
} else
throw std::runtime_error(fmt::format("Unknown config type {}", type_name));

78 changes: 78 additions & 0 deletions python/arcticdb/adapters/rocksdb_library_adapter.py
@@ -0,0 +1,78 @@
"""
Copyright 2023 Man Group Operations Limited

Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.

As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""
import re
import os

# from dataclasses import dataclass
Collaborator commented:
rm



from arcticdb.options import LibraryOptions
from arcticc.pb2.storage_pb2 import EnvironmentConfigsMap, LibraryConfig
from arcticdb.version_store.helper import add_rocksdb_library_to_env
from arcticdb.config import _DEFAULT_ENV
from arcticdb.version_store._store import NativeVersionStore
from arcticdb.adapters.arctic_library_adapter import ArcticLibraryAdapter, set_library_options
from arcticdb.encoding_version import EncodingVersion
from arcticdb_ext.storage import CONFIG_LIBRARY_NAME


class RocksDBLibraryAdapter(ArcticLibraryAdapter):
"""
Collaborator commented:
Can you document in here that this is a work in progress, beta API and should not be relied upon?
And also log some warnings if anyone does actually use the RocksDB adapter so they see at runtime we're not making any promises about it.

Connect to a RocksDB backend.

Only supports the URI ``"rocksdb://"``. TODO: Complete this
"""

REGEX = r"rocksdb://(?P<path>[^?]*)$"

@staticmethod
def supports_uri(uri: str) -> bool:
return uri.startswith("rocksdb://")

def __init__(self, uri: str, encoding_version: EncodingVersion, *args, **kwargs):
match = re.match(self.REGEX, uri)
match_groups = match.groupdict()

self._path = os.path.abspath(match_groups["path"])
self._encoding_version = encoding_version

os.makedirs(self._path, exist_ok=True)

super().__init__(uri, self._encoding_version)

def __repr__(self):
return "ROCKSDB()"

@property
def config_library(self):
env_cfg = EnvironmentConfigsMap()

add_rocksdb_library_to_env(env_cfg, lib_name=CONFIG_LIBRARY_NAME, env_name=_DEFAULT_ENV, db_dir=self._path)

lib = NativeVersionStore.create_store_from_config(
env_cfg, _DEFAULT_ENV, CONFIG_LIBRARY_NAME, encoding_version=self._encoding_version
)

return lib._library

def get_library_config(self, name, library_options: LibraryOptions):
env_cfg = EnvironmentConfigsMap()

add_rocksdb_library_to_env(env_cfg, lib_name=name, env_name=_DEFAULT_ENV, db_dir=self._path)

library_options.encoding_version = (
library_options.encoding_version if library_options.encoding_version is not None else self._encoding_version
)
set_library_options(env_cfg.env_by_id[_DEFAULT_ENV].lib_by_path[name], library_options)

return NativeVersionStore.create_library_config(
env_cfg, _DEFAULT_ENV, name, encoding_version=library_options.encoding_version
)

# TODO: def cleanup_library should do something similar to LMDB
# See PR: 918
3 changes: 3 additions & 0 deletions python/arcticdb/arctic.py
@@ -17,6 +17,7 @@
from arcticdb.adapters.azure_library_adapter import AzureLibraryAdapter
from arcticdb.adapters.mongo_library_adapter import MongoLibraryAdapter
from arcticdb.adapters.in_memory_library_adapter import InMemoryLibraryAdapter
+from arcticdb.adapters.rocksdb_library_adapter import RocksDBLibraryAdapter
from arcticdb.encoding_version import EncodingVersion


@@ -33,6 +34,8 @@ class Arctic:
MongoLibraryAdapter,
InMemoryLibraryAdapter,
]
+    if LibraryManager.rocksdb_support:
+        _LIBRARY_ADAPTERS.append(RocksDBLibraryAdapter)

def __init__(self, uri: str, encoding_version: EncodingVersion = DEFAULT_ENCODING_VERSION):
"""
14 changes: 13 additions & 1 deletion python/arcticdb/version_store/helper.py
@@ -14,8 +14,8 @@
from arcticc.pb2.s3_storage_pb2 import Config as S3Config
from arcticc.pb2.azure_storage_pb2 import Config as AzureConfig
from arcticc.pb2.in_memory_storage_pb2 import Config as MemoryConfig
+from arcticc.pb2.rocksdb_storage_pb2 import Config as RocksDBConfig
from arcticc.pb2.mongo_storage_pb2 import Config as MongoConfig
from arcticc.pb2.nfs_backed_storage_pb2 import Config as NfsConfig
from arcticc.pb2.storage_pb2 import (
EnvironmentConfigsMap,
EnvironmentConfig,
@@ -190,6 +190,18 @@ def add_memory_library_to_env(cfg, lib_name, env_name, description=None):
_add_lib_desc_to_env(env, lib_name, sid, description)


+def add_rocksdb_library_to_env(cfg, lib_name, env_name, db_dir=Defaults.DATA_DIR, description=None, rocksdb_config={}):
+    env = cfg.env_by_id[env_name]
+    rocksdb = RocksDBConfig()
+    rocksdb.path = db_dir
+    for k, v in rocksdb_config.items():
+        setattr(rocksdb, k, v)
+
+    sid, storage = get_storage_for_lib_name(lib_name, env)
+    storage.config.Pack(rocksdb, type_url_prefix="cxx.arctic.org")
+    _add_lib_desc_to_env(env, lib_name, sid, description)


def get_mongo_proto(cfg, lib_name, env_name, uri):
env = cfg.env_by_id[env_name]
mongo = MongoConfig()