diff --git a/src/v/cluster/data_migrated_resources.cc b/src/v/cluster/data_migrated_resources.cc index 16df2a9faf838..8aca93882814e 100644 --- a/src/v/cluster/data_migrated_resources.cc +++ b/src/v/cluster/data_migrated_resources.cc @@ -47,12 +47,12 @@ migrated_resource_state get_resource_state(state state) { case state::preparing: case state::prepared: case state::canceling: - case state::cancelled: case state::executing: case state::executed: case state::cut_over: return migrated_resource_state::fully_blocked; case state::finished: + case state::cancelled: return migrated_resource_state::non_restricted; } } @@ -63,15 +63,15 @@ migrated_resource_state get_resource_state(state state) { case state::planned: case state::preparing: case state::prepared: - case state::canceling: - case state::cancelled: return migrated_resource_state::metadata_locked; case state::executing: case state::executed: + case state::canceling: return migrated_resource_state::read_only; case state::cut_over: return migrated_resource_state::fully_blocked; case state::finished: + case state::cancelled: return migrated_resource_state::non_restricted; } } diff --git a/src/v/cluster/data_migration_backend.cc b/src/v/cluster/data_migration_backend.cc index e0db17460c97c..bc6d392b53d3c 100644 --- a/src/v/cluster/data_migration_backend.cc +++ b/src/v/cluster/data_migration_backend.cc @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -166,6 +167,7 @@ ss::future<> backend::start() { ss::future<> backend::stop() { vlog(dm_log.info, "backend stopping"); + abort_all_topic_work(); _mutex.broken(); _sem.broken(); _timer.cancel(); @@ -430,8 +432,15 @@ ss::future<> backend::schedule_topic_work(model::topic_namespace nt) { ss::future backend::do_topic_work(model::topic_namespace nt, topic_work tw) noexcept { - // todo: create an abort_source, save it in map, use in retry_loop, trigger - // where appropriate + auto [it, ins] = _active_topic_work_states.try_emplace(nt); + auto& tsws_lwptr_entry = it->second; + if (!ins) { + tsws_lwptr_entry->rcn.request_abort(); + // and forget about it, only the dying fiber will hold it + } + auto tsws_lwptr = seastar::make_lw_shared(); + tsws_lwptr_entry = tsws_lwptr; + errc ec; try { vlog( @@ -441,8 +450,10 @@ backend::do_topic_work(model::topic_namespace nt, topic_work tw) noexcept { nt, tw.sought_state); ec = co_await std::visit( - [this, &nt, &tw](const auto& info) { - return do_topic_work(nt, tw.sought_state, info); + [this, &nt, &tw, tsws_lwptr = std::move(tsws_lwptr)]( + const auto& info) mutable { + return do_topic_work( + nt, tw.sought_state, info, std::move(tsws_lwptr)); }, tw.info); vlog( @@ -464,6 +475,7 @@ backend::do_topic_work(model::topic_namespace nt, topic_work tw) noexcept { std::current_exception()); ec = errc::topic_operation_error; } + _active_topic_work_states.erase(nt); co_return topic_work_result{ .nt = std::move(nt), .migration = tw.migration_id, @@ -475,9 +487,9 @@ backend::do_topic_work(model::topic_namespace nt, topic_work tw) noexcept { ss::future backend::do_topic_work( const model::topic_namespace& nt, state sought_state, - const inbound_topic_work_info& itwi) { - ss::abort_source as; - retry_chain_node rcn{as, ss::lowres_clock::time_point::max(), 2s}; + const inbound_topic_work_info& itwi, + tsws_lwptr_t tsws) { + auto& rcn = tsws->rcn; // this switch should be in accordance to the logic in get_work_scope switch (sought_state) { case state::prepared: @@ -492,6 +504,21 @@ ss::future backend::do_topic_work( co_return co_await retry_loop( rcn, [this, &nt, &rcn] { return confirm_mount_topic(nt, rcn); }); } + case state::cancelled: { + // attempt to unmount first + auto unmount_res = co_await unmount_topic(nt, rcn); + if (unmount_res != errc::success) { + vlog( + dm_log.warn, "failed to unmount topic {}: {}", nt, unmount_res); + } + // drop topic in any case + auto drop_res = co_await delete_topic(nt, rcn); + if (drop_res != errc::success) { + vlog(dm_log.warn, "failed to drop topic {}: {}", nt, drop_res); + co_return drop_res; + } + co_return errc::success; + } default: vassert( false, @@ -504,28 +531,44 @@ ss::future backend::do_topic_work( ss::future backend::do_topic_work( const model::topic_namespace& nt, state sought_state, - const outbound_topic_work_info&) { - // this assert is in accordance to the logic in get_work_scope - vassert( - sought_state == state::finished, - "only ->finished state transition requires topic work"); - ss::abort_source as; - retry_chain_node rcn{as, ss::lowres_clock::time_point::max(), 2s}; - // todo: unmount first - auto unmount_res = co_await retry_loop( - rcn, [this, &nt, &rcn] { return unmount_topic(nt, rcn); }); - if (unmount_res == errc::topic_not_exists) { - vlog(dm_log.warn, "topic {} does not exist, ignoring", nt, unmount_res); + const outbound_topic_work_info&, + tsws_lwptr_t tsws) { + auto& rcn = tsws->rcn; + // this switch should be in accordance to the logic in get_work_scope + switch (sought_state) { + case state::finished: { + // unmount first + auto unmount_res = co_await unmount_topic(nt, rcn); + if (unmount_res != errc::success) { + vlog( + dm_log.warn, "failed to unmount topic {}: {}", nt, unmount_res); + co_return unmount_res; + } + // delete + co_return co_await delete_topic(nt, rcn); + } + case state::cancelled: { + // Noop, we have it here only because reconciliation logic requires + // either topic or partition work. The topic is unmounted and deleted in + // cut_over state, which cannot be cancelled. So if we are here we only + // need to lift topic restrictions, which is performed by + // migrated_resources. co_return errc::success; } - if (unmount_res != errc::success) { - vlog(dm_log.warn, "failed to unmount topic {}: {}", nt, unmount_res); - co_return unmount_res; + default: + vassert( + false, + "unknown topic work requested when transitioning outbound migration " + "state to {}", + sought_state); } - co_return co_await retry_loop(rcn, [this, &nt, &rcn] { - return _topics_frontend.delete_topic_after_migration( - nt, rcn.get_deadline()); - }); +} + +void backend::abort_all_topic_work() { + for (auto& [nt, tsws] : _active_topic_work_states) { + tsws->rcn.request_abort(); + } + _active_topic_work_states.clear(); } ss::future backend::create_topic( @@ -648,11 +691,33 @@ ss::future backend::confirm_mount_topic( co_return errc::topic_operation_error; } +ss::future +backend::delete_topic(const model::topic_namespace& nt, retry_chain_node& rcn) { + return retry_loop(rcn, [this, &nt, &rcn]() { + return _topics_frontend + .delete_topic_after_migration(nt, rcn.get_deadline()) + .then([&nt](errc ec) { + if (ec == errc::topic_not_exists) { + vlog(dm_log.warn, "topic {} missing, ignoring", nt); + return errc::success; + } + return ec; + }); + }); +} + ss::future backend::unmount_topic( + const model::topic_namespace& nt, retry_chain_node& rcn) { + return retry_loop( + rcn, [this, &nt, &rcn] { return do_unmount_topic(nt, rcn); }); +} + +ss::future backend::do_unmount_topic( const model::topic_namespace& nt, retry_chain_node& rcn) { auto cfg = _topic_table.get_topic_cfg(nt); if (!cfg) { - co_return errc::topic_not_exists; + vlog(dm_log.warn, "topic {} missing, ignoring", nt); + co_return errc::success; } auto umnt_res = co_await _topic_mount_handler->unmount_topic(*cfg, rcn); if (umnt_res == cloud_storage::topic_unmount_result::success) { @@ -727,6 +792,8 @@ ss::future<> backend::handle_raft0_leadership_update() { wakeup(); } else { vlog(dm_log.debug, "stepping down as a coordinator"); + // stop topic-scoped work + abort_all_topic_work(); // stop coordinating for (auto& [id, mrstate] : _migration_states) { co_await ssx::async_for_each( @@ -761,12 +828,13 @@ ss::future<> backend::handle_migration_update(id id) { "migration {} old sought state is {}", id, old_mrstate.scope.sought_state); - if ( - !new_maybe_metadata || new_state >= old_mrstate.scope.sought_state) { - vlog( - dm_log.debug, "dropping migration {} reconciliation state", id); - drop_migration_reconciliation_rstate(old_it); - } + vassert( + !new_maybe_metadata || new_state >= old_mrstate.scope.sought_state, + "migration state went from seeking {} back to {}", + old_mrstate.scope.sought_state, + new_state); + vlog(dm_log.debug, "dropping migration {} reconciliation state", id); + drop_migration_reconciliation_rstate(old_it); } // create new state if needed if (new_maybe_metadata) { @@ -828,13 +896,13 @@ ss::future<> backend::process_delta(cluster::topic_table_delta&& delta) { // local partition work if (has_local_replica(delta.ntp)) { - _work_states[nt].try_emplace( + _local_work_states[nt].try_emplace( delta.ntp.tp.partition, migration_id, *_migration_states.find(migration_id)->second.scope.sought_state); } else { - auto topic_work_it = _work_states.find(nt); - if (topic_work_it != _work_states.end()) { + auto topic_work_it = _local_work_states.find(nt); + if (topic_work_it != _local_work_states.end()) { auto& topic_work_state = topic_work_it->second; auto rwstate_it = topic_work_state.find(delta.ntp.tp.partition); if (rwstate_it != topic_work_state.end()) { @@ -844,7 +912,7 @@ ss::future<> backend::process_delta(cluster::topic_table_delta&& delta) { } topic_work_state.erase(rwstate_it); if (topic_work_state.empty()) { - _work_states.erase(topic_work_it); + _local_work_states.erase(topic_work_it); } } } @@ -998,7 +1066,14 @@ void backend::drop_migration_reconciliation_rstate( const auto& topics = rs_it->second.outstanding_topics; for (const auto& [nt, tstate] : topics) { clear_tstate_belongings(nt, tstate); - _work_states.erase(nt); + _local_work_states.erase(nt); + + auto it = _active_topic_work_states.find(nt); + if (it != _active_topic_work_states.end()) { + it->second->rcn.request_abort(); + _active_topic_work_states.erase(it); + } + _topic_migration_map.erase(nt); } _migration_states.erase(rs_it); @@ -1009,8 +1084,8 @@ ss::future<> backend::reconcile_topic( topic_reconciliation_state& tstate, id migration, work_scope scope, - bool schedule_local_work) { - if (!schedule_local_work && !_is_coordinator) { + bool schedule_local_partition_work) { + if (!schedule_local_partition_work && !_is_coordinator) { vlog( dm_log.debug, "not tracking topic {} transition towards state {} as part of " @@ -1027,15 +1102,20 @@ ss::future<> backend::reconcile_topic( nt, scope.sought_state, migration, - schedule_local_work, + schedule_local_partition_work, _is_coordinator); auto now = model::timeout_clock::now(); if (scope.partition_work_needed) { if (auto maybe_assignments = _topic_table.get_topic_assignments(nt)) { co_await ssx::async_for_each( *maybe_assignments | std::views::values, - [this, nt, &tstate, scope, migration, now, schedule_local_work]( - const auto& assignment) { + [this, + nt, + &tstate, + scope, + migration, + now, + schedule_local_partition_work](const auto& assignment) { model::ntp ntp{nt.ns, nt.tp, assignment.id}; auto nodes = assignment.replicas | std::views::transform( @@ -1067,7 +1147,7 @@ ss::future<> backend::reconcile_topic( it->second); _nodes_to_retry.insert_or_assign(node_id, now); } - if (schedule_local_work && _self == node_id) { + if (schedule_local_partition_work && _self == node_id) { vlog( dm_log.debug, "tracking ntp {} transition towards state {} as " @@ -1077,7 +1157,7 @@ ss::future<> backend::reconcile_topic( ntp, scope.sought_state, migration); - auto& topic_work_state = _work_states[nt]; + auto& topic_work_state = _local_work_states[nt]; auto [it, _] = topic_work_state.try_emplace( assignment.id, migration, *scope.sought_state); auto& rwstate = it->second; @@ -1135,7 +1215,7 @@ ss::future<> backend::reconcile_migration( std::optional> backend::get_replica_work_state(const model::ntp& ntp) { model::topic_namespace nt{ntp.ns, ntp.tp.topic}; - if (auto it = _work_states.find(nt); it != _work_states.end()) { + if (auto it = _local_work_states.find(nt); it != _local_work_states.end()) { auto& topic_work_state = it->second; auto rwstate_it = topic_work_state.find(ntp.tp.partition); if (rwstate_it != topic_work_state.end()) { @@ -1325,6 +1405,8 @@ backend::work_scope backend::get_work_scope( return {state::executed, false, true}; case state::cut_over: return {state::finished, false, true}; + case state::canceling: + return {state::cancelled, false, true}; default: return {{}, false, false}; }; @@ -1340,6 +1422,8 @@ backend::work_scope backend::get_work_scope( return {state::executed, true, false}; case state::cut_over: return {state::finished, false, true}; + case state::canceling: + return {state::cancelled, false, true}; default: return {{}, false, false}; }; @@ -1351,4 +1435,11 @@ void backend::topic_reconciliation_state::clear() { topic_scoped_work_done = false; } +backend::topic_scoped_work_state::topic_scoped_work_state() + : _as() + , rcn( + _as, + ss::lowres_clock::now() + retry_chain_node::milliseconds_uint16_t::max(), + 2s) {} + } // namespace cluster::data_migrations diff --git a/src/v/cluster/data_migration_backend.h b/src/v/cluster/data_migration_backend.h index c24cf41150245..cd3667caa3f46 100644 --- a/src/v/cluster/data_migration_backend.h +++ b/src/v/cluster/data_migration_backend.h @@ -21,6 +21,7 @@ #include #include +#include namespace cluster::data_migrations { @@ -87,6 +88,14 @@ class backend { state sought_state; errc ec; }; + class topic_scoped_work_state { + ss::abort_source _as; + + public: + retry_chain_node rcn; + topic_scoped_work_state(); + }; + using tsws_lwptr_t = ss::lw_shared_ptr; private: /* loop management */ @@ -116,11 +125,14 @@ class backend { ss::future do_topic_work( const model::topic_namespace& nt, state sought_state, - const inbound_topic_work_info& itwi); + const inbound_topic_work_info& itwi, + tsws_lwptr_t tsws_lwptr); ss::future do_topic_work( const model::topic_namespace& nt, state sought_state, - const outbound_topic_work_info& otwi); + const outbound_topic_work_info& otwi, + tsws_lwptr_t tsws_lwptr); + void abort_all_topic_work(); /* topic work helpers */ ss::future create_topic( const model::topic_namespace& local_nt, @@ -132,7 +144,11 @@ class backend { ss::future confirm_mount_topic( const model::topic_namespace& nt, retry_chain_node& rcn); ss::future + delete_topic(const model::topic_namespace& nt, retry_chain_node& rcn); + ss::future unmount_topic(const model::topic_namespace& nt, retry_chain_node& rcn); + ss::future + do_unmount_topic(const model::topic_namespace& nt, retry_chain_node& rcn); /* communication with partition workers */ void start_partition_work( @@ -177,7 +193,7 @@ class backend { topic_reconciliation_state& tstate, id migration, work_scope scope, - bool schedule_local_work); + bool schedule_local_partition_work); std::optional> get_replica_work_state(const model::ntp& ntp); @@ -256,15 +272,22 @@ class backend { }; absl::flat_hash_map _advance_requests; chunked_vector _unprocessed_deltas; + chunked_hash_map _rpc_responses; - /* Node-local data */ + /* Node-local data for partition-scoped work */ using topic_work_state_t = chunked_hash_map; - chunked_hash_map _work_states; - - chunked_hash_map _rpc_responses; + chunked_hash_map + _local_work_states; + /* + * Topic-scoped work states for starting/stopping and disallowing concurrent + * work on the same topic: similar to data_migrations::worker + */ chunked_vector _topic_work_results; + chunked_hash_map + _active_topic_work_states; // no null pointers on scheduling points + /* Refs to services etc */ model::node_id _self; migrations_table& _table; frontend& _frontend; diff --git a/src/v/cluster/data_migration_types.h b/src/v/cluster/data_migration_types.h index 5f27ee5742c00..bbe980bafd5f1 100644 --- a/src/v/cluster/data_migration_types.h +++ b/src/v/cluster/data_migration_types.h @@ -65,8 +65,6 @@ using consumer_group = named_type; * ┌─────▼────┐ * │ finished │ * └──────────┘ - * - * */ enum class state { planned, diff --git a/src/v/cluster/data_migration_worker.cc b/src/v/cluster/data_migration_worker.cc index 99e80e2acd140..c969ee5122ebb 100644 --- a/src/v/cluster/data_migration_worker.cc +++ b/src/v/cluster/data_migration_worker.cc @@ -22,7 +22,9 @@ #include "rpc/connection_cache.h" #include "ssx/future-util.h" +#include #include +#include #include #include @@ -32,7 +34,6 @@ namespace cluster::data_migrations { -// TODO: add configuration property worker::worker( model::node_id self, partition_leaders_table& leaders_table, @@ -79,6 +80,8 @@ worker::perform_partition_work(model::ntp&& ntp, partition_work&& work) { ntp_state.promise = ss::make_lw_shared>(); ntp_state.is_running = false; ntp_state.work = std::move(work); + ntp_state.as->request_abort(); + ntp_state.as = ss::make_lw_shared(); } spawn_work_if_leader(it); @@ -101,7 +104,8 @@ worker::ntp_state::ntp_state( notification_id_type leadership_subscription) : is_leader(is_leader) , work(std::move(work)) - , leadership_subscription(leadership_subscription) {} + , leadership_subscription(leadership_subscription) + , as(ss::make_lw_shared()) {} ss::future<> worker::handle_operation_result( model::ntp ntp, id migration_id, state sought_state, errc ec) { @@ -118,8 +122,24 @@ ss::future<> worker::handle_operation_result( // todo: configure sleep time, make it abortable from // worker::abort_partition_work - co_await ss::sleep_abortable(1s, _as); + auto it = _managed_ntps.find(ntp); + if ( + it == _managed_ntps.end() + || it->second.work.migration_id != migration_id + || it->second.work.sought_state != sought_state) { + vlog( + dm_log.debug, + "as part of migration {}, partition work for moving ntp {} to " + "state {} is done with result {}, but not needed anymore", + migration_id, + std::move(ntp), + sought_state, + ec); + co_return; + } + co_await ss::sleep_abortable(1s, *it->second.as); } + bool should_retry = ec != errc::success && ec != errc::shutting_down; auto it = _managed_ntps.find(ntp); if ( it == _managed_ntps.end() || it->second.work.migration_id != migration_id @@ -127,15 +147,15 @@ ss::future<> worker::handle_operation_result( vlog( dm_log.debug, "as part of migration {}, partition work for moving ntp {} to state " - "{} is done with result {}, but not needed anymore", + "{} was about to {}, but not needed anymore", migration_id, std::move(ntp), sought_state, - ec); + ec, + should_retry ? "retry" : "complete"); co_return; } - if (ec != errc::success && ec != errc::shutting_down) { - // any other errors deemed retryable + if (should_retry) { it->second.is_running = false; vlog( dm_log.info, @@ -153,6 +173,11 @@ ss::future<> worker::handle_operation_result( void worker::handle_leadership_update(const model::ntp& ntp, bool is_leader) { auto it = _managed_ntps.find(ntp); + vlog( + dm_log.info, + "got leadership update regarding ntp={}, is_leader={}", + ntp, + is_leader); if (it == _managed_ntps.end() || it->second.is_leader == is_leader) { return; } @@ -166,6 +191,7 @@ void worker::unmanage_ntp(managed_ntp_cit it, errc result) { _leaders_table.unregister_leadership_change_notification( it->second.leadership_subscription); it->second.promise->set_value(result); + it->second.as->request_abort(); _managed_ntps.erase(it); } @@ -254,6 +280,11 @@ ss::future worker::do_work( void worker::spawn_work_if_leader(managed_ntp_it it) { vassert(!it->second.is_running, "work already running"); + vlog( + dm_log.info, + "attempting to spawn work for ntp={}, is_leader={}", + it->first, + it->second.is_leader); if (!it->second.is_leader) { return; } diff --git a/src/v/cluster/data_migration_worker.h b/src/v/cluster/data_migration_worker.h index eed9298fbc795..cb4b4a9032a6f 100644 --- a/src/v/cluster/data_migration_worker.h +++ b/src/v/cluster/data_migration_worker.h @@ -18,6 +18,7 @@ #include "errc.h" #include "model/fundamental.h" +#include #include #include #include @@ -49,6 +50,7 @@ class worker : public ss::peering_sharded_service { notification_id_type leadership_subscription; ss::lw_shared_ptr> promise = ss::make_lw_shared>(); + ss::lw_shared_ptr as; ntp_state(const ntp_state&) = delete; ntp_state& operator=(const ntp_state&) = delete; @@ -70,6 +72,7 @@ class worker : public ss::peering_sharded_service { void handle_leadership_update(const model::ntp& ntp, bool is_leader); void unmanage_ntp(managed_ntp_cit it, errc result); void spawn_work_if_leader(managed_ntp_it it); + // also resulting future cannot throw when co_awaited ss::future do_work(managed_ntp_cit it) noexcept; ss::future do_work( diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 984fe7e76944d..94f2045fd0e51 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -152,6 +152,11 @@ bool metadata_cache::should_reject_writes() const { == storage::disk_space_alert::degraded; } +bool metadata_cache::should_reject_reads(model::topic_namespace_view tp) const { + return _migrated_resources.local().get_topic_state(tp) + >= data_migrations::migrated_resource_state::fully_blocked; +} + bool metadata_cache::should_reject_writes( model::topic_namespace_view tp) const { return _migrated_resources.local().get_topic_state(tp) diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index 73d9639a7304a..fe425f428622f 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -123,6 +123,9 @@ class metadata_cache { std::optional get_node_rack_id(model::node_id) const; bool should_reject_writes() const; + + /// Check whether migrations block topic writes/reads + bool should_reject_reads(model::topic_namespace_view) const; bool should_reject_writes(model::topic_namespace_view) const; bool contains(model::topic_namespace_view, model::partition_id) const; diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 00056eb66b17d..ec48478eeb164 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -1336,12 +1336,20 @@ class simple_fetch_planner final : public fetch_planner::impl { } auto& tp = fp.topic_partition; + auto tn_view = tp.as_tn_view(); + const auto& metadata_cache = octx.rctx.metadata_cache(); + auto partition_id = tp.get_partition(); - if (unlikely(octx.rctx.metadata_cache().is_disabled( - tp.as_tn_view(), tp.get_partition()))) { + if (unlikely(metadata_cache.is_disabled(tn_view, partition_id))) { resp_it->set(make_partition_response_error( - fp.topic_partition.get_partition(), - error_code::replica_not_available)); + partition_id, error_code::replica_not_available)); + ++resp_it; + return; + } + + if (unlikely(metadata_cache.should_reject_reads(tn_view))) { + resp_it->set(make_partition_response_error( + partition_id, error_code::invalid_topic_exception)); ++resp_it; return; } @@ -1358,11 +1366,10 @@ class simple_fetch_planner final : public fetch_planner::impl { * return not_leader_for_partition error to force metadata * update. */ - auto ec = octx.rctx.metadata_cache().contains(tp.to_ntp()) + auto ec = metadata_cache.contains(tp.to_ntp()) ? error_code::not_leader_for_partition : error_code::unknown_topic_or_partition; - resp_it->set(make_partition_response_error( - fp.topic_partition.get_partition(), ec)); + resp_it->set(make_partition_response_error(partition_id, ec)); ++resp_it; return; }