Merge pull request #23140 from bashtanov/migrations-infra-abc
Migrations infra part 6: cancelations, retries, blocking reads
bashtanov authored Sep 2, 2024
2 parents c8f4d7d + 54a837b commit 43eff9d
Showing 9 changed files with 233 additions and 72 deletions.
6 changes: 3 additions & 3 deletions src/v/cluster/data_migrated_resources.cc
@@ -47,12 +47,12 @@ migrated_resource_state get_resource_state<inbound_migration>(state state) {
case state::preparing:
case state::prepared:
case state::canceling:
case state::cancelled:
case state::executing:
case state::executed:
case state::cut_over:
return migrated_resource_state::fully_blocked;
case state::finished:
case state::cancelled:
return migrated_resource_state::non_restricted;
}
}
@@ -63,15 +63,15 @@ migrated_resource_state get_resource_state<outbound_migration>(state state) {
case state::planned:
case state::preparing:
case state::prepared:
case state::canceling:
case state::cancelled:
return migrated_resource_state::metadata_locked;
case state::executing:
case state::executed:
case state::canceling:
return migrated_resource_state::read_only;
case state::cut_over:
return migrated_resource_state::fully_blocked;
case state::finished:
case state::cancelled:
return migrated_resource_state::non_restricted;
}
}
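The marker-less rendering above makes the before/after grouping easy to misread. Read one way (an assumption about which lines are the additions, since the +/- signs are lost), the post-merge outbound mapping keeps a migration that is only canceling read-only and releases the topic entirely once it is cancelled; inbound similarly moves cancelled next to finished as non_restricted. A throwaway sketch of that reading, with names borrowed from the diff rather than the real headers:

```cpp
// Sketch only: assumed post-merge reading of
// get_resource_state<outbound_migration>; not the actual source.
enum class state {
    planned, preparing, prepared, canceling, cancelled,
    executing, executed, cut_over, finished
};
enum class migrated_resource_state {
    metadata_locked, read_only, fully_blocked, non_restricted
};

migrated_resource_state outbound_resource_state(state s) {
    switch (s) {
    case state::planned:
    case state::preparing:
    case state::prepared:
        return migrated_resource_state::metadata_locked;
    case state::executing:
    case state::executed:
    case state::canceling: // canceling now only blocks writes
        return migrated_resource_state::read_only;
    case state::cut_over:
        return migrated_resource_state::fully_blocked;
    case state::finished:
    case state::cancelled: // a cancelled migration releases the topic
        return migrated_resource_state::non_restricted;
    }
    return migrated_resource_state::non_restricted; // unreachable
}
```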
183 changes: 137 additions & 46 deletions src/v/cluster/data_migration_backend.cc
@@ -37,6 +37,7 @@
#include <seastar/core/abort_source.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sleep.hh>

#include <chrono>
@@ -166,6 +167,7 @@ ss::future<> backend::start() {

ss::future<> backend::stop() {
vlog(dm_log.info, "backend stopping");
abort_all_topic_work();
_mutex.broken();
_sem.broken();
_timer.cancel();
@@ -430,8 +432,15 @@ ss::future<> backend::schedule_topic_work(model::topic_namespace nt) {

ss::future<backend::topic_work_result>
backend::do_topic_work(model::topic_namespace nt, topic_work tw) noexcept {
// todo: create an abort_source, save it in map, use in retry_loop, trigger
// where appropriate
auto [it, ins] = _active_topic_work_states.try_emplace(nt);
auto& tsws_lwptr_entry = it->second;
if (!ins) {
tsws_lwptr_entry->rcn.request_abort();
// and forget about it, only the dying fiber will hold it
}
auto tsws_lwptr = seastar::make_lw_shared<topic_scoped_work_state>();
tsws_lwptr_entry = tsws_lwptr;

errc ec;
try {
vlog(
@@ -441,8 +450,10 @@ backend::do_topic_work(model::topic_namespace nt, topic_work tw) noexcept {
nt,
tw.sought_state);
ec = co_await std::visit(
[this, &nt, &tw](const auto& info) {
return do_topic_work(nt, tw.sought_state, info);
[this, &nt, &tw, tsws_lwptr = std::move(tsws_lwptr)](
const auto& info) mutable {
return do_topic_work(
nt, tw.sought_state, info, std::move(tsws_lwptr));
},
tw.info);
vlog(
@@ -464,6 +475,7 @@ backend::do_topic_work(model::topic_namespace nt, topic_work tw) noexcept {
std::current_exception());
ec = errc::topic_operation_error;
}
_active_topic_work_states.erase(nt);
co_return topic_work_result{
.nt = std::move(nt),
.migration = tw.migration_id,
@@ -475,9 +487,9 @@ backend::do_topic_work(model::topic_namespace nt, topic_work tw) noexcept {
ss::future<errc> backend::do_topic_work(
const model::topic_namespace& nt,
state sought_state,
const inbound_topic_work_info& itwi) {
ss::abort_source as;
retry_chain_node rcn{as, ss::lowres_clock::time_point::max(), 2s};
const inbound_topic_work_info& itwi,
tsws_lwptr_t tsws) {
auto& rcn = tsws->rcn;
// this switch should be in accordance to the logic in get_work_scope
switch (sought_state) {
case state::prepared:
@@ -492,6 +504,21 @@ ss::future<errc> backend::do_topic_work(
co_return co_await retry_loop(
rcn, [this, &nt, &rcn] { return confirm_mount_topic(nt, rcn); });
}
case state::cancelled: {
// attempt to unmount first
auto unmount_res = co_await unmount_topic(nt, rcn);
if (unmount_res != errc::success) {
vlog(
dm_log.warn, "failed to unmount topic {}: {}", nt, unmount_res);
}
// drop topic in any case
auto drop_res = co_await delete_topic(nt, rcn);
if (drop_res != errc::success) {
vlog(dm_log.warn, "failed to drop topic {}: {}", nt, drop_res);
co_return drop_res;
}
co_return errc::success;
}
default:
vassert(
false,
@@ -504,28 +531,44 @@ ss::future<errc> backend::do_topic_work(
ss::future<errc> backend::do_topic_work(
const model::topic_namespace& nt,
state sought_state,
const outbound_topic_work_info&) {
// this assert is in accordance to the logic in get_work_scope
vassert(
sought_state == state::finished,
"only ->finished state transition requires topic work");
ss::abort_source as;
retry_chain_node rcn{as, ss::lowres_clock::time_point::max(), 2s};
// todo: unmount first
auto unmount_res = co_await retry_loop(
rcn, [this, &nt, &rcn] { return unmount_topic(nt, rcn); });
if (unmount_res == errc::topic_not_exists) {
vlog(dm_log.warn, "topic {} does not exist, ignoring", nt, unmount_res);
const outbound_topic_work_info&,
tsws_lwptr_t tsws) {
auto& rcn = tsws->rcn;
// this switch should be in accordance to the logic in get_work_scope
switch (sought_state) {
case state::finished: {
// unmount first
auto unmount_res = co_await unmount_topic(nt, rcn);
if (unmount_res != errc::success) {
vlog(
dm_log.warn, "failed to unmount topic {}: {}", nt, unmount_res);
co_return unmount_res;
}
// delete
co_return co_await delete_topic(nt, rcn);
}
case state::cancelled: {
// Noop, we have it here only because reconciliation logic requires
// either topic or partition work. The topic is unmounted and deleted in
// cut_over state, which cannot be cancelled. So if we are here we only
// need to lift topic restrictions, which is performed by
// migrated_resources.
co_return errc::success;
}
if (unmount_res != errc::success) {
vlog(dm_log.warn, "failed to unmount topic {}: {}", nt, unmount_res);
co_return unmount_res;
default:
vassert(
false,
"unknown topic work requested when transitioning outbound migration "
"state to {}",
sought_state);
}
co_return co_await retry_loop(rcn, [this, &nt, &rcn] {
return _topics_frontend.delete_topic_after_migration(
nt, rcn.get_deadline());
});
}

void backend::abort_all_topic_work() {
for (auto& [nt, tsws] : _active_topic_work_states) {
tsws->rcn.request_abort();
}
_active_topic_work_states.clear();
}
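The shape of the change above: each topic gets a `topic_scoped_work_state` holding an abort source and a retry chain, a newly scheduled worker for the same topic aborts and replaces the previous one, and `abort_all_topic_work()` cancels everything on shutdown or when stepping down as coordinator. A minimal stand-alone sketch of that pattern, using `std::jthread`/`std::stop_token` as stand-ins for `ss::abort_source` and `retry_chain_node` (illustration only, not the Redpanda or Seastar API):

```cpp
#include <chrono>
#include <functional>
#include <map>
#include <stop_token>
#include <string>
#include <thread>

using namespace std::chrono_literals;

// Retry `op` with a fixed backoff until it succeeds or an abort is requested.
inline bool retry_loop(std::stop_token st, const std::function<bool()>& op) {
    while (!st.stop_requested()) {
        if (op()) {
            return true;
        }
        std::this_thread::sleep_for(100ms); // backoff between attempts
    }
    return false; // aborted
}

class topic_work_registry {
public:
    // Starting work for a topic aborts and replaces any previous work,
    // analogous to try_emplace + request_abort in do_topic_work above.
    void start(std::string topic, std::function<bool()> op) {
        if (auto it = _active.find(topic); it != _active.end()) {
            it->second.request_stop(); // old worker observes the abort
            _active.erase(it);         // ~jthread joins the old worker
        }
        std::jthread worker(
          [op = std::move(op)](std::stop_token st) { retry_loop(st, op); });
        _active.emplace(std::move(topic), std::move(worker));
    }

    // Counterpart of abort_all_topic_work(): cancel everything.
    void stop_all() {
        for (auto& [topic, worker] : _active) {
            worker.request_stop();
        }
        _active.clear();
    }

private:
    std::map<std::string, std::jthread> _active;
};
```

One difference worth noting: the real code does not wait for the superseded fiber; it calls `request_abort()` and overwrites the map slot, and the lw_shared_ptr held by the dying fiber keeps the old state alive ("and forget about it, only the dying fiber will hold it").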

ss::future<errc> backend::create_topic(
Expand Down Expand Up @@ -648,11 +691,33 @@ ss::future<errc> backend::confirm_mount_topic(
co_return errc::topic_operation_error;
}

ss::future<errc>
backend::delete_topic(const model::topic_namespace& nt, retry_chain_node& rcn) {
return retry_loop(rcn, [this, &nt, &rcn]() {
return _topics_frontend
.delete_topic_after_migration(nt, rcn.get_deadline())
.then([&nt](errc ec) {
if (ec == errc::topic_not_exists) {
vlog(dm_log.warn, "topic {} missing, ignoring", nt);
return errc::success;
}
return ec;
});
});
}
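A small point in `delete_topic` above: mapping `topic_not_exists` to `success` makes the retried delete idempotent, so a second attempt (or the cancellation path, which also ends in `delete_topic`) converges instead of failing after an earlier attempt already removed the topic. The same idea in isolation (the `errc` values here are placeholders, not the real `cluster::errc`):

```cpp
// Idempotent delete sketch: "already gone" counts as done.
enum class errc { success, topic_not_exists, topic_operation_error };

template <typename DeleteFn>
errc delete_if_present(DeleteFn&& do_delete) {
    errc ec = do_delete();
    if (ec == errc::topic_not_exists) {
        return errc::success; // removed by an earlier attempt or another node
    }
    return ec;
}
```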

ss::future<errc> backend::unmount_topic(
const model::topic_namespace& nt, retry_chain_node& rcn) {
return retry_loop(
rcn, [this, &nt, &rcn] { return do_unmount_topic(nt, rcn); });
}

ss::future<errc> backend::do_unmount_topic(
const model::topic_namespace& nt, retry_chain_node& rcn) {
auto cfg = _topic_table.get_topic_cfg(nt);
if (!cfg) {
co_return errc::topic_not_exists;
vlog(dm_log.warn, "topic {} missing, ignoring", nt);
co_return errc::success;
}
auto umnt_res = co_await _topic_mount_handler->unmount_topic(*cfg, rcn);
if (umnt_res == cloud_storage::topic_unmount_result::success) {
@@ -727,6 +792,8 @@ ss::future<> backend::handle_raft0_leadership_update() {
wakeup();
} else {
vlog(dm_log.debug, "stepping down as a coordinator");
// stop topic-scoped work
abort_all_topic_work();
// stop coordinating
for (auto& [id, mrstate] : _migration_states) {
co_await ssx::async_for_each(
@@ -761,12 +828,13 @@ ss::future<> backend::handle_migration_update(id id) {
"migration {} old sought state is {}",
id,
old_mrstate.scope.sought_state);
if (
!new_maybe_metadata || new_state >= old_mrstate.scope.sought_state) {
vlog(
dm_log.debug, "dropping migration {} reconciliation state", id);
drop_migration_reconciliation_rstate(old_it);
}
vassert(
!new_maybe_metadata || new_state >= old_mrstate.scope.sought_state,
"migration state went from seeking {} back to {}",
old_mrstate.scope.sought_state,
new_state);
vlog(dm_log.debug, "dropping migration {} reconciliation state", id);
drop_migration_reconciliation_rstate(old_it);
}
// create new state if needed
if (new_maybe_metadata) {
@@ -828,13 +896,13 @@ ss::future<> backend::process_delta(cluster::topic_table_delta&& delta) {

// local partition work
if (has_local_replica(delta.ntp)) {
_work_states[nt].try_emplace(
_local_work_states[nt].try_emplace(
delta.ntp.tp.partition,
migration_id,
*_migration_states.find(migration_id)->second.scope.sought_state);
} else {
auto topic_work_it = _work_states.find(nt);
if (topic_work_it != _work_states.end()) {
auto topic_work_it = _local_work_states.find(nt);
if (topic_work_it != _local_work_states.end()) {
auto& topic_work_state = topic_work_it->second;
auto rwstate_it = topic_work_state.find(delta.ntp.tp.partition);
if (rwstate_it != topic_work_state.end()) {
@@ -844,7 +912,7 @@ ss::future<> backend::process_delta(cluster::topic_table_delta&& delta) {
}
topic_work_state.erase(rwstate_it);
if (topic_work_state.empty()) {
_work_states.erase(topic_work_it);
_local_work_states.erase(topic_work_it);
}
}
}
@@ -998,7 +1066,14 @@ void backend::drop_migration_reconciliation_rstate(
const auto& topics = rs_it->second.outstanding_topics;
for (const auto& [nt, tstate] : topics) {
clear_tstate_belongings(nt, tstate);
_work_states.erase(nt);
_local_work_states.erase(nt);

auto it = _active_topic_work_states.find(nt);
if (it != _active_topic_work_states.end()) {
it->second->rcn.request_abort();
_active_topic_work_states.erase(it);
}

_topic_migration_map.erase(nt);
}
_migration_states.erase(rs_it);
@@ -1009,8 +1084,8 @@ ss::future<> backend::reconcile_topic(
topic_reconciliation_state& tstate,
id migration,
work_scope scope,
bool schedule_local_work) {
if (!schedule_local_work && !_is_coordinator) {
bool schedule_local_partition_work) {
if (!schedule_local_partition_work && !_is_coordinator) {
vlog(
dm_log.debug,
"not tracking topic {} transition towards state {} as part of "
@@ -1027,15 +1102,20 @@
nt,
scope.sought_state,
migration,
schedule_local_work,
schedule_local_partition_work,
_is_coordinator);
auto now = model::timeout_clock::now();
if (scope.partition_work_needed) {
if (auto maybe_assignments = _topic_table.get_topic_assignments(nt)) {
co_await ssx::async_for_each(
*maybe_assignments | std::views::values,
[this, nt, &tstate, scope, migration, now, schedule_local_work](
const auto& assignment) {
[this,
nt,
&tstate,
scope,
migration,
now,
schedule_local_partition_work](const auto& assignment) {
model::ntp ntp{nt.ns, nt.tp, assignment.id};
auto nodes = assignment.replicas
| std::views::transform(
@@ -1067,7 +1147,7 @@ ss::future<> backend::reconcile_topic(
it->second);
_nodes_to_retry.insert_or_assign(node_id, now);
}
if (schedule_local_work && _self == node_id) {
if (schedule_local_partition_work && _self == node_id) {
vlog(
dm_log.debug,
"tracking ntp {} transition towards state {} as "
@@ -1077,7 +1157,7 @@
ntp,
scope.sought_state,
migration);
auto& topic_work_state = _work_states[nt];
auto& topic_work_state = _local_work_states[nt];
auto [it, _] = topic_work_state.try_emplace(
assignment.id, migration, *scope.sought_state);
auto& rwstate = it->second;
@@ -1135,7 +1215,7 @@ ss::future<> backend::reconcile_migration(
std::optional<std::reference_wrapper<backend::replica_work_state>>
backend::get_replica_work_state(const model::ntp& ntp) {
model::topic_namespace nt{ntp.ns, ntp.tp.topic};
if (auto it = _work_states.find(nt); it != _work_states.end()) {
if (auto it = _local_work_states.find(nt); it != _local_work_states.end()) {
auto& topic_work_state = it->second;
auto rwstate_it = topic_work_state.find(ntp.tp.partition);
if (rwstate_it != topic_work_state.end()) {
@@ -1325,6 +1405,8 @@ backend::work_scope backend::get_work_scope(
return {state::executed, false, true};
case state::cut_over:
return {state::finished, false, true};
case state::canceling:
return {state::cancelled, false, true};
default:
return {{}, false, false};
};
@@ -1340,6 +1422,8 @@
return {state::executed, true, false};
case state::cut_over:
return {state::finished, false, true};
case state::canceling:
return {state::cancelled, false, true};
default:
return {{}, false, false};
};
@@ -1351,4 +1435,11 @@ void backend::topic_reconciliation_state::clear() {
topic_scoped_work_done = false;
}

backend::topic_scoped_work_state::topic_scoped_work_state()
: _as()
, rcn(
_as,
ss::lowres_clock::now() + retry_chain_node::milliseconds_uint16_t::max(),
2s) {}

} // namespace cluster::data_migrations
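For a rough sense of the retry budget set up by the `topic_scoped_work_state` constructor: if `retry_chain_node::milliseconds_uint16_t` behaves like `std::chrono::duration<uint16_t, std::milli>` (an assumption based on the name, not something shown in this diff), its `max()` is 65535 ms, so each topic-scoped work state gets roughly 65.5 s of retrying from `now()`, with the 2 s argument presumably the initial backoff:

```cpp
#include <chrono>
#include <cstdint>

// Assumption: the real typedef behaves like this alias.
using milliseconds_uint16_t = std::chrono::duration<std::uint16_t, std::milli>;
static_assert(milliseconds_uint16_t::max().count() == 65535,
              "deadline is ~65.5 s past now()");
```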