From c6dd207d475aee648aece6c86b763840ea0a4b22 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Sun, 19 Apr 2026 19:53:02 +0200 Subject: [PATCH] fix(server): Process empty buckets in snapshot --- src/core/dash.h | 31 ++++++++++++++++++------------- src/server/serializer_base.cc | 7 ++++--- src/server/snapshot.cc | 7 ++++--- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/src/core/dash.h b/src/core/dash.h index 7545544f17ee..a2710122d2b3 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -348,7 +348,7 @@ class DashTable : public detail::DashTableBase { // Unlike Traverse, TraverseBuckets calls cb once on bucket iterator and not on each entry in // bucket. TraverseBuckets is stable during table mutations. It guarantees traversing all buckets // that existed at the beginning of traversal. - template Cursor TraverseBuckets(Cursor curs, Cb&& cb); + template Cursor TraverseBuckets(Cursor curs, Cb&& cb, bool visit_empty = false); // Traverses over a single bucket in table and calls cb(iterator). The traverse order will be // segment by segment over physical backets. @@ -460,15 +460,16 @@ class DashTable<_Key, _Value, Policy>::Iterator { uint32_t seg_id_; detail::PhysicalBid bucket_id_; uint8_t slot_id_; + bool done_; friend class DashTable; Iterator(Owner* me, uint32_t seg_id, detail::PhysicalBid bid, uint8_t sid) - : owner_(me), seg_id_(seg_id), bucket_id_(bid), slot_id_(sid) { + : owner_(me), seg_id_(seg_id), bucket_id_(bid), slot_id_(sid), done_(false) { } Iterator(Owner* me, uint32_t seg_id, detail::PhysicalBid bid) - : owner_(me), seg_id_(seg_id), bucket_id_(bid), slot_id_(0) { + : owner_(me), seg_id_(seg_id), bucket_id_(bid), slot_id_(0), done_(false) { Seek2Occupied(); } @@ -486,7 +487,8 @@ class DashTable<_Key, _Value, Policy>::Iterator { : owner_(other.owner_), seg_id_(other.seg_id_), bucket_id_(other.bucket_id_), - slot_id_(other.slot_id_) { + slot_id_(other.slot_id_), + done_(other.done_) { } // Copy constructor from iterator to bucket_iterator and vice versa. @@ -495,14 +497,15 @@ class DashTable<_Key, _Value, Policy>::Iterator { : owner_(other.owner_), seg_id_(other.seg_id_), bucket_id_(other.bucket_id_), - slot_id_(IsSingleBucket ? 0 : other.slot_id_) { + slot_id_(IsSingleBucket ? 0 : other.slot_id_), + done_(other.done_) { // if this - is a bucket_iterator - we reset slot_id to the first occupied space. if constexpr (IsSingleBucket) { Seek2Occupied(); } } - Iterator() : owner_(nullptr), seg_id_(0), bucket_id_(0), slot_id_(0) { + Iterator() : owner_(nullptr), seg_id_(0), bucket_id_(0), slot_id_(0), done_(true) { } Iterator(const Iterator& other) = default; @@ -539,7 +542,7 @@ class DashTable<_Key, _Value, Policy>::Iterator { // Make it self-contained. Does not need container::end(). bool is_done() const { - return owner_ == nullptr; + return done_; } bool IsOccupied() const { @@ -564,10 +567,11 @@ class DashTable<_Key, _Value, Policy>::Iterator { } friend bool operator==(const Iterator& lhs, const Iterator& rhs) { - if (lhs.owner_ == nullptr && rhs.owner_ == nullptr) + if (lhs.done_ && rhs.done_) return true; return lhs.owner_ == rhs.owner_ && lhs.seg_id_ == rhs.seg_id_ && - lhs.bucket_id_ == rhs.bucket_id_ && lhs.slot_id_ == rhs.slot_id_; + lhs.bucket_id_ == rhs.bucket_id_ && lhs.slot_id_ == rhs.slot_id_ && + lhs.done_ == rhs.done_; } friend bool operator!=(const Iterator& lhs, const Iterator& rhs) { @@ -649,7 +653,7 @@ struct DashTable<_Key, _Value, Policy>::BucketSet { template template void DashTable<_Key, _Value, Policy>::Iterator::Seek2Occupied() { - if (owner_ == nullptr) + if (done_) return; assert(seg_id_ < owner_->segment_.size()); @@ -673,7 +677,7 @@ void DashTable<_Key, _Value, Policy>::Iterator::Seek2Oc bucket_id_ = slot_id_ = 0; } } - owner_ = nullptr; + done_ = true; } template @@ -1164,7 +1168,8 @@ auto DashTable<_Key, _Value, Policy>::AdvanceCursorBucketOrder(Cursor cursor) -> template template -auto DashTable<_Key, _Value, Policy>::TraverseBuckets(Cursor cursor, Cb&& cb) -> Cursor { +auto DashTable<_Key, _Value, Policy>::TraverseBuckets(Cursor cursor, Cb&& cb, bool visit_empty) + -> Cursor { if (SegmentType::OutOfRange(cursor.bucket_id())) // sanity. return Cursor::end(); @@ -1178,7 +1183,7 @@ auto DashTable<_Key, _Value, Policy>::TraverseBuckets(Cursor cursor, Cb&& cb) -> assert(s); if (bid < s->num_buckets()) { const auto& bucket = s->GetBucket(bid); - if (bucket.GetBusy()) { // Invoke callback only if bucket has elements. + if (visit_empty || bucket.GetBusy()) { cb(BucketIt(sid, bid)); invoked = true; } diff --git a/src/server/serializer_base.cc b/src/server/serializer_base.cc index c4dc166295af..0d0cc96ccb8f 100644 --- a/src/server/serializer_base.cc +++ b/src/server/serializer_base.cc @@ -153,11 +153,12 @@ bool SerializerBase::ProcessBucketInternal(DbIndex db_index, PrimeTable::bucket_ if (it.is_done() || it.GetVersion() >= snapshot_version_) { stats_.buckets_skipped++; - if (it.is_done()) - return false; + // Update versions for empty buckets + if (it.GetVersion() < snapshot_version_) + it.SetVersion(snapshot_version_); // Force flush all delayed entries in the touched bucket - if (EngineShard::tlocal()->tiered_storage() != nullptr && on_update && !it.is_done()) + if (EngineShard::tlocal()->tiered_storage() != nullptr && on_update) ProcessDelayedEntries(false, it.bucket_address(), base_cntx_); // Expected to be fully serialized due to big_value_mu_ guarding all paths diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 93017010bc5c..d11c6aaadd02 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -179,9 +179,10 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) { return; } - snapshot_cursor_ = pt->TraverseBuckets(snapshot_cursor_, [this, snapshot_db_indx](auto it) { - ProcessBucket(snapshot_db_indx, it, false); - }); + snapshot_cursor_ = pt->TraverseBuckets( + snapshot_cursor_, + [this, snapshot_db_indx](auto it) { ProcessBucket(snapshot_db_indx, it, false); }, + true /* include empty buckets */); if (use_background_mode_) { // Yielding for background fibers has low overhead if the time slice isn't used up.