Skip to content

Commit

Permalink
Changes from PR review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
joe-iddon committed Oct 19, 2023
1 parent 137221b commit eb0d3dc
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 32 deletions.
9 changes: 8 additions & 1 deletion cpp/CMake/FindLZ4.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,14 @@ find_package_handle_standard_args(LZ4
VERSION_VAR LZ4_VERSION)

if (LZ4_FOUND)
# On conda+mac rocksdb depends on lowercase lz4 so needs this and the alias below. See PR 961.
# `lz4_FOUND` needs to be defined because:
# - Other dependencies (such as RocksDB) also resolve LZ4 using `find_package(lz4 ...)`
# - CMake's syntax is case-sensitive
#
# See:
# - https://github.com/facebook/rocksdb/blob/0836a2b26dfbbe30c15e8cebf47771917d55e760/cmake/RocksDBConfig.cmake.in#L36
# - https://github.com/facebook/rocksdb/blob/0836a2b26dfbbe30c15e8cebf47771917d55e760/cmake/modules/Findlz4.cmake#L17
# - https://github.com/man-group/ArcticDB/pull/961
set(lz4_FOUND TRUE)
set(LZ4_INCLUDE_DIRS "${LZ4_INCLUDE_DIR}")
set(LZ4_LIBRARIES "${LZ4_LIBRARY}")
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ if(${TEST})
processing/test/test_set_membership.cpp
processing/test/test_signed_unsigned_comparison.cpp
processing/test/test_type_comparison.cpp
storage/test/test_lmdb_mem_rocksdb.cpp
storage/test/test_embedded.cpp
storage/test/test_memory_storage.cpp
storage/test/test_mongo_storage.cpp
storage/test/test_s3_storage.cpp
Expand Down
36 changes: 17 additions & 19 deletions cpp/arcticdb/storage/rocksdb/rocksdb_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ RocksDBStorage::RocksDBStorage(const LibraryPath &library_path, OpenMode mode, c
std::vector<::rocksdb::ColumnFamilyDescriptor> column_families;
::rocksdb::DBOptions db_options;

::rocksdb::ConfigOptions cfg_opts; // not used
::rocksdb::ConfigOptions cfg_opts;
auto s = ::rocksdb::LoadLatestOptions(cfg_opts, db_name, &db_options, &column_families);
if (s.ok()) {
std::set<std::string> existing_key_names{};
Expand All @@ -61,7 +61,8 @@ RocksDBStorage::RocksDBStorage(const LibraryPath &library_path, OpenMode mode, c
column_families.emplace_back(::rocksdb::kDefaultColumnFamilyName, ::rocksdb::ColumnFamilyOptions());
for (const auto& key_name: key_names) {
util::check(key_name != ::rocksdb::kDefaultColumnFamilyName,
"Key name clash with mandatory default column name");
"Key name clash with mandatory default column family name: \"" +
::rocksdb::kDefaultColumnFamilyName + "\"");
column_families.emplace_back(key_name, ::rocksdb::ColumnFamilyOptions());
}
fs::create_directories(lib_dir);
Expand Down Expand Up @@ -135,12 +136,16 @@ bool RocksDBStorage::do_key_exists(const VariantKey& key) {
std::string value; // unused
auto key_type_name = fmt::format("{}", variant_key_type(key));
auto k_str = to_serialized_key(key);
auto s = db_->Get(::rocksdb::ReadOptions(), handles_by_key_type_.at(key_type_name), ::rocksdb::Slice(k_str), &value);
auto handle = handles_by_key_type_.at(key_type_name);
if (!db_->KeyMayExist(::rocksdb::ReadOptions(), handle, ::rocksdb::Slice(k_str), &value)) {
return false;
}
auto s = db_->Get(::rocksdb::ReadOptions(), handle, ::rocksdb::Slice(k_str), &value);
util::check(s.IsNotFound() || s.ok(), DEFAULT_ROCKSDB_NOT_OK_ERROR + s.ToString());
return !s.IsNotFound();
}

void RocksDBStorage::do_remove(Composite<VariantKey>&& ks, RemoveOpts opts)
{
void RocksDBStorage::do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) {
ARCTICDB_SAMPLE(RocksDBStorageRemove, 0)

auto failed_deletes = do_remove_internal(std::move(ks), opts);
Expand Down Expand Up @@ -169,7 +174,7 @@ void RocksDBStorage::do_iterate_type(KeyType key_type, const IterateTypeVisitor&

auto key_type_name = fmt::format("{}", key_type);
auto handle = handles_by_key_type_.at(key_type_name);
::rocksdb::Iterator* it = db_->NewIterator(::rocksdb::ReadOptions(), handle);
auto it = std::unique_ptr<::rocksdb::Iterator>(db_->NewIterator(::rocksdb::ReadOptions(), handle));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
auto key_slice = it->key();
auto k = variant_key_from_bytes(reinterpret_cast<const uint8_t *>(key_slice.data()), key_slice.size(), key_type);
Expand All @@ -181,24 +186,20 @@ void RocksDBStorage::do_iterate_type(KeyType key_type, const IterateTypeVisitor&
}
auto s = it->status();
util::check(s.ok(), DEFAULT_ROCKSDB_NOT_OK_ERROR + s.ToString());
delete it;
}

std::vector<VariantKey> RocksDBStorage::do_remove_internal(Composite<VariantKey>&& ks, RemoveOpts opts)
{
std::vector<VariantKey> RocksDBStorage::do_remove_internal(Composite<VariantKey>&& ks, RemoveOpts opts) {
auto grouper = [](auto &&k) { return variant_key_type(k); };
std::vector<VariantKey> failed_deletes;

(fg::from(ks.as_range()) | fg::move | fg::groupBy(grouper)).foreach([&](auto &&group) {
auto key_type_name = fmt::format("{}", group.key());
// If no key of this type has been written before, this can fail
auto handle = handles_by_key_type_.at(key_type_name);
auto options = ::rocksdb::WriteOptions(); // Should this be const class attr? Used in write_internal too.
options.sync = true;
for (const auto &k : group.values()) {
if (do_key_exists(k)) {
auto k_str = to_serialized_key(k);
auto s = db_->Delete(options, handle, ::rocksdb::Slice(k_str));
auto s = db_->Delete(::rocksdb::WriteOptions(), handle, ::rocksdb::Slice(k_str));
util::check(s.ok(), DEFAULT_ROCKSDB_NOT_OK_ERROR + s.ToString());
ARCTICDB_DEBUG(log::storage(), "Deleted segment for key {}", variant_key_view(k));
} else if (!opts.ignores_missing_key_) {
Expand All @@ -210,15 +211,12 @@ std::vector<VariantKey> RocksDBStorage::do_remove_internal(Composite<VariantKey>
return failed_deletes;
}

void RocksDBStorage::do_write_internal(Composite<KeySegmentPair>&& kvs)
{
void RocksDBStorage::do_write_internal(Composite<KeySegmentPair>&& kvs) {
auto grouper = [](auto &&kv) { return kv.key_type(); };
(fg::from(kvs.as_range()) | fg::move | fg::groupBy(grouper)).foreach([&](auto &&group) {
auto key_type_name = fmt::format("{}", group.key());
auto handle = handles_by_key_type_.at(key_type_name);
for (auto &kv : group.values()) {
auto options = ::rocksdb::WriteOptions();
options.sync = true;
auto k_str = to_serialized_key(kv.variant_key());

auto& seg = kv.segment();
Expand All @@ -227,11 +225,11 @@ void RocksDBStorage::do_write_internal(Composite<KeySegmentPair>&& kvs)
std::string seg_data;
seg_data.resize(total_sz);
seg.write_to(reinterpret_cast<std::uint8_t *>(seg_data.data()), hdr_sz);
auto override = std::holds_alternative<RefKey>(kv.variant_key());
if (!override && do_key_exists(kv.variant_key())) {
auto allow_override = std::holds_alternative<RefKey>(kv.variant_key());
if (!allow_override && do_key_exists(kv.variant_key())) {
throw DuplicateKeyException(kv.variant_key());
}
auto s = db_->Put(options, handle, ::rocksdb::Slice(k_str), ::rocksdb::Slice(seg_data));
auto s = db_->Put(::rocksdb::WriteOptions(), handle, ::rocksdb::Slice(k_str), ::rocksdb::Slice(seg_data));
util::check(s.ok(), DEFAULT_ROCKSDB_NOT_OK_ERROR + s.ToString());
}
});
Expand Down
20 changes: 10 additions & 10 deletions cpp/arcticdb/storage/rocksdb/rocksdb_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,28 @@ namespace arcticdb::storage::rocksdb {
using Config = arcticdb::proto::rocksdb_storage::Config;

RocksDBStorage(const LibraryPath &lib, OpenMode mode, const Config &conf);
~RocksDBStorage();
~RocksDBStorage() override;

private:
void do_write(Composite<KeySegmentPair>&& kvs) final;
void do_write(Composite<KeySegmentPair>&& kvs) final override;

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;
void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final override;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final;
void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final override;

void do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) final;
void do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) final override;

bool do_key_exists(const VariantKey& key) final;
bool do_key_exists(const VariantKey& key) final override;

std::string do_key_path(const VariantKey&) const final { return {}; };
std::string do_key_path(const VariantKey&) const final override { return {}; };

bool do_supports_prefix_matching() const final {
bool do_supports_prefix_matching() const final override {
return false;
}

inline bool do_fast_delete() final;
inline bool do_fast_delete() final override;

void do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string & prefix) final;
void do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string & prefix) final override;

// The _internal methods remove code duplication across update, write, and read methods.
void do_write_internal(Composite<KeySegmentPair>&& kvs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class BackendGenerator {
arcticdb::proto::lmdb_storage::Config cfg;
fs::path db_name = "test_lmdb";
cfg.set_path((TEST_DATABASES_PATH / db_name).generic_string());
cfg.set_map_size(128ULL * (1ULL << 20) );
cfg.set_recreate_if_exists(true);

as::LibraryPath library_path{"a", "b"};
Expand Down Expand Up @@ -199,6 +200,6 @@ TEST_P(SimpleTestSuite, Strings) {

using namespace std::string_literals;
std::vector<BackendGenerator> backend_generators{"lmdb"s, "mem"s, "rocksdb"s};
INSTANTIATE_TEST_SUITE_P(TestLmdbMemRocksDBStorages, SimpleTestSuite, testing::ValuesIn(backend_generators),
INSTANTIATE_TEST_SUITE_P(TestEmbedded, SimpleTestSuite, testing::ValuesIn(backend_generators),
[](const testing::TestParamInfo<SimpleTestSuite::ParamType>& info) { return info.param.get_name(); });

0 comments on commit eb0d3dc

Please sign in to comment.