Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions src/core/dash.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ class DashTable : public detail::DashTableBase {
// Unlike Traverse, TraverseBuckets calls cb once on bucket iterator and not on each entry in
// bucket. TraverseBuckets is stable during table mutations. It guarantees traversing all buckets
// that existed at the beginning of traversal.
template <typename Cb> Cursor TraverseBuckets(Cursor curs, Cb&& cb);
template <typename Cb> Cursor TraverseBuckets(Cursor curs, Cb&& cb, bool visit_empty = false);

// Traverses over a single bucket in table and calls cb(iterator). The traverse order will be
segment by segment over physical buckets.
Expand Down Expand Up @@ -460,15 +460,16 @@ class DashTable<_Key, _Value, Policy>::Iterator {
uint32_t seg_id_;
detail::PhysicalBid bucket_id_;
uint8_t slot_id_;
bool done_;

friend class DashTable;

Iterator(Owner* me, uint32_t seg_id, detail::PhysicalBid bid, uint8_t sid)
: owner_(me), seg_id_(seg_id), bucket_id_(bid), slot_id_(sid) {
: owner_(me), seg_id_(seg_id), bucket_id_(bid), slot_id_(sid), done_(false) {
}

Iterator(Owner* me, uint32_t seg_id, detail::PhysicalBid bid)
: owner_(me), seg_id_(seg_id), bucket_id_(bid), slot_id_(0) {
: owner_(me), seg_id_(seg_id), bucket_id_(bid), slot_id_(0), done_(false) {
Seek2Occupied();
}

Expand All @@ -486,7 +487,8 @@ class DashTable<_Key, _Value, Policy>::Iterator {
: owner_(other.owner_),
seg_id_(other.seg_id_),
bucket_id_(other.bucket_id_),
slot_id_(other.slot_id_) {
slot_id_(other.slot_id_),
done_(other.done_) {
}

// Copy constructor from iterator to bucket_iterator and vice versa.
Expand All @@ -495,14 +497,15 @@ class DashTable<_Key, _Value, Policy>::Iterator {
: owner_(other.owner_),
seg_id_(other.seg_id_),
bucket_id_(other.bucket_id_),
slot_id_(IsSingleBucket ? 0 : other.slot_id_) {
slot_id_(IsSingleBucket ? 0 : other.slot_id_),
done_(other.done_) {
// If this is a bucket_iterator, reset slot_id_ to the first occupied slot.
if constexpr (IsSingleBucket) {
Seek2Occupied();
}
}

Iterator() : owner_(nullptr), seg_id_(0), bucket_id_(0), slot_id_(0) {
Iterator() : owner_(nullptr), seg_id_(0), bucket_id_(0), slot_id_(0), done_(true) {
}

Iterator(const Iterator& other) = default;
Expand Down Expand Up @@ -539,7 +542,7 @@ class DashTable<_Key, _Value, Policy>::Iterator {

// Make it self-contained. Does not need container::end().
bool is_done() const {
return owner_ == nullptr;
return done_;
}

bool IsOccupied() const {
Expand All @@ -564,10 +567,11 @@ class DashTable<_Key, _Value, Policy>::Iterator {
}

friend bool operator==(const Iterator& lhs, const Iterator& rhs) {
if (lhs.owner_ == nullptr && rhs.owner_ == nullptr)
if (lhs.done_ && rhs.done_)
return true;
return lhs.owner_ == rhs.owner_ && lhs.seg_id_ == rhs.seg_id_ &&
lhs.bucket_id_ == rhs.bucket_id_ && lhs.slot_id_ == rhs.slot_id_;
lhs.bucket_id_ == rhs.bucket_id_ && lhs.slot_id_ == rhs.slot_id_ &&
lhs.done_ == rhs.done_;
}

friend bool operator!=(const Iterator& lhs, const Iterator& rhs) {
Expand Down Expand Up @@ -649,7 +653,7 @@ struct DashTable<_Key, _Value, Policy>::BucketSet {
template <typename _Key, typename _Value, typename Policy>
template <bool IsConst, bool IsSingleBucket>
void DashTable<_Key, _Value, Policy>::Iterator<IsConst, IsSingleBucket>::Seek2Occupied() {
if (owner_ == nullptr)
if (done_)
return;
assert(seg_id_ < owner_->segment_.size());

Expand All @@ -673,7 +677,7 @@ void DashTable<_Key, _Value, Policy>::Iterator<IsConst, IsSingleBucket>::Seek2Oc
bucket_id_ = slot_id_ = 0;
}
}
owner_ = nullptr;
done_ = true;
}

template <typename _Key, typename _Value, typename Policy>
Expand Down Expand Up @@ -1164,7 +1168,8 @@ auto DashTable<_Key, _Value, Policy>::AdvanceCursorBucketOrder(Cursor cursor) ->

template <typename _Key, typename _Value, typename Policy>
template <typename Cb>
auto DashTable<_Key, _Value, Policy>::TraverseBuckets(Cursor cursor, Cb&& cb) -> Cursor {
auto DashTable<_Key, _Value, Policy>::TraverseBuckets(Cursor cursor, Cb&& cb, bool visit_empty)
-> Cursor {
if (SegmentType::OutOfRange(cursor.bucket_id())) // sanity.
return Cursor::end();

Expand All @@ -1178,7 +1183,7 @@ auto DashTable<_Key, _Value, Policy>::TraverseBuckets(Cursor cursor, Cb&& cb) ->
assert(s);
if (bid < s->num_buckets()) {
const auto& bucket = s->GetBucket(bid);
if (bucket.GetBusy()) { // Invoke callback only if bucket has elements.
if (visit_empty || bucket.GetBusy()) {
cb(BucketIt(sid, bid));
invoked = true;
}
Expand Down
7 changes: 4 additions & 3 deletions src/server/serializer_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,12 @@ bool SerializerBase::ProcessBucketInternal(DbIndex db_index, PrimeTable::bucket_
if (it.is_done() || it.GetVersion() >= snapshot_version_) {
stats_.buckets_skipped++;

if (it.is_done())
return false;
// Update versions for empty buckets
if (it.GetVersion() < snapshot_version_)
it.SetVersion(snapshot_version_);

// Force flush all delayed entries in the touched bucket
if (EngineShard::tlocal()->tiered_storage() != nullptr && on_update && !it.is_done())
if (EngineShard::tlocal()->tiered_storage() != nullptr && on_update)
ProcessDelayedEntries(false, it.bucket_address(), base_cntx_);

// Expected to be fully serialized due to big_value_mu_ guarding all paths
Expand Down
7 changes: 4 additions & 3 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,10 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
return;
}

snapshot_cursor_ = pt->TraverseBuckets(snapshot_cursor_, [this, snapshot_db_indx](auto it) {
ProcessBucket(snapshot_db_indx, it, false);
});
snapshot_cursor_ = pt->TraverseBuckets(
snapshot_cursor_,
[this, snapshot_db_indx](auto it) { ProcessBucket(snapshot_db_indx, it, false); },
true /* include empty buckets */);

if (use_background_mode_) {
// Yielding for background fibers has low overhead if the time slice isn't used up.
Expand Down
Loading