diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index c8322c805889..38354daecabd 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -347,6 +347,18 @@ inline bool MayDeleteAsynchronously(const PrimeValue& pv) { return (obj_type == OBJ_SET || obj_type == OBJ_HASH) && pv.Encoding() == kEncodingStrMap2; } +// Implement ChangeConsumerInterface with a single callback for one-shot functions +template struct CallbackConsumer : public DbSlice::ChangeConsumerInterface { + explicit CallbackConsumer(F f) : f_{std::move(f)} { + } + + void OnChange(DbIndex db_index, const ChangeReq& req) { + f_(db_index, req); + } + + F f_; +}; + } // namespace #define ADD(x) (x) += o.x @@ -826,7 +838,7 @@ void DbSlice::DelMutable(Context cntx, ItAndUpdater it_updater) { } void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids, uint64_t next_version, - uint64_t cb_id) { + ChangeConsumerInterface* consumer) { VLOG(1) << "Start FlushSlotsFb"; // Slot deletion can take time as it traverses all the database, hence it runs in fiber. // We want to flush all the data of a slot that was added till the time the call to FlushSlots @@ -870,7 +882,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids, uint64_t next_versi } while (cursor && etl.gstate() != GlobalState::SHUTTING_DOWN); VLOG(1) << "FlushSlotsFb del count is: " << del_count; - UnregisterOnChange(cb_id); + UnregisterOnChange(consumer); if (absl::GetFlag(FLAGS_cluster_flush_decommit_memory)) { int64_t start = absl::GetCurrentTimeNanos(); @@ -929,10 +941,11 @@ void DbSlice::FlushSlots(const cluster::SlotRanges& slot_ranges) { } }; - uint64_t cb_id = RegisterOnChange(std::move(on_change)); + auto consumer = std::make_unique>(std::move(on_change)); + RegisterOnChange(consumer.get()); - fb2::Fiber("flush_slots", [this, shared_slots, next_version, cb_id]() { - FlushSlotsFb(*shared_slots, next_version, cb_id); + fb2::Fiber("flush_slots", [this, shared_slots, next_version, consumer = std::move(consumer)]() { + FlushSlotsFb(*shared_slots, next_version, consumer.get()); }).Detach(); } @@ -1318,7 +1331,8 @@ PrimeIterator DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) con void DbSlice::ExpireAllIfNeeded() { // We hold no locks to any of the keys so we should Wait() here such that // we don't preempt in ExpireIfNeeded - serialization_latch_.Wait(); + WaitForUnblockedJournalWrites(); + // Disable flush journal changes to prevent preemtion in traverse. journal::DisableFlushGuard journal_flush_guard(owner_->journal()); @@ -1340,9 +1354,28 @@ void DbSlice::ExpireAllIfNeeded() { } } -uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) { +void DbSlice::RegisterOnChange(ChangeConsumerInterface* consumer) { DCHECK(!owner_->shard_lock()->IsFree()); - return change_cb_.emplace_back(NextVersion(), std::move(cb)).first; + + consumer->snapshot_version_ = NextVersion(); + change_cb_.emplace_back(consumer); +} + +void DbSlice::UnregisterOnChange(ChangeConsumerInterface* consumer) { + change_cb_latch_.Wait(); + auto it = std::find(change_cb_.begin(), change_cb_.end(), consumer); + CHECK(it != change_cb_.end()); + change_cb_.erase(it); +} + +bool DbSlice::WillBlockOnJournalWrite() const { + return ranges::any_of(change_cb_, &ChangeConsumerInterface::IsAnyBucketBlocked); +} + +void DbSlice::WaitForUnblockedJournalWrites() const { + std::lock_guard lk{change_cb_latch_}; + while (WillBlockOnJournalWrite()) + ranges::for_each(change_cb_, &ChangeConsumerInterface::WaitForNoBucketBlocked); } // Ordering invariant (PIT mode): @@ -1353,46 +1386,22 @@ uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) { // snapshot could miss the bucket entirely — its traversal already passed it, and the version // stamp from the current snapshot would cause the earlier snapshot's OnChangeBlocking to skip it. void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) { - unique_lock lk(serialization_latch_); - uint64_t bucket_version = it.GetVersion(); - // change_cb_ is ordered by version. - DVLOG(2) << "Running callbacks in dbid " << db_ind << " with bucket_version=" << bucket_version - << ", upper_bound=" << upper_bound; - - const size_t limit = change_cb_.size(); - auto ccb = change_cb_.begin(); - for (size_t i = 0; i < limit; ++i) { - uint64_t cb_version = ccb->first; - DCHECK_LE(cb_version, upper_bound); - if (cb_version == upper_bound) { + + // See CallChangeCallbacks for iteration details + unique_lock lk(change_cb_latch_); + for (auto* cb : change_cb_ | std::views::take(change_cb_.size())) { + DCHECK_LE(cb->snapshot_version_, upper_bound); + if (cb->snapshot_version_ == upper_bound) return; - } - // We can not have here bucket_version < cb_version check. Explanation: - // Suppose we run snapshots S1 and S2, S1 starts serializing the bucket B, - // now snapshot S2 is started and it reaches the B, calls FlushChangeToEarlierCallbacks. - // if if we have here strong inequality, then S1 callback will be skipped here, and S2 - // will start processing B concurrently with S1. It should be fine in general, but - // we prefer avoiding this, so that we could DCHECK the invariant that the version bucket - // does not change during the serialization, therefore we allow at most one serializer - // reading the bucket at the same time. - if (bucket_version <= cb_version) { - ccb->second(db_ind, ChangeReq{it.GetInnerIt()}); - } - ++ccb; + // We call OnChange even if bucket_version == snapshot_version to ensure that the bucket + // _finished_ serializing, as we update its version when we start serializing. + if (bucket_version <= cb->snapshot_version_) + cb->OnChange(db_ind, ChangeReq{it.GetInnerIt()}); } } -//! Unregisters the callback. -void DbSlice::UnregisterOnChange(uint64_t id) { - serialization_latch_.Wait(); - auto it = find_if(change_cb_.begin(), change_cb_.end(), - [id](const auto& cb) { return cb.first == id; }); - CHECK(it != change_cb_.end()); - change_cb_.erase(it); -} - auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteExpiredStats { auto& db = *db_arr_[cntx.db_index]; DeleteExpiredStats result; @@ -1898,7 +1907,7 @@ void DbSlice::OnCbFinishBlocking() { } // We must not change the bucket's internal order during serialization - serialization_latch_.Wait(); + WaitForUnblockedJournalWrites(); PrimeBumpPolicy policy; auto bump_it = db.prime.BumpUp(it, policy); if (bump_it != it) { // the item was bumped @@ -1915,16 +1924,10 @@ void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const { if (change_cb_.empty()) return; - // does not preempt, just increments the counter. - unique_lock lk(serialization_latch_); - - const size_t limit = change_cb_.size(); - auto ccb = change_cb_.begin(); - for (size_t i = 0; i < limit; ++i) { - CHECK(ccb->second); - ccb->second(id, cr); - ++ccb; - } + // Lock latch to prevent deletion, take size() entries to ignore new insertions during suspensions + unique_lock lk(change_cb_latch_); + for (auto* cb : change_cb_ | std::views::take(change_cb_.size())) + cb->OnChange(id, cr); } } // namespace dfly diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 36fb54aad417..9f96e19e51f9 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -84,6 +84,24 @@ class DbSlice { void operator=(const DbSlice&) = delete; public: + // Consumer of bucket change events than can be registered inside the slice. + // It also includes additional methods for interfacing with snapshots and migrations. + struct ChangeConsumerInterface { + // Called before a specific bucket (or set of buckets) will be mutated + virtual void OnChange(DbIndex, const ChangeReq&) = 0; + + // Should return true if any bucket is mid-serialization + virtual bool IsAnyBucketBlocked() const { + return false; + } + + // Should wait for IsAnyBucketBlocked to return false + virtual void WaitForNoBucketBlocked() const { + } + + uint64_t snapshot_version_ = 0; + }; + // Auto-laundering iterator wrapper. Laundering means re-finding keys if they moved between // buckets. template class IteratorT { @@ -393,7 +411,10 @@ class DbSlice { //! Registers the callback to be called for each change. //! Returns the registration id which is also the unique version of the dbslice //! at a time of the call. - uint64_t RegisterOnChange(ChangeCallback cb); + void RegisterOnChange(ChangeConsumerInterface* consumer); + + // Unregister consumer. Not allowed to be called form the consumer callback + void UnregisterOnChange(ChangeConsumerInterface* consumer); bool HasRegisteredCallbacks() const { return !change_cb_.empty(); @@ -402,9 +423,6 @@ class DbSlice { // Call registered callbacks with version less than upper_bound. void FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound); - //! Unregisters the callback. - void UnregisterOnChange(uint64_t id); - struct DeleteExpiredStats { uint32_t deleted = 0; // number of deleted items due to expiry. uint32_t deleted_bytes = 0; // total bytes of deleted items. @@ -483,13 +501,12 @@ class DbSlice { // if it's not empty and not EX. void SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events); - bool WillBlockOnJournalWrite() const { - return serialization_latch_.IsBlocked(); - } + // Returns true if any registered snapshot is blocked on bucket serialiazion (big value, delayed) + // and thus might reject the journal change + bool WillBlockOnJournalWrite() const; - LocalLatch* GetLatch() { - return &serialization_latch_; - } + // Block and wait for WillBlockOnJournalWrite to become false + void WaitForUnblockedJournalWrites() const; void StartSampleTopK(DbIndex db_ind, uint32_t min_freq); @@ -521,7 +538,8 @@ class DbSlice { PrimeValue obj, uint64_t expire_at_ms, bool force_update); - void FlushSlotsFb(const cluster::SlotSet& slot_ids, uint64_t next_version, uint64_t cb_id); + void FlushSlotsFb(const cluster::SlotSet& slot_ids, uint64_t next_version, + ChangeConsumerInterface* consumer); util::fb2::Fiber FlushDbIndexes(const std::vector& indexes); // Invalidate all watched keys in database. Used on FLUSH. @@ -565,11 +583,6 @@ class DbSlice { void CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const; - // We need this because registered callbacks might yield and when they do so we want - // to avoid Heartbeat or Flushing the db. - // This latch protects us against this case. - mutable LocalLatch serialization_latch_; - ShardId shard_id_; uint8_t cache_mode_ : 1; @@ -608,7 +621,8 @@ class DbSlice { mutable absl::flat_hash_set uniq_fps_; // ordered from the smallest to largest version. - std::list> change_cb_; + std::list change_cb_; + mutable LocalLatch change_cb_latch_; // to avoid deletion during traversal // Used in temporary computations in Find item and CbFinish // This set is used to hold fingerprints of key accessed during the run of diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index c2e2204f941e..375835c3e6a6 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -703,15 +703,27 @@ TEST_F(DflyEngineTest, Bug468) { ASSERT_FALSE(IsLocked(0, "foo")); } +struct CountingConsumer : public DbSlice::ChangeConsumerInterface { + explicit CountingConsumer(unsigned* cb_hits) : cb_hits_(cb_hits) { + } + + void OnChange(DbIndex db_index, const ChangeReq&) { + (*cb_hits_)++; + } + + unsigned* cb_hits_; +}; + TEST_F(DflyEngineTest, Bug496) { shard_set->RunBlockingInParallel([](EngineShard* shard) { auto& db = namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()); - int cb_hits = 0; + unsigned cb_hits = 0; + CountingConsumer consumer{&cb_hits}; + // RegisterOnChange requires the shard lock to be held (see #7153). shard->shard_lock()->Acquire(IntentLock::EXCLUSIVE); - uint32_t cb_id = - db.RegisterOnChange([&cb_hits](DbIndex, const DbSlice::ChangeReq&) { cb_hits++; }); + db.RegisterOnChange(&consumer); shard->shard_lock()->Release(IntentLock::EXCLUSIVE); { @@ -732,7 +744,7 @@ TEST_F(DflyEngineTest, Bug496) { EXPECT_EQ(cb_hits, 3); } - db.UnregisterOnChange(cb_id); + db.UnregisterOnChange(&consumer); }); } diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index bbc7f26b28a7..6106ddac5517 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -652,7 +652,7 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, // ScanCb can preempt due to journaling expired entries and we need to make sure that // we enter the callback in a timing when journaling will not cause preemption. Otherwise, // the bucket might change as we Traverse and yield. - db_slice.GetLatch()->Wait(); + db_slice.WaitForUnblockedJournalWrites(); // Disable flush journal changes to prevent preemtion in traverse. journal::DisableFlushGuard journal_flush_guard(op_args.shard->journal()); diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 9297159c15dc..3f3a8d90d5b7 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -537,7 +537,7 @@ bool RestoreStreamer::Cancel() { snapshot_version_ = 0; // to prevent double cancel in another fiber cntx_->Cancel(); if (sver != 0) { - db_slice_->UnregisterOnChange(sver); + db_slice_->UnregisterOnChange(this); } bool res = JournalStreamer::Cancel(); LOG_IF(WARNING, res != (sver != 0)) << "Journal and DBSlice unregister state mismatch in " diff --git a/src/server/serializer_base.cc b/src/server/serializer_base.cc index 720dd3308c41..dd3d5d643a22 100644 --- a/src/server/serializer_base.cc +++ b/src/server/serializer_base.cc @@ -19,6 +19,7 @@ #include "server/tiered_storage.h" #include "util/fibers/fibers.h" #include "util/fibers/stacktrace.h" +#include "util/fibers/synchronization.h" namespace dfly { @@ -36,6 +37,9 @@ void BucketDependencies::Decrement(BucketIdentity bucket) { it->second->unlock(); if (!it->second->IsBlocked()) deps_.erase(it); + + if (deps_.empty()) + empty_q_.notify_all(); } void BucketDependencies::Wait(BucketIdentity bucket) const { @@ -47,6 +51,11 @@ void BucketDependencies::Wait(BucketIdentity bucket) const { counter->Wait(); } +void BucketDependencies::WaitEmpty() const { + util::fb2::NoOpLock lock; + empty_q_.wait(lock, [&] { return deps_.empty(); }); +} + bool BucketDependencies::DEBUG_IsBusy(BucketIdentity bucket) const { return deps_.contains(bucket); } @@ -131,15 +140,13 @@ SerializerBase::~SerializerBase() { // emitting large values. void SerializerBase::RegisterChangeListener() { db_array_ = db_slice_->databases(); // copy pointers to survive flush - auto cb = [this](DbIndex dbid, const ChangeReq& req) { - std::visit([&](auto it) { OnChangeBlocking(dbid, it); }, req); - }; - snapshot_version_ = db_slice_->RegisterOnChange(cb); + db_slice_->RegisterOnChange(this); } void SerializerBase::UnregisterChangeListener() { + DCHECK(!IsAnyBucketBlocked()); if (auto version = std::exchange(snapshot_version_, 0); version > 0) - db_slice_->UnregisterOnChange(version); + db_slice_->UnregisterOnChange(this); } bool SerializerBase::ProcessBucket(DbIndex db_index, PrimeTable::bucket_iterator it, @@ -165,12 +172,9 @@ bool SerializerBase::ProcessBucket(DbIndex db_index, PrimeTable::bucket_iterator // acquire serialization latch. // We must make sure that earlier snapshots serialized this bucket before we update its // version below. - std::optional> db_guard; - if (!on_update) { + if (!on_update) db_slice_->FlushChangeToEarlierCallbacks(db_index, DbSlice::Iterator::FromPrime(it), snapshot_version_); - db_guard.emplace(*db_slice_->GetLatch()); - } // The block above with updating earlier callbacks is not exlusive - check version again if (it.GetVersion() >= snapshot_version_) @@ -192,6 +196,14 @@ bool SerializerBase::ProcessBucket(DbIndex db_index, PrimeTable::bucket_iterator return true; } +void SerializerBase::WaitForNoBucketBlocked() const { + BucketDependencies::WaitEmpty(); +} + +void SerializerBase::OnChange(DbIndex db_index, const ChangeReq& req) { + std::visit([&](auto it) { OnChangeBlocking(db_index, it); }, req); +} + void SerializerBase::OnChangeBlocking(DbIndex db_index, PrimeTable::bucket_iterator it) { std::string_view active_name = util::fb2::detail::FiberActive()->name(); if (!absl::StartsWith(active_name, "shard_queue") && // diff --git a/src/server/serializer_base.h b/src/server/serializer_base.h index 27b0c4586f66..ce3b7a0c7d7f 100644 --- a/src/server/serializer_base.h +++ b/src/server/serializer_base.h @@ -10,6 +10,7 @@ #include #include "io/io.h" +#include "server/db_slice.h" #include "server/synchronization.h" #include "server/table.h" #include "util/fibers/future.h" @@ -36,9 +37,18 @@ struct BucketDependencies { void Wait(BucketIdentity bucket) const; bool DEBUG_IsBusy(BucketIdentity) const; + // Wait for no dependencies to exist + void WaitEmpty() const; + bool HasAny() const { + return deps_.size() > 0; + } + private: using SharedLatch = std::shared_ptr; absl::flat_hash_map deps_; + + // Triggered when all dependencies are resolved + mutable util::fb2::CondVarAny empty_q_; }; struct TieredDelayedEntry { @@ -79,7 +89,9 @@ struct DelayedEntryHandler { // Progress should be driven externally by calling ProcessBucket(). // Additionally, db_slice change listeners can be registered that invoke SerializeBucket // before any modification are performed to ensure point-in-time isolation. -class SerializerBase : public BucketDependencies, public DelayedEntryHandler { +class SerializerBase : public BucketDependencies, + public DelayedEntryHandler, + public DbSlice::ChangeConsumerInterface { public: struct Stats { uint64_t keys_serialized = 0; // total number of keys serialized @@ -114,6 +126,14 @@ class SerializerBase : public BucketDependencies, public DelayedEntryHandler { virtual unsigned SerializeBucketLocked(DbIndex db_index, PrimeTable::bucket_iterator it, bool on_update) = 0; + void OnChange(DbIndex db_index, const ChangeReq& req) override; + + bool IsAnyBucketBlocked() const override { + return BucketDependencies::HasAny(); + } + + void WaitForNoBucketBlocked() const override; + // Called when an existing bucket is about to be mutated. Calls ProcessBucket. void OnChangeBlocking(DbIndex db_index, PrimeTable::bucket_iterator it); @@ -127,7 +147,6 @@ class SerializerBase : public BucketDependencies, public DelayedEntryHandler { DbTableArray db_array_; - uint64_t snapshot_version_ = 0; Stats stats_; // Guards output stream (serializer) to not be used from multiple fibers diff --git a/src/server/serializer_base_test.cc b/src/server/serializer_base_test.cc index 8e02d8b7c07e..3080687a75c8 100644 --- a/src/server/serializer_base_test.cc +++ b/src/server/serializer_base_test.cc @@ -23,6 +23,7 @@ #include "server/conn_context.h" #include "server/db_slice.h" #include "server/engine_shard.h" +#include "server/engine_shard_set.h" #include "server/execution_state.h" #include "server/journal/journal.h" #include "server/journal/serializer.h" @@ -125,8 +126,8 @@ struct TestDriver : public SerializerBase, journal::JournalConsumerInterface { // TODO: possibly replace with unified loop if we decide on this? void Loop(); - void Serialize(BucketIdentity bucket, std::string key) { - if (absl::Bernoulli(bg_, params_.delay_prob)) { + void Serialize(BucketIdentity bucket, std::string key, unsigned obj_type) { + if (obj_type == OBJ_STRING && absl::Bernoulli(bg_, params_.delay_prob)) { DelayedEntryHandler::deps_.Increment(bucket); unsigned delay = absl::Uniform(bg_, params_.delay_lat_us.first, params_.delay_lat_us.second); auto de = std::make_unique(0, CompactKey{key}, @@ -247,7 +248,7 @@ unsigned TestDriver::SerializeBucketLocked(DbIndex db_index, PrimeTable::bucket_ DCHECK_EQ(it.GetVersion(), snapshot_version_); std::lock_guard lk{stream_mu_}; - Serialize(it.bucket_address(), it->first.ToString()); + Serialize(it.bucket_address(), it->first.ToString(), it->second.ObjType()); ++serialized; while (absl::Bernoulli(bg_, 0.3)) { @@ -393,10 +394,9 @@ TEST_F(SerializerBaseTest, IncreasingLists) { // Serialization code has many paths that omit empty bucket checks at all - // assert those "lost" delayed reads are correctly flushed before new changes TEST_F(SerializerBaseTest, DelayedAllDeleted) { - GTEST_SKIP() << "To be fixed"; + GTEST_SKIP() << "Deadlocks"; - // 1-2 ms - driver_params = {.delay_prob = 0.9, .delay_lat_us = {1000, 2000}}; + driver_params = {.delay_prob = 0.9, .delay_lat_us = {200, 600}}; // Fill database with some keys const size_t kKeys = 10000; @@ -413,7 +413,7 @@ TEST_F(SerializerBaseTest, DelayedAllDeleted) { // Let all values to be expire deleted TEST_current_time_ms = TEST_current_time_ms + 100; for (unsigned i = 0; i < kKeys; i++) - EXPECT_THAT(Run({"GET", absl::StrCat("key:", i)}), ArgType(RespExpr::NIL)); + Run({"SET", absl::StrCat("key:", i), "V"}); // Reallow delayed entry resolution Change([](TestDriver& d) { d.delay_driver_.Resume(); }); @@ -424,4 +424,58 @@ TEST_F(SerializerBaseTest, DelayedAllDeleted) { Finish(); } + +// Background eviction can cause delayed entries to be evicted. +// We must make sure that the baseline is either serialized before the eviction message or aborted. +// Detail: Yes, currently pure offloaded strings never allocate memory - but the keys do, so they +// are targets for eviction nonetheless +TEST_F(SerializerBaseTest, DelayedEvicted) { + // 1-2 ms + driver_params = {.delay_prob = 0.5, .delay_lat_us = {1000, 2000}}; + + // Fill database with some keys + const size_t kKeys = 100; + Run({"DEBUG", "POPULATE", std::to_string(kKeys), "key", "4096"}); + + // Add gigantic set that will eat up memory + // Its bucket should not coincide with any of the string keys + Run({"SADD", "gigantic-set", "first-entry"}); + + // Start and pause reolution of delayed entries + Start(); + Change([](TestDriver& d) { d.delay_driver_.Pause(); }); + + // Enable cache mode and grow set + shard_set->TEST_EnableCacheMode(); + max_memory_limit = 10'000; + for (unsigned i = 0; i < 1000; i++) + Run({"SADD", "gigantic-set", + absl::StrCat("some data some data some data some long data @ ", i)}); + + // Give heartbeat a change to run if it didn't yet + for (unsigned i = 0; i < 10; i++) + util::ThisFiber::Yield(); + + // Tiered entries blocked all evictions + unsigned evicted = GetMetrics().db_stats.front().events.evicted_keys; + EXPECT_EQ(evicted, 0u); + + // Reallow delayed entry resolution + Change([](TestDriver& d) { d.delay_driver_.Resume(); }); + + // Put more memory pressure on it + for (unsigned i = 0; i < 1000; i++) + Run({"SADD", "gigantic-set", + absl::StrCat("more some data some data some data some long data @ ", i)}); + + // Now we have evictions + evicted = GetMetrics().db_stats.front().events.evicted_keys; + EXPECT_GT(evicted, 0u); + + auto [_1, baselines, _2] = Finish(); + + // Currently we write baselines before eviction + EXPECT_EQ(baselines.size(), kKeys + 1); +} + } // namespace dfly