Skip to content

Commit

Permalink
Implement support for very old style symbol list entries. (#1630)
Browse files Browse the repository at this point in the history
#### Reference Issues/PRs

arcticdb-man#116

#### What does this implement or fix?

See arcticdb-man#116. Some very old symbol list keys have non zero
version numbers. This was breaking some `list_symbols` calls with
ArcticDB.

In particular, a very old style `Delete` with a non-zero version number
would always "win" over a more recent old style `Add`, which would have
a zero version number.

Co-authored-by: Alex Seaton <[email protected]>
  • Loading branch information
poodlewars and Alex Seaton committed Jun 25, 2024
1 parent fc9c754 commit 7d78797
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 15 deletions.
27 changes: 16 additions & 11 deletions cpp/arcticdb/version/symbol_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ auto warning_threshold() {
.value_or( ConfigsMap::instance()->get_int("SymbolList.MaxCompactionThreshold", 700)));
}

bool is_new_style_key(const AtomKey& key) {
return util::variant_match(key.end_index(),
[] (std::string_view str) {
return str == version_string;
},
[] (NumericIndex n) {
return n == version_identifier;
});
}

std::vector<SymbolListEntry> load_previous_from_version_keys(
const std::shared_ptr<Store>& store,
SymbolListData& data) {
Expand Down Expand Up @@ -111,7 +121,12 @@ std::vector<AtomKey> get_all_symbol_list_keys(
});

std::sort(output.begin(), output.end(), [] (const AtomKey& left, const AtomKey& right) {
return std::tie(left.start_index(), left.version_id(), left.creation_ts()) < std::tie(right.start_index(), right.version_id(), right.creation_ts());
// Some very old symbol list keys have a non-zero version number, but with different semantics to the new style,
// so ignore it. See arcticdb-man#116. Most old style symbol list keys have version ID 0 anyway.
auto left_version = is_new_style_key(left) ? left.version_id() : 0;
auto right_version = is_new_style_key(right) ? right.version_id() : 0;
return std::tie(left.start_index(), left_version, left.creation_ts())
< std::tie(right.start_index(), right_version, right.creation_ts());
});
return output;
}
Expand Down Expand Up @@ -240,16 +255,6 @@ std::vector<SymbolListEntry> read_from_storage(
return read_new_style_list_from_storage(seg);
}

bool is_new_style_key(const AtomKey& key) {
return util::variant_match(key.end_index(),
[] (std::string_view str) {
return str == version_string;
},
[] (NumericIndex n) {
return n == version_identifier;
});
}

MapType load_journal_keys(const std::vector<AtomKey>& keys) {
MapType map;
for(const auto& key : keys) {
Expand Down
17 changes: 13 additions & 4 deletions cpp/arcticdb/version/test/symbol_list_backwards_compat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ inline StreamDescriptor backwards_compat_symbol_stream_descriptor(const StreamId
)};
}

