rm_stm: async loop over intrusive list fix #27694
Conversation
```diff
     co_await ss::max_concurrent_for_each(
-      _active_tx_producers, 5, [this, &last_err](const auto& producer) {
-          return mark_expired(producer.id()).then([&last_err](tx::errc res) {
+      std::move(producer_ids_to_expire),
+      max_concurrency,
+      [this, &last_err](const auto producer_id) {
+          return mark_expired(producer_id).then([&last_err](tx::errc res) {
```
It is dangerous to suspend while looping over intrusive lists
does this PR fix a real bug?
Not one that we've seen strike
But is there a bug? It's still a bug if you've never seen a crash, but can describe how it might occur. Merely saying "it is dangerous" isn't quite enough?
rm_stm::abort_all_txes does not hold any lock itself, and its caller holds the partition produce lock. So rm_stm::reset_producers() can run concurrently and invalidate iterators.
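(For concreteness, a minimal sketch of that failure mode, with hypothetical names rather than the actual rm_stm types; Seastar plus Boost.Intrusive assumed, not a verbatim copy of the tree:)

```cpp
#include <boost/intrusive/list.hpp>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sleep.hh>

#include <chrono>
#include <cstdint>

// A producer linked into an intrusive list; auto_unlink hooks require
// constant_time_size<false> on the list.
struct producer
  : boost::intrusive::list_base_hook<
      boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
    int64_t id{0};
};

using producer_list = boost::intrusive::list<
  producer,
  boost::intrusive::constant_time_size<false>>;

// Fiber A: iterates the list with a suspension point in the loop body.
seastar::future<> expire_all(producer_list& active) {
    for (auto it = active.begin(); it != active.end(); ++it) {
        // Suspension point: this fiber parks here. If fiber B runs now
        // and unlinks the node `it` points at, the subsequent `++it`
        // follows a dangling hook pointer: undefined behavior.
        co_await seastar::sleep(std::chrono::milliseconds(1));
    }
}

// Fiber B: a reset path that empties the list while A is parked.
void reset_producers(producer_list& active) {
    active.clear(); // unlinks every node; A's iterator is now invalid
}
```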
CI test results
test results on build #72792
test results on build #72980
Could this be equivalently safe?

```cpp
co_await ss::max_concurrent_for_each(
  _active_tx_producers,
  5,
  [this, &last_err](const producer_state& producer) {
      if (!producer._active_transaction_hook.is_linked()) {
          return ss::now();
      }
      return mark_expired(producer.id()).then([&last_err](tx::errc res) {
          if (res != tx::errc::none) {
              last_err = res;
          }
      });
  });
```
```cpp
ss::future<tx::errc> rm_stm::abort_all_txes() {
    static constexpr uint max_concurrency = 5u;
```
I'd like to get @bashtanov's thoughts on this. This was originally added for migration purposes, but I vaguely recall a discussion in the PR about the potential unsafe iteration (no synchronization when this method is called). I reviewed that PR but it's not the same PR that added this method. Alexey, do you remember anything about it? I'm not able to find that discussion.
Found the discussion
mark_expired holds a lock on every individual invocation, but is it not possible to have an interleaved modification to the list in between invocations of mark_expired?
ya for sure (that was the original concern).. but not sure what the conclusion of the thread was (I went silent :)). The fix makes sense but I'd like to check with @bashtanov in case we are overlooking something.
I don't agree.
src/v/cluster/rm_stm.cc
Outdated
```cpp
tx::errc last_err = tx::errc::none;

// snap the intrusive list produced_ids before yielding the cpu
chunked_vector<model::producer_identity> producer_ids_to_expire;
```
maybe we should just change the type of the _active_tx_producers container @bharathv?
umm would that help? Any container is prone to iterator invalidation?
oh no, I just meant if we are going to make a copy of the container to iterate over anyway, then we could align the data structure types. You're right, I'm not suggesting magic.
IIRC my reasoning was that the function is only called when the partition is made read-only, so no further control batches can be applied and no individual erases can happen. reset_producers(), which erases the whole container, however, can still be called, so your concern is valid.
intrusive list is still offering value here, we're not copying the list members (which are quite large afaict), only the ids (int64 + int16)
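(Rough sketch of that size argument, assuming the layout implied by the int64+int16 note; this is illustrative, not the actual model::producer_identity definition:)

```cpp
#include <cstdint>

// Assumed shape of what gets snapshotted: just the id/epoch pair.
struct producer_identity_sketch {
    int64_t id;    // producer id, 8 bytes
    int16_t epoch; // producer epoch, 2 bytes
};

// The snapshot copies roughly 10 bytes per producer; the heavyweight
// producer_state objects stay linked in the intrusive list, uncopied.
```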
sgtm!
src/v/cluster/rm_stm.cc
Outdated
```cpp
producer_ids_to_expire.reserve(_active_tx_producers.size());
std::ranges::transform(
  _active_tx_producers,
  std::back_inserter(producer_ids_to_expire),
  [](const auto& producer) { return producer.id(); });
```
nit: we can use something like

```cpp
chunked_vector<model::producer_identity> producer_ids_to_expire{
  std::from_range,
  _active_tx_producers | std::views::transform([](const auto& producer) {
      return producer.id();
  })};
```

It'll reserve automatically.
That's awesome, thanks
Force-pushed from f2164ab to 7acd494
Pull Request Overview
This PR fixes a race condition in the abort_all_txes() function by preventing iterator invalidation when looping over an intrusive list. The fix ensures that asynchronous operations don't corrupt the intrusive list iteration by snapshotting producer identities before any suspension points.
Key changes:
- Snapshot producer identities from the intrusive list into an owning collection before yielding CPU
- Use the snapshotted collection for asynchronous transaction expiration operations
- Add necessary header include for transaction error codes
It is dangerous to suspend while looping over intrusive lists because destruction of the intrusive list element can invalidate an iterator on the intrusive list. Here, the resolution is to snapshot the producer_identities which need to be cancelled into an owning collection before crossing a suspension point. The snapshotted producer identities will subsequently be used to expire transactions, which is an asynchronous operation.
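(Putting the hunks above together, a sketch of the fixed function; the body is assembled from the diff fragments in this conversation, and the final co_return is an assumption, not a verbatim copy of the merged code:)

```cpp
ss::future<tx::errc> rm_stm::abort_all_txes() {
    static constexpr uint max_concurrency = 5u;
    tx::errc last_err = tx::errc::none;

    // snap the intrusive list producer ids before yielding the cpu
    chunked_vector<model::producer_identity> producer_ids_to_expire{
      std::from_range,
      _active_tx_producers | std::views::transform([](const auto& producer) {
          return producer.id();
      })};

    // iterate the owned snapshot (not the intrusive list) across
    // suspension points
    co_await ss::max_concurrent_for_each(
      std::move(producer_ids_to_expire),
      max_concurrency,
      [this, &last_err](const auto producer_id) {
          return mark_expired(producer_id).then([&last_err](tx::errc res) {
              if (res != tx::errc::none) {
                  last_err = res;
              }
          });
      });
    co_return last_err; // assumed; the diff hunks do not show the return
}
```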
Force-pushed from 7acd494 to d256479
Retry command for Build #72980: please wait until all jobs are finished before running the slash command.
/backport v25.2.x
/backport v25.1.x
/backport v24.3.x
Failed to create a backport PR to v24.3.x branch. I tried:
Failed to create a backport PR to v25.1.x branch. I tried:
Backports Required
Release Notes
Bug Fixes