diff --git a/cpp/arcticdb/util/storage_lock.hpp b/cpp/arcticdb/util/storage_lock.hpp index 80258305b7..7fdd45a443 100644 --- a/cpp/arcticdb/util/storage_lock.hpp +++ b/cpp/arcticdb/util/storage_lock.hpp @@ -116,7 +116,7 @@ class StorageLock { OnExit x{[that=this] () { that->mutex_.unlock(); }}; - if(!ref_key_exists(store) || ttl_expired(store)) { + if(!ref_key_exists(store) || !ttl_not_expired(store)) { ts_= create_ref_key(store); auto lock_sleep = ConfigsMap::instance()->get_int("StorageLock.WaitMs", 200); std::this_thread::sleep_for(std::chrono::milliseconds(lock_sleep)); @@ -154,13 +154,14 @@ class StorageLock { sleep_ms(wait_ms); total_wait += wait_ms; wait_ms *= 2; - if (ttl_expired(store)) + auto read_ts = ttl_not_expired(store); + if (!read_ts) break; if (timeout_ms && total_wait > *timeout_ms) { ts_ = 0; log::lock().info("Lock timed out, giving up after {}", wait_ms); mutex_.unlock(); - throw StorageLockTimeout{fmt::format("Storage lock {} timeout out after {} ms", name_, total_wait)}; + throw StorageLockTimeout{fmt::format("Storage lock {} timed out after {} ms. Lock held since {} (UTC)", name_, total_wait, date_and_time(*read_ts))}; } } ts_ = create_ref_key(store); @@ -224,17 +225,18 @@ class StorageLock { } } - bool ttl_expired(const std::shared_ptr& store) { - if (auto read_ts = read_timestamp(store); read_ts) { + std::optional ttl_not_expired(const std::shared_ptr& store) { + auto read_ts = read_timestamp(store); + if (read_ts) { // check TTL auto ttl = ConfigsMap::instance()->get_int("StorageLock.TTL", DEFAULT_TTL_INTERVAL); if (ClockType::coarse_nanos_since_epoch() - *read_ts > ttl) { log::lock().warn("StorageLock {} taken for more than TTL (default 1 day). 
Force releasing", name_); force_release_lock(name_, store); - return true; + return std::nullopt; } } - return false; + return read_ts; } }; diff --git a/cpp/arcticdb/util/timer.hpp b/cpp/arcticdb/util/timer.hpp index 66da249d89..82aa7dd280 100644 --- a/cpp/arcticdb/util/timer.hpp +++ b/cpp/arcticdb/util/timer.hpp @@ -20,6 +20,7 @@ #include #include #include +#include namespace arcticdb { @@ -293,5 +294,12 @@ arcticdb::ScopedTimerTotal timer2{#name, [&data](auto totals) { \ std::copy(std::begin(totals), std::end(totals), std::back_inserter(data)); \ }}; +inline std::string date_and_time(int64_t ts) { + std::time_t seconds_since_epoch = ts / BILLION; + std::stringstream ss; + ss << std::put_time(std::gmtime(&seconds_since_epoch), "%F %T"); + return ss.str(); +} + } //namespace arcticdb diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 1d98148f91..1b75b7cfe1 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -203,6 +203,11 @@ std::set LocalVersionedEngine::list_streams_internal( return res; } +size_t LocalVersionedEngine::compact_symbol_list_internal() { + ARCTICDB_SAMPLE(CompactSymbolListInternal, 0) + return symbol_list().compact(store()); +} + std::string LocalVersionedEngine::dump_versions(const StreamId& stream_id) { return version_map()->dump_entry(store(), stream_id); } diff --git a/cpp/arcticdb/version/local_versioned_engine.hpp b/cpp/arcticdb/version/local_versioned_engine.hpp index 0eb44673aa..69a3f29801 100644 --- a/cpp/arcticdb/version/local_versioned_engine.hpp +++ b/cpp/arcticdb/version/local_versioned_engine.hpp @@ -187,6 +187,8 @@ class LocalVersionedEngine : public VersionedEngine { const std::optional& opt_all_symbols ) override; + size_t compact_symbol_list_internal() override; + VersionedItem write_versioned_dataframe_internal( const StreamId& stream_id, const std::shared_ptr& frame, diff --git 
a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index 3610333f44..738d289666 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -611,6 +611,9 @@ void register_bindings(py::module &version, py::exception(), "List all the stream ids that have been written") + .def("compact_symbol_list", + &PythonVersionStore::compact_symbol_list, + py::call_guard(), "Compacts the symbol list cache into a single key in the storage") .def("read_metadata", &PythonVersionStore::read_metadata, py::call_guard(), "Get back the metadata and version info for a symbol.") diff --git a/cpp/arcticdb/version/symbol_list.cpp b/cpp/arcticdb/version/symbol_list.cpp index 289882a50d..306c72e4ff 100644 --- a/cpp/arcticdb/version/symbol_list.cpp +++ b/cpp/arcticdb/version/symbol_list.cpp @@ -761,13 +761,7 @@ std::set SymbolList::load( OnExit x([&lock, &store] { lock.unlock(store); }); ARCTICDB_RUNTIME_DEBUG(log::symbol(),"Checking whether we still need to compact under lock"); - if(!has_recent_compaction(store, load_result.maybe_previous_compaction)) { - auto written = write_symbols(store, - load_result.symbols_, - compaction_id, - data_.type_holder_); - delete_keys(store, load_result.detach_symbol_list_keys(), std::get(written)); - } + compact_internal(store, load_result); } else { ARCTICDB_RUNTIME_DEBUG(log::symbol(),"Not compacting the symbol list due to lock contention"); } @@ -788,6 +782,32 @@ std::set SymbolList::load( return output; } +size_t SymbolList::compact(const std::shared_ptr& store) { + auto version_map = data_.version_map_; + LoadResult load_result = ExponentialBackoff(100, 2000) + .go([this, &version_map, &store]() { return attempt_load(version_map, store, data_); }); + auto num_symbol_list_keys = load_result.symbol_list_keys_.size(); + + ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Forcing compaction. 
Obtaining lock..."); + StorageLock lock{StringId{CompactionLockName}}; + lock.lock_timeout(store, 10000); + OnExit x([&lock, &store] { lock.unlock(store); }); + + ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Checking whether we still need to compact under lock"); + compact_internal(store, load_result); + return num_symbol_list_keys; +} + +void SymbolList::compact_internal(const std::shared_ptr& store, LoadResult& load_result) const { + if(!has_recent_compaction(store, load_result.maybe_previous_compaction)) { + auto written = write_symbols(store, + load_result.symbols_, + compaction_id, + data_.type_holder_); + delete_keys(store, load_result.detach_symbol_list_keys(), std::get(written)); + } +} + } //namespace arcticdb namespace std { diff --git a/cpp/arcticdb/version/symbol_list.hpp b/cpp/arcticdb/version/symbol_list.hpp index 2c7f7de874..a7face5802 100644 --- a/cpp/arcticdb/version/symbol_list.hpp +++ b/cpp/arcticdb/version/symbol_list.hpp @@ -165,6 +165,8 @@ class SymbolList { return load(data_.version_map_, store, false); } + size_t compact(const std::shared_ptr& store); + static void add_symbol(const std::shared_ptr& store, const entity::StreamId& symbol, entity::VersionId reference_id); static void remove_symbol(const std::shared_ptr& store, const entity::StreamId& symbol, entity::VersionId reference_id); @@ -172,6 +174,8 @@ class SymbolList { static void clear(const std::shared_ptr& store); private: + void compact_internal(const std::shared_ptr& store, LoadResult& load_result) const; + [[nodiscard]] bool needs_compaction(const LoadResult& load_result) const; }; diff --git a/cpp/arcticdb/version/version_store_api.cpp b/cpp/arcticdb/version/version_store_api.cpp index f4c72d5be7..3aced0cebb 100644 --- a/cpp/arcticdb/version/version_store_api.cpp +++ b/cpp/arcticdb/version/version_store_api.cpp @@ -504,6 +504,10 @@ std::set PythonVersionStore::list_streams( return list_streams_internal(snap_name, regex, prefix, opt_use_symbol_list, opt_all_symbols); } +size_t 
PythonVersionStore::compact_symbol_list() { + return compact_symbol_list_internal(); +} + VersionedItem PythonVersionStore::write_partitioned_dataframe( const StreamId& stream_id, const py::tuple &item, diff --git a/cpp/arcticdb/version/version_store_api.hpp b/cpp/arcticdb/version/version_store_api.hpp index ec57a1f6b5..01201b03e4 100644 --- a/cpp/arcticdb/version/version_store_api.hpp +++ b/cpp/arcticdb/version/version_store_api.hpp @@ -312,6 +312,8 @@ class PythonVersionStore : public LocalVersionedEngine { const std::optional& use_symbol_list = std::nullopt, const std::optional& all_symbols = std::nullopt); + size_t compact_symbol_list(); + void clear(const bool continue_on_error = true); bool empty(); void force_delete_symbol(const StreamId& stream_id); diff --git a/cpp/arcticdb/version/versioned_engine.hpp b/cpp/arcticdb/version/versioned_engine.hpp index 07f87061f7..657d47a583 100644 --- a/cpp/arcticdb/version/versioned_engine.hpp +++ b/cpp/arcticdb/version/versioned_engine.hpp @@ -140,6 +140,8 @@ class VersionedEngine { const std::optional& opt_all_symbols ) = 0; + virtual size_t compact_symbol_list_internal() = 0; + virtual IndexRange get_index_range( const StreamId &stream_id, const VersionQuery& version_query) = 0; diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index d430237866..2585f46f27 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -2071,6 +2071,24 @@ def list_symbols( use_symbol_list = False return list(self.version_store.list_streams(snapshot, regex, prefix, use_symbol_list, all_symbols)) + def compact_symbol_list(self) -> int: + """ + Compact the symbol list cache into a single key in the storage + + Returns + ------- + The number of symbol list keys prior to compaction + + + Raises + ------ + PermissionException + Library has been opened in read-only mode + InternalException + Storage lock required to compact the symbol list could not be acquired 
+ """ + return self.version_store.compact_symbol_list() + def list_snapshots(self, load_metadata: Optional[bool] = True) -> Dict[str, Any]: """ List the snapshots in the library. diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index 650cc71395..acee421b83 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -1706,6 +1706,24 @@ def reload_symbol_list(self): """ self._nvs.version_store.reload_symbol_list() + def compact_symbol_list(self) -> int: + """ + Compact the symbol list cache into a single key in the storage + + Returns + ------- + The number of symbol list keys prior to compaction + + + Raises + ------ + PermissionException + Library has been opened in read-only mode + InternalException + Storage lock required to compact the symbol list could not be acquired + """ + return self._nvs.compact_symbol_list() + def is_symbol_fragmented(self, symbol: str, segment_size: Optional[int] = None) -> bool: """ Check whether the number of segments that would be reduced by compaction is more than or equal to the diff --git a/python/tests/integration/arcticdb/version_store/test_symbol_list.py b/python/tests/integration/arcticdb/version_store/test_symbol_list.py index 4831f9ef65..19b097beca 100644 --- a/python/tests/integration/arcticdb/version_store/test_symbol_list.py +++ b/python/tests/integration/arcticdb/version_store/test_symbol_list.py @@ -21,12 +21,13 @@ from arcticdb_ext import set_config_int, unset_config_int from arcticdb_ext.storage import KeyType, OpenMode from arcticdb_ext.tools import CompactionId, CompactionLockName -from arcticdb_ext.exceptions import InternalException +from arcticdb_ext.exceptions import InternalException, PermissionException from multiprocessing import Pool from arcticdb_ext import set_config_int import random import string +from tests.util.mark import MACOS_CONDA_BUILD @pytest.fixture(autouse=True) @@ -390,4 +391,78 @@ def 
test_symbol_list_parallel_stress_with_delete( def test_symbol_list_exception_and_printout(mock_s3_store_with_mock_storage_exception): # moto is choosen just because it's easy to give storage error with pytest.raises(InternalException, match="E_S3_RETRYABLE Retry-able error"): - mock_s3_store_with_mock_storage_exception.list_symbols() \ No newline at end of file + mock_s3_store_with_mock_storage_exception.list_symbols() + + +def test_force_compact_symbol_list(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + lib_tool = lib.library_tool() + # No symbol list keys + assert lib.compact_symbol_list() == 0 + symbol_list_keys = lib_tool.find_keys(KeyType.SYMBOL_LIST) + assert len(symbol_list_keys) == 1 + assert not len(lib.list_symbols()) + lib_tool.remove(symbol_list_keys[0]) + + num_syms = 1000 + syms = [f"sym_{idx:03}" for idx in range(num_syms)] + for sym in syms: + lib.write(sym, 1) + symbol_list_keys = lib_tool.find_keys(KeyType.SYMBOL_LIST) + assert len(symbol_list_keys) == num_syms + assert lib.compact_symbol_list() == num_syms + symbol_list_keys = lib_tool.find_keys(KeyType.SYMBOL_LIST) + assert len(symbol_list_keys) == 1 + assert set(lib.list_symbols()) == set(syms) + # Idempotent + assert lib.compact_symbol_list() == 1 + symbol_list_keys = lib_tool.find_keys(KeyType.SYMBOL_LIST) + assert len(symbol_list_keys) == 1 + assert set(lib.list_symbols()) == set(syms) + # Everything deleted + for sym in syms: + lib.delete(sym) + # +1 for previous compacted key + assert lib.compact_symbol_list() == num_syms + 1 + symbol_list_keys = lib_tool.find_keys(KeyType.SYMBOL_LIST) + assert len(symbol_list_keys) == 1 + assert not len(lib.list_symbols()) + + +# Using S3 because LMDB does not allow OpenMode to be changed +def test_force_compact_symbol_list_read_only(s3_version_store_v1): + lib_write = s3_version_store_v1 + lib_read_only = make_read_only(lib_write) + # No symbol list keys + with pytest.raises(PermissionException): + lib_read_only.compact_symbol_list() + # Some 
symbol list keys + lib_write.write("sym1", 1) + lib_write.write("sym2", 1) + with pytest.raises(PermissionException): + lib_read_only.compact_symbol_list() + # One compacted symbol list key + assert lib_write.compact_symbol_list() == 2 + with pytest.raises(PermissionException): + lib_read_only.compact_symbol_list() + + +def test_force_compact_symbol_list_lock_held(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + lock = lib.version_store.get_storage_lock(CompactionLockName) + lock.lock() + with pytest.raises(InternalException): + lib.compact_symbol_list() + lock.unlock() + assert lib.compact_symbol_list() == 0 + + +@pytest.mark.skipif(MACOS_CONDA_BUILD, reason="Failing for unclear reasons") +def test_force_compact_symbol_list_lock_held_past_ttl(lmdb_version_store_v1): + # Set TTL to 5 seconds. Compact symbol list will retry for 10 seconds, so should always work + set_config_int("StorageLock.TTL", 5_000_000_000) + lib = lmdb_version_store_v1 + lock = lib.version_store.get_storage_lock(CompactionLockName) + lock.lock() + assert lib.compact_symbol_list() == 0 +