feat: yield when serialization is in progress #3220
base: main
Conversation
src/server/snapshot.h
Outdated
@@ -171,6 +171,8 @@ class SliceSnapshot {
     size_t savecb_calls = 0;
     size_t keys_total = 0;
   } stats_;

+  bool bucket_ser_in_progress_ = false;
I will remove this, it suffices to use the other bool above
src/server/snapshot.cc
Outdated
  }

+  // If a bucket is being serialized we need to block
+  // maybe worth adding it on an event queue?
We can notify and wait on a queue here, but is it really worth it?
What's the upside of using a queue instead of just blocking?
I don't understand your proposal, sorry
src/server/snapshot.cc
Outdated
    SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
    ++it;
  }
  og.SetVersion(snapshot_version_);
We should not bump the version before we yield because other fibers might check against the version and decide to move on instead of block and cause a bucket split
I'm not sure if this is interesting on its own, as:

other fibers might check against the version and decide to move on instead of block and cause a bucket split

If that other fiber is older, it would have been called by now (we call by the order of snapshot versions), and if it's newer it wouldn't matter if the value is snapshot_version_ or smaller (it can't be bigger). Furthermore, we shouldn't really get to the point in parallel in multiple fibers because we have the above (to-be-changed) polling.
Furthermore^2, if you set it to be after, we could actually serialize the same key twice in a rare case in which we call OnDbChange, block, but the Traverse() fiber continues and reaches this bucket. This will generate an invalid RDB, which Dragonfly will reject loading.
src/server/snapshot.cc
Outdated
    return false;

+  while (bucket_ser_in_progress_) {
This will block all operations on the thread, even actions on different buckets. Effectively this means: if the bucket is not yet serialized and there is any serialization in progress, we block.
Definitely do not actively poll. This will consume 100% CPU, causing alerts and such, especially on huge entries...
@chakaz draft so we can discuss :)
    DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
             << " at " << v;
    ++stats_.skipped;
    return true;
My intuition is that check() should return true if the check passes (== all ok), i.e. the opposite of the logic, but that's a nit.
src/server/snapshot.cc
Outdated
  // Must be atomic because after we call it.snapshot_version_ we're starting
  // to send incremental updates instead of serializing the whole bucket: We must not
  // send the update until the initial SerializeBucket is called.
  // Relying on the atomicity of SerializeBucket is Ok here because only one thread may handle this
  // bucket.
  FiberAtomicGuard fg;
Isn't the whole point of this PR to remove the atomicity requirement?
I.e. instead of fixing the typo in the comment, you should remove it (as well as the FiberAtomicGuard).
I didn't remove it since we don't (yet) yield. We will yield when we actually start serializing in chunks, but I am happy to remove this in this PR -- no objections.
src/server/snapshot.cc
Outdated
  }

+  // If a bucket is being serialized we need to block
+  // maybe worth adding it on an event queue?
Note that in addition to snapshot.cc, we also have streamer.cc which has some very similar logic.
I would say that it's not worth modifying, as it's way more rarely used (only during slot migrations), but the thing is, it must support blocking as well, as the underlying value serialization will block :|
src/server/snapshot.cc
Outdated
  // to send incremental updates instead of serializing the whole bucket: We must not
  // send the update until the initial SerializeBucket is called.
  // Relying on the atomicity of SerializeBucket is Ok here because only one thread may handle this
  // bucket.
  FiberAtomicGuard fg;
  DCHECK_LT(it.GetVersion(), snapshot_version_);

+  bucket_ser_in_progress_ = true;
This boolean is the same as serialize_bucket_running_.
Yes yes, this was for testing; it's a draft PR, I will clean this up -- I wanted to discuss it with Shahar.
src/server/snapshot.cc
Outdated
+  // maybe worth adding it on an event queue?
+  while (bucket_ser_in_progress_) {
+    ThisFiber::Yield();
+  }
It could be that the bucket was already serialized while this fiber was sleeping, i.e. the snapshot version for this bucket was updated. You need to check it again after the sleep.
nice catch!
@@ -312,7 +310,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
 void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
   DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";