inline StreamDescriptor baackward_compat_journal_stream_descriptor(const StreamId& action, const StreamId& id) {
inline StreamDescriptor backward_compat_journal_stream_descriptor(const StreamId& action, const StreamId& id) {
return util::variant_match(id,
[&action] (const NumericId&) {
return StreamDescriptor{stream_descriptor(action, RowCountIndex(), { scalar_field(DataType::UINT64, "symbol") })};
Expand Down Expand Up @@ -138,8 +138,12 @@ folly::Future<VariantKey> backwards_compat_write(
return store->write(KeyType::SYMBOL_LIST, 0, stream_id, creation_ts, NumericIndex{0}, NumericIndex{0}, std::move(list_segment));
}

void backwards_compat_write_journal(const std::shared_ptr<Store>& store, const StreamId& symbol, std::string action) {
SegmentInMemory seg{baackward_compat_journal_stream_descriptor(action, symbol)};
// Very old internal ArcticDB clients (2021) wrote non-zero version IDs in symbol list entries. This API supports that.
void extremely_backwards_compat_write_journal(const std::shared_ptr<Store>& store,
const StreamId& symbol,
const std::string& action,
VersionId version_id) {
SegmentInMemory seg{backward_compat_journal_stream_descriptor(action, symbol)};
util::variant_match(symbol,
[&seg] (const StringId& id) {
seg.set_string(0, id);
Expand All @@ -150,14 +154,19 @@ void backwards_compat_write_journal(const std::shared_ptr<Store>& store, const S

seg.end_row();
try {
store->write_sync(KeyType::SYMBOL_LIST, 0, StreamId{ action }, IndexValue{ symbol }, IndexValue{ symbol },
store->write_sync(KeyType::SYMBOL_LIST, version_id, StreamId{ action }, IndexValue{ symbol }, IndexValue{ symbol },
std::move(seg));
} catch ([[maybe_unused]] const arcticdb::storage::DuplicateKeyException& e) {
// Both version and content hash are fixed, so collision is possible
ARCTICDB_DEBUG(log::storage(), "Symbol list DuplicateKeyException: {}", e.what());
}
}

// Internal ArcticDB clients (2021) write symbol list entries with an obsolete schema, and always with version ID 0.
void backwards_compat_write_journal(const std::shared_ptr<Store>& store, const StreamId& symbol, const std::string& action) {
extremely_backwards_compat_write_journal(store, symbol, action, 0);
}

void backwards_compat_compact(const std::shared_ptr<Store>& store, std::vector<AtomKey>&& old_keys, const BackwardsCompatCollectionType& symbols) {
auto compacted_key = backwards_compat_write(store, symbols, StreamId{std::string{CompactionId}}, timestamp(12), StringId{}).get();
delete_keys(store, std::move(old_keys), to_atom(compacted_key)).get();
Expand Down
60 changes: 60 additions & 0 deletions cpp/arcticdb/version/test/test_symbol_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,66 @@ bool all_symbols_match(
return true;
}

TEST_F(SymbolListSuite, BackwardsCompatInterleave) {
ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 1);
auto version_store = get_test_engine();
const auto& store = version_store._test_get_store();
auto version_map = std::make_shared<VersionMap>();
SymbolList symbol_list{version_map};
std::vector<StreamId> expected;

SymbolList::add_symbol(store, "s", 0);
symbol_list.get_symbols(store, false); // trigger a compaction

SymbolList::add_symbol(store, "symbol", 0);
backwards_compat_write_journal(store, "symbol", std::string{AddSymbol});
SymbolList::add_symbol(store, "symbol", 2);
backwards_compat_write_journal(store, "symbol", std::string{DeleteSymbol});
SymbolList::add_symbol(store, "symbol", 3);

auto syms = symbol_list.get_symbols(store, false);
ASSERT_EQ(syms.size(), 2);
}

TEST_F(SymbolListSuite, ExtremelyBackwardsCompatInterleavedWithSomewhatBackwardsCompat) {
ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 1);
auto version_store = get_test_engine();
const auto& store = version_store._test_get_store();
auto version_map = std::make_shared<VersionMap>();
SymbolList symbol_list{version_map};
std::vector<StreamId> expected;

SymbolList::add_symbol(store, "s", 0);
symbol_list.get_symbols(store, false); // trigger a compaction

// Very old arcticc clients wrote version numbers on the symbol list entries, which needs special handling.
extremely_backwards_compat_write_journal(store, "symbol", std::string{AddSymbol}, 0);
extremely_backwards_compat_write_journal(store, "symbol", std::string{DeleteSymbol}, 1);
backwards_compat_write_journal(store, "symbol", std::string{AddSymbol});

auto syms = symbol_list.get_symbols(store, false);
ASSERT_EQ(syms.size(), 2);
}

TEST_F(SymbolListSuite, ExtremelyBackwardsCompatInterleavedWithNewStyle) {
ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 1);
auto version_store = get_test_engine();
const auto& store = version_store._test_get_store();
auto version_map = std::make_shared<VersionMap>();
SymbolList symbol_list{version_map};
std::vector<StreamId> expected;

SymbolList::add_symbol(store, "s", 0);
symbol_list.get_symbols(store, false); // trigger a compaction

extremely_backwards_compat_write_journal(store, "symbol", std::string{AddSymbol}, 0);
extremely_backwards_compat_write_journal(store, "symbol", std::string{DeleteSymbol}, 1);
SymbolList::add_symbol(store, "symbol", 2);

auto syms = symbol_list.get_symbols(store, false);
ASSERT_EQ(syms.size(), 2);
}

TEST_F(SymbolListSuite, BackwardsCompat) {
ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 5);
auto version_store = get_test_engine();
Expand Down

0 comments on commit 7d78797

Please sign in to comment.