Merge pull request #23556 from bashtanov/migrations-memory-bugfixes
Migrations memory bugfixes
mmaslankaprv authored Oct 2, 2024
2 parents 83f80f1 + ec5538d commit 97b9e0d
Showing 2 changed files with 51 additions and 31 deletions.
75 changes: 44 additions & 31 deletions src/v/cluster/data_migration_backend.cc
@@ -307,11 +307,10 @@ ss::future<> backend::work_once() {
         _nodes_to_retry.erase(node_id);
         co_await send_rpc(node_id);
     }
-    co_await ssx::async_for_each(
-      to_schedule_topic_work, [this](const auto& nt) {
-          _topic_work_to_retry.erase(nt);
-          return schedule_topic_work(nt);
-      });
+    for (const auto& nt : to_schedule_topic_work) {
+        _topic_work_to_retry.erase(nt);
+        co_await schedule_topic_work(nt);
+    }
     spawn_advances();
     if (next_tick == model::timeout_clock::time_point::max()) {
         _timer.cancel();
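
Note on the hunk above: the removed `ssx::async_for_each` call ran the body as a callback that captured `this` and took each element by reference, while the replacement is a plain coroutine loop. A minimal sketch of why the coroutine shape is the safer one here, assuming Seastar coroutines; the names and the sleep stand-in are illustrative, not the Redpanda code:

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sleep.hh>

#include <chrono>
#include <vector>

// stand-in for schedule_topic_work(): suspends, then resumes
seastar::future<> process_one(int item) {
    (void)item;
    co_await seastar::sleep(std::chrono::milliseconds(1));
}

seastar::future<> process_all(std::vector<int> items) {
    // `items` and `item` both live in the coroutine frame, so they stay
    // valid across every co_await below; no continuation has to capture
    // anything by reference.
    for (const auto& item : items) {
        co_await process_one(item);
    }
}

The coroutine frame owns the loop state for the full duration of each awaited call, which removes the class of lifetime questions the callback form raises.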
@@ -830,7 +829,8 @@ ss::future<> backend::handle_raft0_leadership_update() {
     // start coordinating
     for (auto& [id, mrstate] : _migration_states) {
         for (auto& [nt, tstate] : mrstate.outstanding_topics) {
-            co_await reconcile_topic(nt, tstate, id, mrstate.scope, false);
+            co_await reconcile_existing_topic(
+              nt, tstate, id, mrstate.scope, false);
         }
     }
     // resend advance requests
@@ -859,15 +859,19 @@ ss::future<> backend::handle_migration_update(id id) {
     auto units = co_await _mutex.get_units(_as);
     vlog(dm_log.debug, "lock acquired for data migration {} notification", id);

-    auto new_maybe_metadata = _table.get_migration(id);
-    auto new_state = new_maybe_metadata ? std::make_optional<state>(
-                       new_maybe_metadata->get().state)
-                                        : std::nullopt;
+    auto new_ref = _table.get_migration(id);
+    // copying as it may go from the table on scheduling points
+    auto new_metadata = new_ref ? std::make_optional<migration_metadata>(
+                          new_ref->get().copy())
+                                : std::nullopt;
+    auto new_state = new_metadata
+                       ? std::make_optional<state>(new_metadata->state)
+                       : std::nullopt;
     vlog(dm_log.debug, "migration {} new state is {}", id, new_state);

     work_scope new_scope;
-    if (new_maybe_metadata) {
-        new_scope = get_work_scope(new_maybe_metadata->get());
+    if (new_metadata) {
+        new_scope = get_work_scope(*new_metadata);
     }

     // forget about the migration if it went forward or is gone
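
The core fix in this hunk is the "copying as it may go from the table on scheduling points" line: the migration metadata is copied out of `_table` before any co_await, so later code no longer dereferences a table reference that a concurrent update may have invalidated while the coroutine was suspended. A hedged sketch of the same copy-before-suspend pattern, with illustrative types rather than Redpanda's:

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sleep.hh>

#include <chrono>
#include <optional>
#include <string>
#include <unordered_map>

struct metadata {
    std::string name;
    metadata copy() const { return metadata{name}; }
};

// may be mutated by other fibers while this coroutine is suspended
std::unordered_map<int, metadata> table;

seastar::future<> handle_update(int id) {
    std::optional<metadata> meta;
    if (auto it = table.find(id); it != table.end()) {
        // copy now: the entry may be erased at any scheduling point
        // below, which would leave a reference or iterator dangling
        meta = it->second.copy();
    }
    co_await seastar::sleep(std::chrono::milliseconds(1)); // scheduling point
    if (meta) {
        // safe: this operates on our own copy, not on the table entry
        co_return;
    }
}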
@@ -899,8 +903,7 @@ ss::future<> backend::handle_migration_update(id id) {
         vlog(dm_log.debug, "creating migration {} reconciliation state", id);
         auto new_it = _migration_states.emplace_hint(old_it, id, new_scope);
         if (new_scope.topic_work_needed || new_scope.partition_work_needed) {
-            co_await reconcile_migration(
-              new_it->second, new_maybe_metadata->get());
+            co_await reconcile_migration(new_it->second, *new_metadata);
         } else {
             // yes it is done as there is nothing to do
             to_advance_if_done(new_it);
@@ -949,7 +952,8 @@ ss::future<> backend::process_delta(cluster::topic_table_delta&& delta) {
     tstate.clear();
     // We potentially re-enqueue an already coordinated partition here.
     // The first RPC reply will clear it.
-    co_await reconcile_topic(nt, tstate, migration_id, mrstate.scope, false);
+    co_await reconcile_existing_topic(
+      nt, tstate, migration_id, mrstate.scope, false);

     // local partition work
     if (has_local_replica(delta.ntp)) {
@@ -1148,7 +1152,7 @@ ss::future<> backend::drop_migration_reconciliation_rstate(
     _migration_states.erase(rs_it);
 }

-ss::future<> backend::reconcile_topic(
+ss::future<> backend::reconcile_existing_topic(
   const model::topic_namespace& nt,
   topic_reconciliation_state& tstate,
   id migration,
@@ -1256,30 +1260,39 @@ ss::future<> backend::reconcile_migration(
       metadata.id,
       mrstate.scope.sought_state);
     co_await std::visit(
-      [this, &metadata, &mrstate](const auto& migration) mutable {
+      [this, migration_id = metadata.id, &mrstate](
+        const auto& migration) mutable {
           return ss::do_with(
-            migration.topic_nts(),
-            [this, &metadata, &mrstate](const auto& nts) {
-                // poor man's `nts | std::views::enumerate`
-                auto enumerated_nts = std::views::transform(
-                  nts, [index = -1](const auto& nt) mutable {
-                      return std::forward_as_tuple(++index, nt);
-                  });
-                return ssx::async_for_each(
+            // poor man's `migration.topic_nts() | std::views::enumerate`
+            std::views::transform(
+              migration.topic_nts(),
+              [index = -1](const auto& nt) mutable {
+                  return std::forward_as_tuple(++index, nt);
+              }),
+            [this, migration_id, &mrstate](auto& enumerated_nts) {
+                return ss::do_for_each(
                   enumerated_nts,
-                  [this, &metadata, &mrstate](const auto& idx_nt) {
+                  [this, migration_id, &mrstate](const auto& idx_nt) {
                       auto& [idx, nt] = idx_nt;
-                      auto& tstate = mrstate.outstanding_topics[nt];
-                      tstate.idx_in_migration = idx;
-                      _topic_migration_map.emplace(nt, metadata.id);
-                      return reconcile_topic(
-                        nt, tstate, metadata.id, mrstate.scope, true);
+                      return reconcile_topic(migration_id, idx, nt, mrstate);
                   });
            });
      },
      metadata.migration);
 }

+ss::future<> backend::reconcile_topic(
+  const id migration_id,
+  size_t idx_in_migration,
+  const model::topic_namespace& nt,
+  migration_reconciliation_state& mrstate) {
+    auto& tstate = mrstate.outstanding_topics[nt];
+    tstate.idx_in_migration = idx_in_migration;
+    _topic_migration_map.emplace(nt, migration_id);
+    co_return co_await reconcile_existing_topic(
+      nt, tstate, migration_id, mrstate.scope, true);
+}
+
 std::optional<std::reference_wrapper<backend::replica_work_state>>
 backend::get_replica_work_state(const model::ntp& ntp) {
     model::topic_namespace nt{ntp.ns, ntp.tp.topic};
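
Two things change in the hunk above: the per-topic bookkeeping moves into the new `reconcile_topic()` overload, and the lambdas now capture `migration_id` by value instead of holding `&metadata` across asynchronous work. The `ss::do_with` idiom is what keeps the temporary enumerated view alive until `ss::do_for_each` finishes. A minimal sketch of that lifetime idiom with made-up data, assuming C++20 owning views and Seastar's do_with/do_for_each:

#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>

#include <ranges>
#include <vector>

// stand-in for migration.topic_nts(): returns a temporary container
std::vector<int> make_values() { return {1, 2, 3}; }

seastar::future<> visit_all() {
    // do_with moves the view into storage that lives until the returned
    // future resolves, so the iterators used by do_for_each stay valid.
    return seastar::do_with(
      std::views::transform(make_values(), [](int v) { return 2 * v; }),
      [](auto& doubled) {
          return seastar::do_for_each(doubled, [](int v) {
              // asynchronous per-element work would go here
              (void)v;
              return seastar::make_ready_future<>();
          });
      });
}

Capturing the id by value is the cheaper cousin of the metadata copy in handle_migration_update: nothing in the continuation chain points back into `_table`.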
7 changes: 7 additions & 0 deletions src/v/cluster/data_migration_backend.h
@@ -204,6 +204,13 @@ class backend {

     // call only with _mutex lock grabbed
     ss::future<> reconcile_topic(
+      const id migration_id,
+      size_t idx_in_migration,
+      const model::topic_namespace& nt,
+      migration_reconciliation_state& mrstate);
+
+    // call only with _mutex lock grabbed
+    ss::future<> reconcile_existing_topic(
       const model::topic_namespace& nt,
       topic_reconciliation_state& tstate,
       id migration,
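
Both declarations carry the "call only with _mutex lock grabbed" contract; `handle_migration_update()` in the .cc hunk above shows the caller side (`co_await _mutex.get_units(_as)`). A sketch of that discipline using a plain Seastar semaphore as the lock; the names are illustrative, not Redpanda's mutex type:

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>

seastar::semaphore reconcile_mutex{1}; // binary semaphore acting as the lock

// body assumes the lock is already held by the caller
seastar::future<> reconcile_locked() {
    return seastar::make_ready_future<>();
}

seastar::future<> reconcile_safely() {
    // hold the units across the whole call; they are released on scope
    // exit even if reconcile_locked() throws
    auto units = co_await seastar::get_units(reconcile_mutex, 1);
    co_await reconcile_locked();
}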
