From ed0a9f8ab4f86fba8ce251baf31b0ac673f54271 Mon Sep 17 00:00:00 2001 From: Alexey Bashtanov Date: Fri, 27 Sep 2024 16:41:54 +0100 Subject: [PATCH 1/3] c/migrations: fix tstate lifetime in backend::reconcile_migration The tstate lifetime was shorter than that of the coroutine using it. --- src/v/cluster/data_migration_backend.cc | 26 +++++++++++++++++-------- src/v/cluster/data_migration_backend.h | 7 +++++++ 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/v/cluster/data_migration_backend.cc b/src/v/cluster/data_migration_backend.cc index 6a364a1602e84..e70a0fa4f5432 100644 --- a/src/v/cluster/data_migration_backend.cc +++ b/src/v/cluster/data_migration_backend.cc @@ -830,7 +830,8 @@ ss::future<> backend::handle_raft0_leadership_update() { // start coordinating for (auto& [id, mrstate] : _migration_states) { for (auto& [nt, tstate] : mrstate.outstanding_topics) { - co_await reconcile_topic(nt, tstate, id, mrstate.scope, false); + co_await reconcile_existing_topic( + nt, tstate, id, mrstate.scope, false); } } // resend advance requests @@ -949,7 +950,8 @@ ss::future<> backend::process_delta(cluster::topic_table_delta&& delta) { tstate.clear(); // We potentially re-enqueue an already coordinated partition here. // The first RPC reply will clear it. 
- co_await reconcile_topic(nt, tstate, migration_id, mrstate.scope, false); + co_await reconcile_existing_topic( + nt, tstate, migration_id, mrstate.scope, false); // local partition work if (has_local_replica(delta.ntp)) { @@ -1148,7 +1150,7 @@ ss::future<> backend::drop_migration_reconciliation_rstate( _migration_states.erase(rs_it); } -ss::future<> backend::reconcile_topic( +ss::future<> backend::reconcile_existing_topic( const model::topic_namespace& nt, topic_reconciliation_state& tstate, id migration, @@ -1269,17 +1271,25 @@ ss::future<> backend::reconcile_migration( enumerated_nts, [this, &metadata, &mrstate](const auto& idx_nt) { auto& [idx, nt] = idx_nt; - auto& tstate = mrstate.outstanding_topics[nt]; - tstate.idx_in_migration = idx; - _topic_migration_map.emplace(nt, metadata.id); - return reconcile_topic( - nt, tstate, metadata.id, mrstate.scope, true); + return reconcile_topic(metadata.id, idx, nt, mrstate); }); }); }, metadata.migration); } +ss::future<> backend::reconcile_topic( + const id migration_id, + size_t idx_in_migration, + const model::topic_namespace& nt, + migration_reconciliation_state& mrstate) { + auto& tstate = mrstate.outstanding_topics[nt]; + tstate.idx_in_migration = idx_in_migration; + _topic_migration_map.emplace(nt, migration_id); + co_return co_await reconcile_existing_topic( + nt, tstate, migration_id, mrstate.scope, true); +} + std::optional> backend::get_replica_work_state(const model::ntp& ntp) { model::topic_namespace nt{ntp.ns, ntp.tp.topic}; diff --git a/src/v/cluster/data_migration_backend.h b/src/v/cluster/data_migration_backend.h index 87bf49b8ca95e..8126706d7ecea 100644 --- a/src/v/cluster/data_migration_backend.h +++ b/src/v/cluster/data_migration_backend.h @@ -204,6 +204,13 @@ class backend { // call only with _mutex lock grabbed ss::future<> reconcile_topic( + const id migration_id, + size_t idx_in_migration, + const model::topic_namespace& nt, + migration_reconciliation_state& mrstate); + + // call only with 
_mutex lock grabbed + ss::future<> reconcile_existing_topic( const model::topic_namespace& nt, topic_reconciliation_state& tstate, id migration, From 8565c73a575fac4d295a19288c979056e272936c Mon Sep 17 00:00:00 2001 From: Alexey Bashtanov Date: Fri, 27 Sep 2024 16:44:03 +0100 Subject: [PATCH 2/3] c/migrations: fix memory management in handle_migration_update Copy the migration definition from the migrations table, as the original may disappear from the table at scheduling points. --- src/v/cluster/data_migration_backend.cc | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/v/cluster/data_migration_backend.cc b/src/v/cluster/data_migration_backend.cc index e70a0fa4f5432..dab8269c53cad 100644 --- a/src/v/cluster/data_migration_backend.cc +++ b/src/v/cluster/data_migration_backend.cc @@ -860,15 +860,19 @@ ss::future<> backend::handle_migration_update(id id) { auto units = co_await _mutex.get_units(_as); vlog(dm_log.debug, "lock acquired for data migration {} notification", id); - auto new_maybe_metadata = _table.get_migration(id); - auto new_state = new_maybe_metadata ? std::make_optional( - new_maybe_metadata->get().state) - : std::nullopt; + auto new_ref = _table.get_migration(id); + // copying as it may go from the table on scheduling points + auto new_metadata = new_ref ? std::make_optional( + new_ref->get().copy()) + : std::nullopt; + auto new_state = new_metadata + ? 
std::make_optional(new_metadata->state) + : std::nullopt; vlog(dm_log.debug, "migration {} new state is {}", id, new_state); work_scope new_scope; - if (new_maybe_metadata) { - new_scope = get_work_scope(new_maybe_metadata->get()); + if (new_metadata) { + new_scope = get_work_scope(*new_metadata); } // forget about the migration if it went forward or is gone @@ -900,8 +904,7 @@ ss::future<> backend::handle_migration_update(id id) { vlog(dm_log.debug, "creating migration {} reconciliation state", id); auto new_it = _migration_states.emplace_hint(old_it, id, new_scope); if (new_scope.topic_work_needed || new_scope.partition_work_needed) { - co_await reconcile_migration( - new_it->second, new_maybe_metadata->get()); + co_await reconcile_migration(new_it->second, *new_metadata); } else { // yes it is done as there is nothing to do to_advance_if_done(new_it); From ec5538d640a375834889db93dd2c0efab510adde Mon Sep 17 00:00:00 2001 From: Alexey Bashtanov Date: Fri, 27 Sep 2024 16:46:29 +0100 Subject: [PATCH 3/3] c/migrations: memory management fixes - do not use ssx::async_for_each for coroutines - extend container lifetime for ss::do_for_each using ss::do_with --- src/v/cluster/data_migration_backend.cc | 32 ++++++++++++------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/v/cluster/data_migration_backend.cc b/src/v/cluster/data_migration_backend.cc index dab8269c53cad..a0b54218aff46 100644 --- a/src/v/cluster/data_migration_backend.cc +++ b/src/v/cluster/data_migration_backend.cc @@ -307,11 +307,10 @@ ss::future<> backend::work_once() { _nodes_to_retry.erase(node_id); co_await send_rpc(node_id); } - co_await ssx::async_for_each( - to_schedule_topic_work, [this](const auto& nt) { - _topic_work_to_retry.erase(nt); - return schedule_topic_work(nt); - }); + for (const auto& nt : to_schedule_topic_work) { + _topic_work_to_retry.erase(nt); + co_await schedule_topic_work(nt); + } spawn_advances(); if (next_tick == 
model::timeout_clock::time_point::max()) { _timer.cancel(); @@ -1261,20 +1260,21 @@ ss::future<> backend::reconcile_migration( metadata.id, mrstate.scope.sought_state); co_await std::visit( - [this, &metadata, &mrstate](const auto& migration) mutable { + [this, migration_id = metadata.id, &mrstate]( + const auto& migration) mutable { return ss::do_with( - migration.topic_nts(), - [this, &metadata, &mrstate](const auto& nts) { - // poor man's `nts | std::views::enumerate` - auto enumerated_nts = std::views::transform( - nts, [index = -1](const auto& nt) mutable { - return std::forward_as_tuple(++index, nt); - }); - return ssx::async_for_each( + // poor man's `migration.topic_nts() | std::views::enumerate` + std::views::transform( + migration.topic_nts(), + [index = -1](const auto& nt) mutable { + return std::forward_as_tuple(++index, nt); + }), + [this, migration_id, &mrstate](auto& enumerated_nts) { + return ss::do_for_each( enumerated_nts, - [this, &metadata, &mrstate](const auto& idx_nt) { + [this, migration_id, &mrstate](const auto& idx_nt) { auto& [idx, nt] = idx_nt; - return reconcile_topic(metadata.id, idx, nt, mrstate); + return reconcile_topic(migration_id, idx, nt, mrstate); }); }); },