Enhancement/1610/method to force compact symbol list (#1624)
#### Reference Issues/PRs
Closes #1610 

Considered augmenting the `list_symbols` API to make this an option, but that method already takes a lot of arguments (particularly in the `NativeVersionStore` version) that the new option would not be compatible with, so added a separate method instead.
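
For context, a minimal usage sketch of the new method via the `Library` API added in this PR (the LMDB URI and library name are placeholders, not part of the change):

```python
import pandas as pd
from arcticdb import Arctic

ac = Arctic("lmdb:///tmp/arcticdb_demo")              # placeholder storage URI
lib = ac.get_library("demo", create_if_missing=True)  # placeholder library name

# Each write normally adds a small symbol-list key to storage.
lib.write("sym_a", pd.DataFrame({"col": [1, 2, 3]}))
lib.write("sym_b", pd.DataFrame({"col": [4, 5, 6]}))

# New in this PR: force-compact the symbol list cache into a single key.
# The return value is the number of symbol-list keys present before compaction.
print(lib.compact_symbol_list())  # -> 2
```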
alexowens90 committed Jun 21, 2024
1 parent 2a92cfe commit 1cd7d33
Showing 13 changed files with 179 additions and 16 deletions.
16 changes: 9 additions & 7 deletions cpp/arcticdb/util/storage_lock.hpp
@@ -116,7 +116,7 @@ class StorageLock {
OnExit x{[that=this] () {
that->mutex_.unlock();
}};
if(!ref_key_exists(store) || ttl_expired(store)) {
if(!ref_key_exists(store) || !ttl_not_expired(store)) {
ts_= create_ref_key(store);
auto lock_sleep = ConfigsMap::instance()->get_int("StorageLock.WaitMs", 200);
std::this_thread::sleep_for(std::chrono::milliseconds(lock_sleep));
@@ -154,13 +154,14 @@ class StorageLock {
sleep_ms(wait_ms);
total_wait += wait_ms;
wait_ms *= 2;
if (ttl_expired(store))
auto read_ts = ttl_not_expired(store);
if (!read_ts)
break;
if (timeout_ms && total_wait > *timeout_ms) {
ts_ = 0;
log::lock().info("Lock timed out, giving up after {}", wait_ms);
mutex_.unlock();
throw StorageLockTimeout{fmt::format("Storage lock {} timeout out after {} ms", name_, total_wait)};
throw StorageLockTimeout{fmt::format("Storage lock {} timeout out after {} ms. Lock held since {} (UTC)", name_, total_wait, date_and_time(*read_ts))};
}
}
ts_ = create_ref_key(store);
@@ -224,17 +225,18 @@ class StorageLock {
}
}

bool ttl_expired(const std::shared_ptr<Store>& store) {
if (auto read_ts = read_timestamp(store); read_ts) {
std::optional<timestamp> ttl_not_expired(const std::shared_ptr<Store>& store) {
auto read_ts = read_timestamp(store);
if (read_ts) {
// check TTL
auto ttl = ConfigsMap::instance()->get_int("StorageLock.TTL", DEFAULT_TTL_INTERVAL);
if (ClockType::coarse_nanos_since_epoch() - *read_ts > ttl) {
log::lock().warn("StorageLock {} taken for more than TTL (default 1 day). Force releasing", name_);
force_release_lock(name_, store);
return true;
return std::nullopt;
}
}
return false;
return read_ts;
}
};

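
The hunks above change `ttl_expired` (a bool) into `ttl_not_expired`, which returns the lock's stored timestamp while it is still within TTL, so the timeout exception can report since when the lock has been held. A rough, self-contained Python sketch of that acquire-with-retry loop, purely illustrative (the function names, TTL constant, and timestamp source here are assumptions, not the C++ API):

```python
import time
from typing import Callable, Optional

TTL_NS = 24 * 60 * 60 * 1_000_000_000  # assumed default TTL: 1 day, in nanoseconds


def ttl_not_expired(read_ts_ns: Optional[int], now_ns: int) -> Optional[int]:
    """Return the lock's timestamp if it is still within TTL, else None."""
    if read_ts_ns is not None and now_ns - read_ts_ns > TTL_NS:
        # The C++ version force-releases the stale lock at this point.
        return None
    return read_ts_ns


def acquire(read_timestamp: Callable[[], Optional[int]],
            timeout_ms: Optional[int] = None, initial_wait_ms: int = 100) -> None:
    total_wait, wait_ms = 0, initial_wait_ms
    while True:
        held_since = ttl_not_expired(read_timestamp(), time.time_ns())
        if held_since is None:
            return  # lock free (or its TTL expired): safe to take it
        if timeout_ms is not None and total_wait > timeout_ms:
            raise TimeoutError(
                f"Storage lock timed out after {total_wait} ms. "
                f"Lock held since {held_since} ns since epoch"
            )
        time.sleep(wait_ms / 1000)   # exponential back-off, as in the C++ loop
        total_wait += wait_ms
        wait_ms *= 2
```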
8 changes: 8 additions & 0 deletions cpp/arcticdb/util/timer.hpp
@@ -20,6 +20,7 @@
#include <map>
#include <ctime>
#include <sstream>
#include <iomanip>

namespace arcticdb {

@@ -293,5 +294,12 @@ arcticdb::ScopedTimerTotal timer2{#name, [&data](auto totals) { \
std::copy(std::begin(totals), std::end(totals), std::back_inserter(data)); \
}};

inline std::string date_and_time(int64_t ts) {
std::time_t seconds_since_epoch = ts / BILLION;
std::stringstream ss;
ss << std::put_time(std::gmtime(&seconds_since_epoch), "%F %T");
return ss.str();
}


} //namespace arcticdb
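
The new `date_and_time` helper turns a nanosecond epoch timestamp into a UTC string; `"%F %T"` is shorthand for `"%Y-%m-%d %H:%M:%S"`. A quick Python equivalent to show the expected output format (illustration only, not part of the library):

```python
from datetime import datetime, timezone

def date_and_time(ts_ns: int) -> str:
    seconds = ts_ns // 1_000_000_000  # BILLION in the C++ code
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

print(date_and_time(1_718_928_000_000_000_000))  # "2024-06-21 00:00:00"
```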
5 changes: 5 additions & 0 deletions cpp/arcticdb/version/local_versioned_engine.cpp
@@ -203,6 +203,11 @@ std::set<StreamId> LocalVersionedEngine::list_streams_internal(
return res;
}

size_t LocalVersionedEngine::compact_symbol_list_internal() {
ARCTICDB_SAMPLE(CompactSymbolListInternal, 0)
return symbol_list().compact(store());
}

std::string LocalVersionedEngine::dump_versions(const StreamId& stream_id) {
return version_map()->dump_entry(store(), stream_id);
}
2 changes: 2 additions & 0 deletions cpp/arcticdb/version/local_versioned_engine.hpp
@@ -187,6 +187,8 @@ class LocalVersionedEngine : public VersionedEngine {
const std::optional<bool>& opt_all_symbols
) override;

size_t compact_symbol_list_internal() override;

VersionedItem write_versioned_dataframe_internal(
const StreamId& stream_id,
const std::shared_ptr<InputTensorFrame>& frame,
3 changes: 3 additions & 0 deletions cpp/arcticdb/version/python_bindings.cpp
@@ -611,6 +611,9 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
.def("list_streams",
&PythonVersionStore::list_streams,
py::call_guard<SingleThreadMutexHolder>(), "List all the stream ids that have been written")
.def("compact_symbol_list",
&PythonVersionStore::compact_symbol_list,
py::call_guard<SingleThreadMutexHolder>(), "Compacts the symbol list cache into a single key in the storage")
.def("read_metadata",
&PythonVersionStore::read_metadata,
py::call_guard<SingleThreadMutexHolder>(), "Get back the metadata and version info for a symbol.")
34 changes: 27 additions & 7 deletions cpp/arcticdb/version/symbol_list.cpp
@@ -761,13 +761,7 @@ std::set<StreamId> SymbolList::load(
OnExit x([&lock, &store] { lock.unlock(store); });

ARCTICDB_RUNTIME_DEBUG(log::symbol(),"Checking whether we still need to compact under lock");
if(!has_recent_compaction(store, load_result.maybe_previous_compaction)) {
auto written = write_symbols(store,
load_result.symbols_,
compaction_id,
data_.type_holder_);
delete_keys(store, load_result.detach_symbol_list_keys(), std::get<AtomKey>(written));
}
compact_internal(store, load_result);
} else {
ARCTICDB_RUNTIME_DEBUG(log::symbol(),"Not compacting the symbol list due to lock contention");
}
@@ -788,6 +782,32 @@ std::set<StreamId> SymbolList::load(
return output;
}

size_t SymbolList::compact(const std::shared_ptr<Store>& store) {
auto version_map = data_.version_map_;
LoadResult load_result = ExponentialBackoff<StorageException>(100, 2000)
.go([this, &version_map, &store]() { return attempt_load(version_map, store, data_); });
auto num_symbol_list_keys = load_result.symbol_list_keys_.size();

ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Forcing compaction. Obtaining lock...");
StorageLock lock{StringId{CompactionLockName}};
lock.lock_timeout(store, 10000);
OnExit x([&lock, &store] { lock.unlock(store); });

ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Checking whether we still need to compact under lock");
compact_internal(store, load_result);
return num_symbol_list_keys;
}

void SymbolList::compact_internal(const std::shared_ptr<Store>& store, LoadResult& load_result) const {
if(!has_recent_compaction(store, load_result.maybe_previous_compaction)) {
auto written = write_symbols(store,
load_result.symbols_,
compaction_id,
data_.type_holder_);
delete_keys(store, load_result.detach_symbol_list_keys(), std::get<AtomKey>(written));
}
}

} //namespace arcticdb

namespace std {
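
The new `SymbolList::compact` above loads the existing symbol-list keys (retrying with exponential back-off on storage errors), takes the compaction storage lock with a 10 s timeout, merges the keys into one compacted key unless another writer compacted very recently, and returns how many keys existed beforehand. A toy, self-contained Python illustration of that pattern, with an in-memory dict standing in for object storage and a `threading.Lock` standing in for the storage lock (all names here are invented for the sketch):

```python
from threading import Lock

# Toy "storage": each small symbol-list key holds the symbols it recorded.
store = {f"symbol_list_key_{i}": [f"sym_{i}"] for i in range(5)}
compaction_lock = Lock()  # stands in for StorageLock{CompactionLockName}


def compact_symbol_list(store: dict) -> int:
    """Merge all symbol-list keys into one compacted key; return the prior key count."""
    keys_before = list(store)
    if not compaction_lock.acquire(timeout=10):   # cf. lock.lock_timeout(store, 10000)
        raise TimeoutError("could not acquire the compaction lock")
    try:
        # The real code first re-checks has_recent_compaction() under the lock
        # and skips the work if another writer has just compacted.
        symbols = sorted(s for key in keys_before for s in store[key])
        store["__compacted__"] = symbols          # cf. write_symbols(...)
        for key in keys_before:                   # cf. delete_keys(...)
            del store[key]
    finally:
        compaction_lock.release()
    return len(keys_before)


print(compact_symbol_list(store))  # 5
print(sorted(store))               # ['__compacted__']
```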
4 changes: 4 additions & 0 deletions cpp/arcticdb/version/symbol_list.hpp
@@ -165,13 +165,17 @@ class SymbolList {
return load(data_.version_map_, store, false);
}

size_t compact(const std::shared_ptr<Store>& store);

static void add_symbol(const std::shared_ptr<Store>& store, const entity::StreamId& symbol, entity::VersionId reference_id);

static void remove_symbol(const std::shared_ptr<Store>& store, const entity::StreamId& symbol, entity::VersionId reference_id);

static void clear(const std::shared_ptr<Store>& store);

private:
void compact_internal(const std::shared_ptr<Store>& store, LoadResult& load_result) const;

[[nodiscard]] bool needs_compaction(const LoadResult& load_result) const;
};

4 changes: 4 additions & 0 deletions cpp/arcticdb/version/version_store_api.cpp
@@ -504,6 +504,10 @@ std::set<StreamId> PythonVersionStore::list_streams(
return list_streams_internal(snap_name, regex, prefix, opt_use_symbol_list, opt_all_symbols);
}

size_t PythonVersionStore::compact_symbol_list() {
return compact_symbol_list_internal();
}

VersionedItem PythonVersionStore::write_partitioned_dataframe(
const StreamId& stream_id,
const py::tuple &item,
2 changes: 2 additions & 0 deletions cpp/arcticdb/version/version_store_api.hpp
@@ -312,6 +312,8 @@ class PythonVersionStore : public LocalVersionedEngine {
const std::optional<bool>& use_symbol_list = std::nullopt,
const std::optional<bool>& all_symbols = std::nullopt);

size_t compact_symbol_list();

void clear(const bool continue_on_error = true);
bool empty();
void force_delete_symbol(const StreamId& stream_id);
2 changes: 2 additions & 0 deletions cpp/arcticdb/version/versioned_engine.hpp
@@ -140,6 +140,8 @@ class VersionedEngine {
const std::optional<bool>& opt_all_symbols
) = 0;

virtual size_t compact_symbol_list_internal() = 0;

virtual IndexRange get_index_range(
const StreamId &stream_id,
const VersionQuery& version_query) = 0;
18 changes: 18 additions & 0 deletions python/arcticdb/version_store/_store.py
@@ -2071,6 +2071,24 @@ def list_symbols(
use_symbol_list = False
return list(self.version_store.list_streams(snapshot, regex, prefix, use_symbol_list, all_symbols))

def compact_symbol_list(self) -> int:
"""
Compact the symbol list cache into a single key in the storage
Returns
-------
The number of symbol list keys prior to compaction
Raises
------
PermissionException
Library has been opened in read-only mode
InternalException
Storage lock required to compact the symbol list could not be acquired
"""
return self.version_store.compact_symbol_list()

def list_snapshots(self, load_metadata: Optional[bool] = True) -> Dict[str, Any]:
"""
List the snapshots in the library.
18 changes: 18 additions & 0 deletions python/arcticdb/version_store/library.py
@@ -1706,6 +1706,24 @@ def reload_symbol_list(self):
"""
self._nvs.version_store.reload_symbol_list()

def compact_symbol_list(self) -> int:
"""
Compact the symbol list cache into a single key in the storage
Returns
-------
The number of symbol list keys prior to compaction
Raises
------
PermissionException
Library has been opened in read-only mode
InternalException
Storage lock required to compact the symbol list could not be acquired
"""
return self._nvs.compact_symbol_list()

def is_symbol_fragmented(self, symbol: str, segment_size: Optional[int] = None) -> bool:
"""
Check whether the number of segments that would be reduced by compaction is more than or equal to the
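
A short example of calling the new `Library.compact_symbol_list` and handling the failure modes documented above (the storage URI and library name are placeholders; the exception imports match those used in the tests below):

```python
from arcticdb import Arctic
from arcticdb_ext.exceptions import InternalException, PermissionException

lib = Arctic("lmdb:///tmp/arcticdb_demo").get_library("demo", create_if_missing=True)

try:
    keys_before = lib.compact_symbol_list()
    print(f"Compacted {keys_before} symbol-list key(s) into one")
except PermissionException:
    print("Library is open read-only; cannot compact the symbol list")
except InternalException:
    print("Could not acquire the storage lock; another process may hold it")
```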
@@ -21,12 +21,13 @@
from arcticdb_ext import set_config_int, unset_config_int
from arcticdb_ext.storage import KeyType, OpenMode
from arcticdb_ext.tools import CompactionId, CompactionLockName
from arcticdb_ext.exceptions import InternalException
from arcticdb_ext.exceptions import InternalException, PermissionException

from multiprocessing import Pool
from arcticdb_ext import set_config_int
import random
import string
from tests.util.mark import MACOS_CONDA_BUILD


@pytest.fixture(autouse=True)
@@ -390,4 +391,78 @@ def test_symbol_list_parallel_stress_with_delete(

def test_symbol_list_exception_and_printout(mock_s3_store_with_mock_storage_exception): # moto is chosen just because it's easy to simulate a storage error
with pytest.raises(InternalException, match="E_S3_RETRYABLE Retry-able error"):
mock_s3_store_with_mock_storage_exception.list_symbols()
mock_s3_store_with_mock_storage_exception.list_symbols()


def test_force_compact_symbol_list(lmdb_version_store_v1):
lib = lmdb_version_store_v1
lib_tool = lib.library_tool()
# No symbol list keys
assert lib.compact_symbol_list() == 0
symbol_list_keys = lib_tool.find_keys(KeyType.SYMBOL_LIST)
assert len(symbol_list_keys) == 1
assert not len(lib.list_symbols())
lib_tool.remove(symbol_list_keys[0])

num_syms = 1000
syms = [f"sym_{idx:03}" for idx in range(num_syms)]
for sym in syms:
lib.write(sym, 1)
symbol_list_keys = lib_tool.find_keys(KeyType.SYMBOL_LIST)
assert len(symbol_list_keys) == num_syms
assert lib.compact_symbol_list() == num_syms
symbol_list_keys = lib_tool.find_keys(KeyType.SYMBOL_LIST)
assert len(symbol_list_keys) == 1
assert set(lib.list_symbols()) == set(syms)
# Idempotent
assert lib.compact_symbol_list() == 1
symbol_list_keys = lib_tool.find_keys(KeyType.SYMBOL_LIST)
assert len(symbol_list_keys) == 1
assert set(lib.list_symbols()) == set(syms)
# Everything deleted
for sym in syms:
lib.delete(sym)
# +1 for previous compacted key
assert lib.compact_symbol_list() == num_syms + 1
symbol_list_keys = lib_tool.find_keys(KeyType.SYMBOL_LIST)
assert len(symbol_list_keys) == 1
assert not len(lib.list_symbols())


# Using S3 because LMDB does not allow OpenMode to be changed
def test_force_compact_symbol_list_read_only(s3_version_store_v1):
lib_write = s3_version_store_v1
lib_read_only = make_read_only(lib_write)
# No symbol list keys
with pytest.raises(PermissionException):
lib_read_only.compact_symbol_list()
# Some symbol list keys
lib_write.write("sym1", 1)
lib_write.write("sym2", 1)
with pytest.raises(PermissionException):
lib_read_only.compact_symbol_list()
# One compacted symbol list key
assert lib_write.compact_symbol_list() == 2
with pytest.raises(PermissionException):
lib_read_only.compact_symbol_list()


def test_force_compact_symbol_list_lock_held(lmdb_version_store_v1):
lib = lmdb_version_store_v1
lock = lib.version_store.get_storage_lock(CompactionLockName)
lock.lock()
with pytest.raises(InternalException):
lib.compact_symbol_list()
lock.unlock()
assert lib.compact_symbol_list() == 0


@pytest.mark.skipif(MACOS_CONDA_BUILD, reason="Failing for unclear reasons")
def test_force_compact_symbol_list_lock_held_past_ttl(lmdb_version_store_v1):
# Set TTL to 5 seconds. Compact symbol list will retry for 10 seconds, so should always work
set_config_int("StorageLock.TTL", 5_000_000_000)
lib = lmdb_version_store_v1
lock = lib.version_store.get_storage_lock(CompactionLockName)
lock.lock()
assert lib.compact_symbol_list() == 0
