Merged
15 changes: 13 additions & 2 deletions src/v/cluster/rm_stm.cc
@@ -14,6 +14,7 @@
#include "cluster/producer_state_manager.h"
#include "cluster/rm_stm_types.h"
#include "cluster/snapshot.h"
#include "cluster/tx_errc.h"
#include "cluster/tx_gateway_frontend.h"
#include "cluster/types.h"
#include "container/chunked_hash_map.h"
@@ -1632,15 +1633,25 @@ void rm_stm::maybe_rearm_autoabort_timer(time_point_type deadline) {
}

ss::future<tx::errc> rm_stm::abort_all_txes() {
static constexpr uint max_concurrency = 5u;
Contributor

I'd like to get @bashtanov's thoughts on this. This was originally added for migration purposes, but I vaguely recall a discussion in a PR about potentially unsafe iteration (no synchronization when this method is called). I reviewed that PR, but it's not the PR that added this method. Alexey, do you remember anything about it? I'm not able to find that discussion.

Contributor

Found the discussion

#26380 (comment)

Contributor Author

mark_expired holds a lock on every individual invocation, but isn't it possible to have an interleaved modification to the list in between invocations of mark_expired?
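
To make the gap concrete, here is a minimal single-threaded sketch in plain C++. Everything in it (`registry`, `reset`, `changes_seen_during_walk`, the `version` counter) is hypothetical and only models the shape of the concern; it is not the rm_stm code. Each `mark_expired` call takes the lock on its own, and a callback stands in for another task that gets to run between two calls.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <list>
#include <mutex>

// Hypothetical stand-in for the producer registry; not the rm_stm types.
struct registry {
    std::mutex mtx;
    std::list<int> producers;
    std::size_t version = 0; // bumped on every structural change

    // Holds the lock only for the duration of one call.
    void mark_expired(int /*id*/) {
        std::lock_guard<std::mutex> guard(mtx);
        // per-producer work would go here
    }

    // Models a concurrent operation that drops every producer.
    void reset() {
        std::lock_guard<std::mutex> guard(mtx);
        producers.clear();
        ++version;
    }
};

// Calls mark_expired once per producer and runs `between_calls` after each
// call to model another task being scheduled in the gap. Returns the number
// of structural changes that happened during the walk.
inline std::size_t changes_seen_during_walk(
  registry& r, const std::function<void()>& between_calls) {
    std::list<int> ids;
    std::size_t start = 0;
    {
        std::lock_guard<std::mutex> guard(r.mtx);
        ids = r.producers; // copied so the demo itself stays well-defined
        start = r.version;
    }
    for (int id : ids) {
        r.mark_expired(id); // lock is held only inside this call
        between_calls();    // nothing prevents a modification here
    }
    std::lock_guard<std::mutex> guard(r.mtx);
    return r.version - start;
}
```

If `between_calls` invokes `reset()` once, the function reports one structural change even though every `mark_expired` call was individually locked, which is the interleaving asked about above.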

Contributor

Yes, for sure (that was the original concern), but I'm not sure what the conclusion of the thread was (I went silent :)). The fix makes sense, but I'd like to check with @bashtanov in case we are overlooking something.

if (!co_await sync(_sync_timeout())) {
co_return tx::errc::stale;
}

tx::errc last_err = tx::errc::none;

// snap the intrusive list produced_ids before yielding the cpu
chunked_vector<model::producer_identity> producer_ids_to_expire{
std::from_range,
std::ranges::views::transform(
_active_tx_producers,
[](const auto& producer) { return producer.id(); })};

co_await ss::max_concurrent_for_each(
- _active_tx_producers, 5, [this, &last_err](const auto& producer) {
- return mark_expired(producer.id()).then([&last_err](tx::errc res) {
+ std::move(producer_ids_to_expire),
+ max_concurrency,
+ [this, &last_err](const auto producer_id) {
+ return mark_expired(producer_id).then([&last_err](tx::errc res) {
Comment on lines 1641 to +1654
Member

It is dangerous to suspend while looping over intrusive lists

Does this PR fix a real bug?

Contributor Author

Not one that we've seen strike

Member

But is there a bug? It's still a bug if you've never seen a crash, but can describe how it might occur. Merely saying "it is dangerous" isn't quite enough?

Contributor

rm_stm::abort_all_txes does not hold any lock itself, and its caller holds only the partition produce lock. So rm_stm::reset_producers() can run concurrently and invalidate iterators.
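
The snapshot approach taken in this diff can be sketched without Seastar. The version below is an illustration under stated assumptions, not the actual implementation: `producer_table`, `abort_all`, `yield_point`, and the `not_found` code are hypothetical, a `std::map` stands in for the intrusive list, and a callback models the suspension point during which a reset may run. How the real `mark_expired` treats a producer that has disappeared is not shown in this diff, so the `not_found` handling here is an assumption.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <vector>

// Hypothetical result codes, loosely mirroring tx::errc in the diff.
enum class errc { none, not_found };

// Hypothetical producer table: id -> expired flag.
using producer_table = std::map<int, bool>;

// Looks the producer up by id at call time, so a producer that vanished
// in the meantime yields an error instead of a dangling reference.
inline errc mark_expired(producer_table& table, int id) {
    auto it = table.find(id);
    if (it == table.end()) {
        return errc::not_found;
    }
    it->second = true;
    return errc::none;
}

// Snapshots the ids first, then processes the snapshot. `yield_point`
// models a suspension during which other code may modify `table`.
inline errc abort_all(
  producer_table& table, const std::function<void()>& yield_point) {
    std::vector<int> ids;
    ids.reserve(table.size());
    for (const auto& entry : table) {
        ids.push_back(entry.first);
    }

    errc last_err = errc::none;
    for (int id : ids) {
        errc res = mark_expired(table, id);
        if (res != errc::none) {
            last_err = res;
        }
        yield_point(); // table may change here; `ids` is unaffected
    }
    return last_err;
}
```

Because the loop walks its own vector of ids, clearing `table` at a yield point cannot invalidate the traversal. The cost is that each id has to be looked up again, and the lookup can fail for producers removed in the meantime.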

if (res != tx::errc::none) {
last_err = res;
}