feat: yield when serialization is in progress #3220

Open · wants to merge 4 commits into main
6 changes: 2 additions & 4 deletions src/server/journal/streamer.cc
@@ -280,9 +280,7 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {
}

bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
// Can't switch fibers because that could invalidate iterator or cause bucket splits which may
// move keys between buckets.
FiberAtomicGuard fg;
std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_);

bool written = false;

@@ -312,7 +310,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";

FiberAtomicGuard fg;
{ std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_); }
Collaborator:

why do you put it in its own scope? It will be released immediately after, which means that other OnDbChange callbacks could run in parallel, no?

Contributor Author:

I don't think that's the case. Once it acquires the lock, it drops it immediately, but it won't preempt. It will potentially preempt when we call WriteBucket. All the steps between dropping the lock on line 313 and calling WriteBucket are atomic; there is no preemption. However, when we call WriteBucket we will reacquire the lock on the same fiber before we preempt. Think of this {} as transferring ownership of the lock without doing it explicitly (e.g. lk.unlock() before we call WriteBucket).
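
To illustrate, here is a minimal sketch of the pattern being described — OnDbChangeSketch and its body are placeholders, not the actual Dragonfly code; only the locking shape matters:

    #include <mutex>

    #include "util/fibers/synchronization.h"

    // Held by WriteBucket()/SerializeBucket() for the duration of a bucket's
    // serialization, which may yield the fiber.
    util::fb2::Mutex bucket_ser_mu_;

    void OnDbChangeSketch() {
      {
        // Yields this fiber only while another fiber is mid-serialization.
        std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_);
      }  // Dropped immediately; everything below runs without preemption...

      // ...until WriteBucket() re-acquires bucket_ser_mu_ on this same fiber,
      // e.g. WriteBucket(it);
    }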

Collaborator (chakaz):

Some thoughts on what you wrote:

  • Is what you wrote also true for CVCUponInsert()? I see that it uses a callback. I think it is, but you should verify it if we're counting on that.
  • I know that "regular" mutexes (i.e. non-fiber ones) do not guarantee that, even once the lock is unlocked, execution will not preempt to other threads. I don't know if that's true here, but I guess it is. In other words, this may not guarantee a mutex transfer without yielding in between.
  • I understand that WriteBucket() locks this lock, so I don't see any benefit to locking (and immediately releasing) here as well. Why even do that?
  • There might be a subtle bug here: CVCUponInsert() might call WriteBucket() multiple times (over multiple buckets). If we lose the mutex to some other operation that touches one of those other buckets, it might change the underlying structure, causing us to lose some changes.

I'd recommend adding a WriteBucketNoLock() method which will be called both here and in WriteBucket(). Bonus points if you add ABSL_LOCKS_EXCLUDED and ABSL_LOCKS_REQUIRED annotations to let the compiler verify we don't create deadlocks.
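
For reference, a sketch of that split — the class name and method shapes are illustrative, not the actual Dragonfly signatures; note that the concrete Abseil macro for the "required" side is ABSL_EXCLUSIVE_LOCKS_REQUIRED, and the annotations only give compile-time checking if util::fb2::Mutex is itself annotated as a capability type:

    #include <mutex>

    #include "absl/base/thread_annotations.h"
    #include "util/fibers/synchronization.h"

    class StreamerSketch {
     public:
      // Entry point used elsewhere: takes the lock, then delegates.
      bool WriteBucket() ABSL_LOCKS_EXCLUDED(bucket_ser_mu_) {
        std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_);
        return WriteBucketNoLock();
      }

      // Change callback: holds the lock across the whole (possibly multi-bucket)
      // serialization, so no other flow can interleave with it.
      void OnDbChange() ABSL_LOCKS_EXCLUDED(bucket_ser_mu_) {
        std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_);
        WriteBucketNoLock();  // may be called several times under the same lock
      }

     private:
      // Serialization body; callers must already own bucket_ser_mu_.
      bool WriteBucketNoLock() ABSL_EXCLUSIVE_LOCKS_REQUIRED(bucket_ser_mu_) {
        // ... serialize the bucket; may yield while flushing entries ...
        return true;
      }

      util::fb2::Mutex bucket_ser_mu_;
    };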

Collaborator (adiholden):

I also don't see why we need to lock and release here; if we lock in WriteBucket, that should be enough.
@chakaz Regarding the comment that CVCUponInsert calls WriteBucket multiple times, I think that's OK, because what we care about is that we don't break a bucket into multiple pieces. I don't think we should care if, while CVCUponInsert serializes one bucket, we preempt, serialize another bucket from a different flow, and then go back to continue serializing the next buckets from CVCUponInsert. Is that right?

Contributor Author:

@chakaz

Yes, we do not need the lock in OnDbChange; that was a mistake.

> I know that "regular" mutexes (i.e. non-fiber ones) do not guarantee that, even once the lock is unlocked, execution will not preempt to other threads. I don't know if that's true here, but I guess it is. In other words, this may not guarantee a mutex transfer without yielding in between.

Regular mutexes behave differently when notifying other threads to make progress: if this code was running on thread A but wakes up thread B and then releases the mutex, it's a race as to which thread locks the mutex next. This is not true for fibers:

    void Mutex::unlock() {
      detail::FiberInterface* active = detail::FiberActive();

      unique_lock lk(wait_queue_splk_);
      CHECK(owner_ == active);
      owner_ = nullptr;

      wait_queue_.NotifyOne(active);
    }

And if you follow NotifyOne, it calls NotifyImpl, which calls ActivateOther:

    // Schedules another fiber without switching to it.
    // other can belong to another thread.
    void ActivateOther(FiberInterface* other);

The gist here is: a) ActivateOther adds the fiber to the ready queue; b) in our case, all the interesting fibers that use this mutex (the fiber that runs OnDbChange and IterateFB) run on the same thread. This is true both for SaveStagesController and for FullSyncFb in replication (in the RDB version they push to the same serialization channel, which is fine since we don't, until phase 2, split individual slots).
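
A tiny sketch of that ordering claim — hedged: it assumes helio's fb2::Fiber("name", fn)/Join() API, that the function runs inside a proactor/fiber context, and the header path for the fiber class:

    #include <mutex>

    #include "util/fibers/fibers.h"           // assumed path for fb2::Fiber
    #include "util/fibers/synchronization.h"  // fb2::Mutex

    void UnlockDoesNotSwitch() {
      util::fb2::Mutex mu;
      int step = 0;

      std::unique_lock<util::fb2::Mutex> lk(mu);
      util::fb2::Fiber waiter("waiter", [&] {
        std::unique_lock<util::fb2::Mutex> lk2(mu);  // waits while the main fiber holds mu
        step = 2;
      });

      lk.unlock();    // marks "waiter" ready (ActivateOther) but does not switch to it
      step = 1;       // so this always runs first, on the same fiber/thread
      waiter.Join();  // only here do we yield and let "waiter" run
    }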

+1 for the rest

@adiholden that won't happen: when we release the lock we continue running on the same fiber; we don't switch.

Please let me know if I missed anything.

Contributor Author (kostasrim), Jun 30, 2024:

@adiholden

> because what we care about is that we don't break a bucket into multiple pieces

We do want to break the bucket into multiple pieces, otherwise what's the point? (Edit: we just don't want to allow interleaved bucket serialization per SliceSnapshot.)

Collaborator:

> We do want to break the bucket into multiple pieces, otherwise what's the point?

We want to break entries, not buckets; breaking at bucket granularity isn't enough.
Of course, the challenge is not to have interleaved entries in the stream.

PrimeTable* table = db_slice_->GetTables(0).first;

if (const PrimeTable::bucket_iterator* bit = req.update()) {
2 changes: 2 additions & 0 deletions src/server/journal/streamer.h
@@ -102,6 +102,8 @@ class RestoreStreamer : public JournalStreamer {
cluster::SlotSet my_slots_;
bool fiber_cancelled_ = false;
bool snapshot_finished_ = false;

util::fb2::Mutex bucket_ser_mu_;
};

} // namespace dfly
46 changes: 29 additions & 17 deletions src/server/snapshot.cc
Collaborator:

Note that, in addition to snapshot.cc, we also have streamer.cc, which has some very similar logic.
I would say it's not worth modifying, as it's used far more rarely (only during slot migrations), but the thing is, it must support blocking as well, since the underlying value serialization will block :|

@@ -8,6 +8,8 @@
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>

#include <mutex>

#include "base/logging.h"
#include "core/heap_size.h"
#include "server/db_slice.h"
@@ -16,6 +18,7 @@
#include "server/rdb_extensions.h"
#include "server/rdb_save.h"
#include "server/tiered_storage.h"
#include "util/fibers/synchronization.h"

namespace dfly {

@@ -235,16 +238,27 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
}

bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
++stats_.savecb_calls;

uint64_t v = it.GetVersion();
if (v >= snapshot_version_) {
// either has been already serialized or added after snapshotting started.
DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
<< " at " << v;
++stats_.skipped;
return false;
// We need to block if serialization is in progress
{
std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_);
++stats_.savecb_calls;

auto check = [&](auto v) {
if (v >= snapshot_version_) {
// either has been already serialized or added after snapshotting started.
DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
<< " at " << v;
++stats_.skipped;
return false;
}
return true;
};

uint64_t v = it.GetVersion();
if (!check(v))
return false;
}

db_slice_->FlushChangeToEarlierCallbacks(current_db_, DbSlice::Iterator::FromPrime(it),
snapshot_version_);

@@ -253,12 +267,8 @@ bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
}

unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator it) {
// Must be atomic because after after we call it.snapshot_version_ we're starting
// to send incremental updates instead of serializing the whole bucket: We must not
// send the update until the initial SerializeBucket is called.
// Relying on the atomicity of SerializeBucket is Ok here because only one thread may handle this
// bucket.
FiberAtomicGuard fg;
std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_);

DCHECK_LT(it.GetVersion(), snapshot_version_);

// traverse physical bucket and write it into string file.
@@ -268,6 +278,7 @@

while (!it.is_done()) {
++result;
// might yield
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
++it;
}
@@ -330,10 +341,11 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
}

void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
FiberAtomicGuard fg;
{ std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_); }
PrimeTable* table = db_slice_->GetTables(db_index).first;
const PrimeTable::bucket_iterator* bit = req.update();

if (const PrimeTable::bucket_iterator* bit = req.update()) {
if (bit) {
if (bit->GetVersion() < snapshot_version_) {
stats_.side_saved += SerializeBucket(db_index, *bit);
}
3 changes: 3 additions & 0 deletions src/server/snapshot.h
@@ -14,6 +14,7 @@
#include "server/rdb_save.h"
#include "server/table.h"
#include "util/fibers/future.h"
#include "util/fibers/synchronization.h"

namespace dfly {

@@ -171,6 +172,8 @@ class SliceSnapshot {
size_t savecb_calls = 0;
size_t keys_total = 0;
} stats_;

util::fb2::Mutex bucket_ser_mu_;
Collaborator:

You have two different mutexes, one for snapshot and one for cluster; I think this is a problem if a snapshot is created at the same time a migration is created and we yield in the bucket serialization.

};

} // namespace dfly