-  FiberAtomicGuard fg;
+  { std::unique_lock<util::fb2::Mutex> lk(bucket_ser_mu_); }
Why do you put it in its own scope? It will be freed immediately after, which will mean that other OnDbChange callbacks could run in parallel, no?
I don't think that's the case. Once it acquires the lock, it will drop it immediately but it won't preempt. It will potentially preempt when we call WriteBucket. All the steps between dropping the lock on line 313 and calling WriteBucket are atomic, there is no preemption. However, when we call WriteBucket we will reacquire the lock on the same fiber before we preempt. Think of this {} as transferring ownership of the lock without explicitly doing it (e.g. lk.unlock() before we call WriteBucket).
Some thoughts on what you wrote:

- Is what you wrote also true for CVCUponInsert()? I see that it uses a callback. I think it's true, but you should verify if we're counting on that.
- I know that "regular" mutexes (i.e. non-fiber ones) do not guarantee that, even if the lock is unlocked, it will not preempt to other threads. I don't know if that's true here, but I guess it is. In other words, this may not guarantee a mutex transfer without yielding in between.
- I understand that WriteBucket() locks this lock, so I don't see any benefit to lock (and immediately release) here as well. Why even do that?
- There might be a subtle bug here: CVCUponInsert() might call WriteBucket() multiple times (over multiple buckets). If we lose the mutex to some other operation, which touches that other bucket, it might change the underlying structure causing us to lose some changes.

I'd recommend adding a WriteBucketNoLock() method which will be called here and in WriteBucket(). Bonus points if you add ABSL_LOCKS_EXCLUDED and ABSL_LOCKS_REQUIRED annotations to let the compiler verify we don't create deadlocks.
I also don't see why we need to lock and release here; if we lock in WriteBucket this should be enough.
@chakaz Regarding the comment that CVCUponInsert calls WriteBucket multiple times, I think that's ok, because what we care about is that we don't break the bucket into multiple pieces. But I don't think we should care if CVCUponInsert serializes one bucket, we preempt, serialize another bucket from a different flow, and then go back to continue serializing the next buckets from CVCUponInsert. Is that right?
Yes, we do not need the lock in OnDbChange, that was a mistake.
I know that "regular" mutexes (i.e. non fiber ones) do not guarantee that, even if the lock is unlocked, it will not preempt to other threads. I don't know if that's true here, but I guess it is. In other words, this may not guarantee a mutex transfer without yielding in between
Regular mutexes behave differently when notifying other threads to make progress. If this code was running on thread a but wakes up thread b and releases the mutex then it's a race for which thread locks the mutex. This is not true for fibers:
void Mutex::unlock() {
  detail::FiberInterface* active = detail::FiberActive();

  unique_lock lk(wait_queue_splk_);
  CHECK(owner_ == active);
  owner_ = nullptr;

  wait_queue_.NotifyOne(active);
}
And if you follow NotifyOne, it calls NotifyImpl, which calls ActivateOther:
// Schedules another fiber without switching to it.
// other can belong to another thread.
void ActivateOther(FiberInterface* other);
The gist here is a) that ActivateOther adds the fiber to the ready queue, and b) that in our case all the interesting fibers (the fiber that runs OnDbChange and IterateFB) that use this mutex run on the same thread. This is true both for SaveStagesController and FullSyncFb for replication (on the RDB version they push to the same serialization channel, which is fine since we don't (yet, until phase 2) split individual slots).
+1 for the rest
@adiholden won't happen, when we release the lock we continue running on the same fiber, we don't switch.
Please let me know if I missed anything.
because what we care is that we dont break the bucket into multiple pieces
We do want to break the bucket into multiple pieces, otherwise what's the point? (edit: we just don't want to allow interleaved bucket serialization per SliceSnapshot)
@@ -171,6 +172,8 @@ class SliceSnapshot {
     size_t savecb_calls = 0;
     size_t keys_total = 0;
   } stats_;

+  util::fb2::Mutex bucket_ser_mu_;
You have two different mutexes, one for snapshot and one for cluster; I think this is a problem if a snapshot is created at the same time a migration is created and we yield in the bucket serialization.