From fe3561c8c8c70b601301c8a4f491b2a27f7c4601 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Wed, 2 Oct 2024 11:58:42 +0100 Subject: [PATCH] c/migrations: list unmounted topics --- src/v/cloud_storage/remote_label.h | 21 +++++ src/v/cloud_storage/tests/BUILD | 1 + .../tests/remote_path_provider_test.cc | 3 + .../tests/topic_mount_handler_test.cc | 89 +++++++++++++++++-- src/v/cloud_storage/topic_mount_handler.cc | 41 +++++++++ src/v/cloud_storage/topic_mount_handler.h | 6 ++ src/v/cluster/controller.cc | 20 +++++ src/v/cluster/controller.h | 6 +- src/v/cluster/data_migration_backend.cc | 18 ++-- src/v/cluster/data_migration_backend.h | 6 +- src/v/cluster/data_migration_frontend.cc | 18 ++++ src/v/cluster/data_migration_frontend.h | 12 +++ src/v/cluster/data_migration_rpc.json | 2 +- .../redpanda/admin/api-doc/migration.def.json | 33 ++++++- src/v/redpanda/admin/api-doc/migration.json | 16 +++- src/v/redpanda/admin/server.h | 2 + src/v/redpanda/admin/topics.cc | 74 +++++++++++++++ tests/rptest/services/admin.py | 4 + .../rptest/tests/data_migrations_api_test.py | 48 ++++++++++ 19 files changed, 394 insertions(+), 26 deletions(-) diff --git a/src/v/cloud_storage/remote_label.h b/src/v/cloud_storage/remote_label.h index 6478717f84bdf..ded29096c5e93 100644 --- a/src/v/cloud_storage/remote_label.h +++ b/src/v/cloud_storage/remote_label.h @@ -9,6 +9,7 @@ #pragma once #include "model/fundamental.h" +#include "model/metadata.h" #include "serde/rw/envelope.h" #include "serde/rw/uuid.h" @@ -36,4 +37,24 @@ struct remote_label } }; +struct labeled_namespaced_topic + : serde::envelope< + labeled_namespaced_topic, + serde::version<0>, + serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + + cloud_storage::remote_label label; + model::topic_namespace tp_ns; + + auto serde_fields() { return std::tie(label, tp_ns); } + + friend bool + operator==(const labeled_namespaced_topic&, const labeled_namespaced_topic&) + = default; + + friend std::ostream& + operator<<(std::ostream&, const labeled_namespaced_topic&); +}; + } // namespace cloud_storage diff --git a/src/v/cloud_storage/tests/BUILD b/src/v/cloud_storage/tests/BUILD index 5bcac8915c4b0..3fd11907d182c 100644 --- a/src/v/cloud_storage/tests/BUILD +++ b/src/v/cloud_storage/tests/BUILD @@ -225,6 +225,7 @@ redpanda_cc_gtest( deps = [ ":s3_imposter_gtest", "//src/v/cloud_storage", + "//src/v/cloud_storage:remote_label", "//src/v/cloud_storage_clients", "//src/v/cluster:topic_configuration", "//src/v/config", diff --git a/src/v/cloud_storage/tests/remote_path_provider_test.cc b/src/v/cloud_storage/tests/remote_path_provider_test.cc index eae9e274b4d27..ce604a47aac87 100644 --- a/src/v/cloud_storage/tests/remote_path_provider_test.cc +++ b/src/v/cloud_storage/tests/remote_path_provider_test.cc @@ -404,6 +404,9 @@ TEST(BogusManifestPath, TestTopicMountManifestPath) { "migration/foo", "migration/foo/bar", "migration/foo/bar/baz", + "migration/foo/bar/baz/qux", + // Valid label/uuid but too many path components. 
+      "migration/deadbeef-0000-0000-0000-000000000000/kafka/tp/qux",
     }) {
        const auto tmp = topic_mount_manifest_path::parse(path);
        ASSERT_FALSE(tmp.has_value())
diff --git a/src/v/cloud_storage/tests/topic_mount_handler_test.cc b/src/v/cloud_storage/tests/topic_mount_handler_test.cc
index 30561a7782878..c1e2523da2e5e 100644
--- a/src/v/cloud_storage/tests/topic_mount_handler_test.cc
+++ b/src/v/cloud_storage/tests/topic_mount_handler_test.cc
@@ -8,6 +8,7 @@
  * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
  */
 #include "cloud_storage/remote.h"
+#include "cloud_storage/remote_label.h"
 #include "cloud_storage/tests/s3_imposter.h"
 #include "cloud_storage/topic_mount_handler.h"
 #include "cloud_storage/types.h"
@@ -23,6 +24,8 @@
 
 #include 
 
+#include 
+
 #include 
 
 using namespace cloud_storage;
@@ -52,9 +55,7 @@ get_topic_configuration(cluster::topic_properties topic_props) {
 }
 } // namespace
 
-struct TopicMountHandlerFixture
-  : public s3_imposter_fixture
-  , public testing::TestWithParam<std::tuple<bool, bool>> {
+struct TopicMountHandlerFixture : public s3_imposter_fixture {
     TopicMountHandlerFixture() {
         pool.start(10, ss::sharded_parameter([this] { return conf; })).get();
         io.start(
@@ -79,7 +80,15 @@ struct TopicMountHandlerFixture
     ss::sharded<remote> remote;
 };
 
+struct TopicMountHandlerSuite
+  : public TopicMountHandlerFixture
+  , public testing::TestWithParam<std::tuple<bool, bool>> {};
+
+struct TopicMountHandlerListSuite
+  : public TopicMountHandlerFixture
+  , public ::testing::Test {};
+
-TEST_P(TopicMountHandlerFixture, TestMountTopicManifestDoesNotExist) {
+TEST_P(TopicMountHandlerSuite, TestMountTopicManifestDoesNotExist) {
     set_expectations_and_listen({});
 
     auto topic_props = cluster::topic_properties{};
@@ -105,7 +114,7 @@ TEST_P(TopicMountHandlerFixture, TestMountTopicManifestDoesNotExist) {
       confirm_result, topic_mount_result::mount_manifest_does_not_exist);
 }
 
-TEST_P(TopicMountHandlerFixture, TestMountTopicManifestNotDeleted) {
+TEST_P(TopicMountHandlerSuite, TestMountTopicManifestNotDeleted) {
     set_expectations_and_listen({});
     retry_chain_node rtc(never_abort, 10s, 20ms);
 
@@ -169,7 +178,7 @@ TEST_P(TopicMountHandlerFixture, TestMountTopicManifestNotDeleted) {
     ASSERT_EQ(exists_result, download_result::success);
 }
 
-TEST_P(TopicMountHandlerFixture, TestMountTopicSuccess) {
+TEST_P(TopicMountHandlerSuite, TestMountTopicSuccess) {
     set_expectations_and_listen({});
     retry_chain_node rtc(never_abort, 10s, 20ms);
 
@@ -217,7 +226,7 @@ TEST_P(TopicMountHandlerFixture, TestMountTopicSuccess) {
     ASSERT_EQ(exists_result, download_result::notfound);
 }
 
-TEST_P(TopicMountHandlerFixture, TestUnmountTopicManifestNotCreated) {
+TEST_P(TopicMountHandlerSuite, TestUnmountTopicManifestNotCreated) {
     set_expectations_and_listen({});
     retry_chain_node rtc(never_abort, 10s, 20ms);
 
@@ -269,7 +278,7 @@ TEST_P(TopicMountHandlerFixture, TestUnmountTopicManifestNotCreated) {
     ASSERT_EQ(exists_result, download_result::notfound);
 }
 
-TEST_P(TopicMountHandlerFixture, TestUnmountTopicSuccess) {
+TEST_P(TopicMountHandlerSuite, TestUnmountTopicSuccess) {
     set_expectations_and_listen({});
     retry_chain_node rtc(never_abort, 10s, 20ms);
 
@@ -305,7 +314,69 @@ TEST_P(TopicMountHandlerFixture, TestUnmountTopicSuccess) {
     ASSERT_EQ(exists_result, download_result::success);
 }
 
+TEST_F(TopicMountHandlerListSuite, TestListUnmountedTopics) {
+    set_expectations_and_listen({});
+    retry_chain_node rtc(never_abort, 10s, 20ms);
+
+    auto handler = topic_mount_handler(bucket_name, remote.local());
+    auto result = handler.list_unmounted_topics(rtc).get();
+
+    // Add objects with bogus keys to verify that listing skips them.
+    add_expectations({
+      expectation{.url = "foobar"},
+      expectation{.url = "migration/foo"},
+      expectation{.url = "migration/foo/bar"},
+      expectation{.url = "migration/foo/bar/baz"},
+    });
+
+    ASSERT_TRUE(result);
+    ASSERT_TRUE(result.value().empty());
+
+    std::vector<cluster::topic_configuration> topics{
+      cluster::topic_configuration(
+        model::ns("kafka"), model::topic("tp1"), 1, 1),
+      cluster::topic_configuration(
+        model::ns("kafka"), model::topic("tp2"), 1, 1),
+    };
+
+    for (auto label : {model::default_cluster_uuid, test_uuid}) {
+        for (const auto& tp_ns : {test_tp_ns, test_tp_ns_override}) {
+            auto topic = cluster::topic_configuration(tp_ns.ns, tp_ns.tp, 1, 1);
+            if (label != model::default_cluster_uuid) {
+                topic.properties.remote_label = remote_label{label};
+            }
+            if (tp_ns != test_tp_ns) {
+                topic.properties.remote_topic_namespace_override = tp_ns;
+            }
+            topics.push_back(topic);
+        }
+    }
+
+    auto expectations = std::vector<labeled_namespaced_topic>{};
+    for (const auto& topic : topics) {
+        ASSERT_EQ(
+          handler.unmount_topic(topic, rtc).get(),
+          topic_unmount_result::success);
+
+        expectations.emplace_back(labeled_namespaced_topic{
+          .label = topic.properties.remote_label.value_or(
+            remote_label{model::default_cluster_uuid}),
+          .tp_ns = topic.tp_ns,
+        });
+    }
+
+    result = handler.list_unmounted_topics(rtc).get();
+    ASSERT_TRUE(result);
+    ASSERT_EQ(result.value().size(), topics.size());
+
+    for (const auto& topic : result.value()) {
+        auto it = std::find(expectations.begin(), expectations.end(), topic);
+        ASSERT_NE(it, expectations.end());
+        expectations.erase(it);
+    }
+}
+
 INSTANTIATE_TEST_SUITE_P(
   TopicMountHandlerOverride,
-  TopicMountHandlerFixture,
+  TopicMountHandlerSuite,
   testing::Combine(testing::Bool(), testing::Bool()));
diff --git a/src/v/cloud_storage/topic_mount_handler.cc b/src/v/cloud_storage/topic_mount_handler.cc
index a6ab567101156..7710a90f989bb 100644
--- a/src/v/cloud_storage/topic_mount_handler.cc
+++ b/src/v/cloud_storage/topic_mount_handler.cc
@@ -15,7 +15,9 @@
 #include "cloud_storage/remote.h"
 #include "cloud_storage/remote_path_provider.h"
 #include "cloud_storage/topic_mount_manifest.h"
+#include "cloud_storage/types.h"
 #include "model/metadata.h"
+#include "utils/retry_chain_node.h"
 
 namespace cloud_storage {
 
@@ -155,6 +157,45 @@ ss::future<topic_unmount_result> topic_mount_handler::unmount_topic(
     co_return topic_unmount_result::success;
 }
 
+ss::future<result<chunked_vector<labeled_namespaced_topic>>>
+topic_mount_handler::list_unmounted_topics(retry_chain_node& parent) {
+    auto log = retry_chain_logger(cst_log, parent);
+
+    cloud_storage_clients::object_key prefix{
+      cloud_storage::topic_mount_manifest_path::prefix()};
+
+    vlog(log.debug, "listing unmounted topics with prefix {}", prefix);
+
+    auto list_result = co_await _remote.list_objects(_bucket, parent, prefix);
+    if (!list_result) {
+        vlog(log.error, "failed to list objects: {}", list_result.error());
+        co_return list_result.error();
+    }
+
+    chunked_vector<labeled_namespaced_topic> ret;
+    for (const auto& item : list_result.assume_value().contents) {
+        vlog(log.trace, "parsing object {}", item.key);
+
+        auto path_parse_result
+          = cloud_storage::topic_mount_manifest_path::parse(
+            std::string(item.key));
+        if (!path_parse_result) {
+            vlog(log.error, "failed to parse object key {}", item.key);
+            continue;
+        }
+
+        ret.emplace_back(labeled_namespaced_topic{
+          .label = cloud_storage::remote_label(
+            path_parse_result->cluster_uuid()),
+          .tp_ns = path_parse_result->tp_ns(),
+        });
+    }
+
+    vlog(log.trace, "found {} unmounted topics", ret.size());
+
+    co_return ret;
+}
+
 ss::future<topic_mount_result> topic_mount_handler::prepare_mount_topic(
   const cluster::topic_configuration& topic_cfg, retry_chain_node& parent) {
     return mount_topic(topic_cfg, true, parent);
diff --git a/src/v/cloud_storage/topic_mount_handler.h b/src/v/cloud_storage/topic_mount_handler.h
index 3e6eeabc05ff9..e0eced62e9d93 100644
--- a/src/v/cloud_storage/topic_mount_handler.h
+++ b/src/v/cloud_storage/topic_mount_handler.h
@@ -10,10 +10,12 @@
  */
 #pragma once
 
+#include "base/outcome.h"
 #include "cloud_storage/fwd.h"
 #include "cloud_storage/remote_label.h"
 #include "cloud_storage_clients/types.h"
 #include "cluster/topic_configuration.h"
+#include "container/fragmented_vector.h"
 #include "model/fundamental.h"
 #include "utils/retry_chain_node.h"
 
@@ -63,6 +65,10 @@ class topic_mount_handler {
     ss::future<topic_unmount_result> unmount_topic(
       const cluster::topic_configuration& topic_cfg, retry_chain_node& parent);
 
+    // List all unmounted topics recorded in cloud storage.
+    ss::future<result<chunked_vector<labeled_namespaced_topic>>>
+    list_unmounted_topics(retry_chain_node& parent);
+
 private:
     // Perform the mounting process by deleting the topic mount manifest.
     // topic_cfg should be the recovered topic configuration from a topic
diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc
index f7f3739082bfd..bb02c60378bd9 100644
--- a/src/v/cluster/controller.cc
+++ b/src/v/cluster/controller.cc
@@ -10,6 +10,7 @@
 #include "cluster/controller.h"
 
 #include "base/likely.h"
+#include "cloud_storage/topic_mount_handler.h"
 #include "cluster/bootstrap_backend.h"
 #include "cluster/client_quota_backend.h"
 #include "cluster/client_quota_frontend.h"
@@ -305,6 +306,13 @@ ss::future<> controller::start(
       std::ref(_members_table),
       std::ref(_as));
 
+    if (auto bucket_opt = get_configured_bucket(); bucket_opt.has_value()) {
+        co_await _topic_mount_handler.start(
+          bucket_opt.value(), ss::sharded_parameter([this] {
+              return std::ref(_cloud_storage_api.local());
+          }));
+    }
+
     co_await _data_migration_frontend.start(
       _raft0->self().id(),
       _cloud_storage_api.local_is_initialized(),
@@ -313,6 +321,15 @@ ss::future<> controller::start(
       std::ref(_stm),
       std::ref(_partition_leaders),
       std::ref(_connections),
+      ss::sharded_parameter(
+        [this]() -> std::optional<
+                     std::reference_wrapper<cloud_storage::topic_mount_handler>> {
+            if (_topic_mount_handler.local_is_initialized()) {
+                return std::ref(_topic_mount_handler.local());
+            } else {
+                return {};
+            }
+        }),
       std::ref(_as));
 
     co_await _data_migration_worker.start(
@@ -782,6 +799,9 @@ ss::future<> controller::start(
       _cloud_storage_api.local_is_initialized()
         ? std::make_optional(std::ref(_cloud_storage_api.local()))
         : std::nullopt,
+      _topic_mount_handler.local_is_initialized()
+        ? std::make_optional(std::ref(_topic_mount_handler.local()))
+        : std::nullopt,
       std::ref(_as.local()));
     co_await _data_migration_backend.invoke_on_instance(
       &data_migrations::backend::start);
diff --git a/src/v/cluster/controller.h b/src/v/cluster/controller.h
index ebbab2b472dbf..a337166d2f63d 100644
--- a/src/v/cluster/controller.h
+++ b/src/v/cluster/controller.h
@@ -11,7 +11,6 @@
 
 #pragma once
 
-#include "cloud_storage/fwd.h"
 #include "cluster/cloud_metadata/producer_id_recovery_manager.h"
 #include "cluster/controller_probe.h"
 #include "cluster/controller_stm.h"
@@ -34,6 +33,10 @@
 #include 
 #include 
 
+namespace cloud_storage {
+class topic_mount_handler;
+}
+
 namespace cluster {
 
 class cluster_discovery;
@@ -325,6 +328,7 @@ class controller {
     ss::sharded<client_quota::store> _quota_store;     // instance per core
     ss::sharded<client_quota::backend> _quota_backend; // single instance
     ss::sharded<data_migrations::worker> _data_migration_worker;
+    ss::sharded<cloud_storage::topic_mount_handler> _topic_mount_handler;
    ssx::single_sharded<data_migrations::backend> _data_migration_backend;
     ss::sharded<data_migrations::irpc_frontend> _data_migration_irpc_frontend;
     ss::gate _gate;
diff --git a/src/v/cluster/data_migration_backend.cc b/src/v/cluster/data_migration_backend.cc
index a0b54218aff46..d5df6b5abd275 100644
--- a/src/v/cluster/data_migration_backend.cc
+++ b/src/v/cluster/data_migration_backend.cc
@@ -88,6 +88,8 @@ backend::backend(
   shard_table& shard_table,
   std::optional<std::reference_wrapper<cloud_storage::remote>>
     cloud_storage_api,
+  std::optional<std::reference_wrapper<cloud_storage::topic_mount_handler>>
+    topic_mount_handler,
   ss::abort_source& as)
   : _self(*config::node().node_id())
   , _table(table)
@@ -98,6 +100,7 @@ backend::backend(
   , _topic_table(topic_table)
   , _shard_table(shard_table)
   , _cloud_storage_api(cloud_storage_api)
+  , _topic_mount_handler(topic_mount_handler)
   , _as(as) {}
 
 ss::future<> backend::start() {
@@ -137,14 +140,6 @@ ss::future<> backend::start() {
     });
 
     if (_cloud_storage_api) {
-        auto maybe_bucket = cloud_storage::configuration::get_bucket_config()();
-        vassert(
-          maybe_bucket, "cloud_storage_api active but no bucket configured");
-        cloud_storage_clients::bucket_name bucket{*maybe_bucket};
-        _topic_mount_handler
-          = std::make_unique<cloud_storage::topic_mount_handler>(
-            bucket, *_cloud_storage_api);
-
         _table_notification_id = _table.register_notification([this](id id) {
             ssx::spawn_with_gate(
              _gate, [this, id]() { return handle_migration_update(id); });
@@ -698,7 +693,7 @@ ss::future<errc> backend::prepare_mount_topic(
         co_return errc::topic_not_exists;
     }
     vlog(dm_log.info, "trying to prepare mount topic, cfg={}", *cfg);
-    auto mnt_res = co_await _topic_mount_handler->prepare_mount_topic(
+    auto mnt_res = co_await _topic_mount_handler->get().prepare_mount_topic(
      *cfg, rcn);
     if (mnt_res == cloud_storage::topic_mount_result::mount_manifest_exists) {
         co_return errc::success;
@@ -714,7 +709,7 @@ ss::future<errc> backend::confirm_mount_topic(
         co_return errc::topic_not_exists;
     }
     vlog(dm_log.info, "trying to confirm mount topic, cfg={}", *cfg);
-    auto mnt_res = co_await _topic_mount_handler->confirm_mount_topic(
+    auto mnt_res = co_await _topic_mount_handler->get().confirm_mount_topic(
      *cfg, rcn);
     if (
       mnt_res
@@ -753,7 +748,8 @@ ss::future<errc> backend::do_unmount_topic(
         vlog(dm_log.warn, "topic {} missing, ignoring", nt);
         co_return errc::success;
     }
-    auto umnt_res = co_await _topic_mount_handler->unmount_topic(*cfg, rcn);
+    auto umnt_res = co_await _topic_mount_handler->get().unmount_topic(
+      *cfg, rcn);
     if (umnt_res == cloud_storage::topic_unmount_result::success) {
         co_return errc::success;
     }
diff --git a/src/v/cluster/data_migration_backend.h b/src/v/cluster/data_migration_backend.h
index 8126706d7ecea..b105389a8ac3f 100644
--- a/src/v/cluster/data_migration_backend.h
+++ b/src/v/cluster/data_migration_backend.h
@@ -42,6 +42,8 @@ class backend {
       shard_table& shard_table,
       std::optional<std::reference_wrapper<cloud_storage::remote>>
         _cloud_storage_api,
+      std::optional<std::reference_wrapper<cloud_storage::topic_mount_handler>>
+        _topic_mount_handler,
       ss::abort_source& as);
 
     ss::future<> start();
@@ -320,10 +322,10 @@ class backend {
     shard_table& _shard_table;
     std::optional<std::reference_wrapper<cloud_storage::remote>>
       _cloud_storage_api;
+    std::optional<std::reference_wrapper<cloud_storage::topic_mount_handler>>
+      _topic_mount_handler;
     ss::abort_source& _as;
 
-    std::unique_ptr<cloud_storage::topic_mount_handler> _topic_mount_handler;
-
     ss::gate _gate;
     ssx::semaphore _sem{0, "c/data-migration-be"};
     mutex _mutex{"c/data-migration-be::lock"};
diff --git a/src/v/cluster/data_migration_frontend.cc b/src/v/cluster/data_migration_frontend.cc
index e5943c787b4e7..00c984bf95767 100644
--- a/src/v/cluster/data_migration_frontend.cc
+++ b/src/v/cluster/data_migration_frontend.cc
@@ -10,6 +10,7 @@
  */
 #include "cluster/data_migration_frontend.h"
 
+#include "cloud_storage/topic_mount_handler.h"
 #include "cluster/cluster_utils.h"
 #include "cluster/commands.h"
 #include "cluster/controller_stm.h"
@@ -25,6 +26,7 @@
 #include "partition_leaders_table.h"
 #include "rpc/connection_cache.h"
 #include "ssx/future-util.h"
+#include "utils/retry_chain_node.h"
 
 #include 
 
@@ -38,6 +40,8 @@ frontend::frontend(
   ss::sharded<controller_stm>& stm,
   ss::sharded<partition_leaders_table>& leaders,
   ss::sharded<rpc::connection_cache>& connections,
+  std::optional<std::reference_wrapper<cloud_storage::topic_mount_handler>>
+    topic_mount_handler,
   ss::sharded<ss::abort_source>& as)
   : _self(self)
   , _cloud_storage_api_initialized(cloud_storage_api_initialized)
@@ -46,6 +50,7 @@ frontend::frontend(
   , _controller(stm)
   , _leaders_table(leaders)
   , _connections(connections)
+  , _topic_mount_handler(topic_mount_handler)
   , _as(as)
   , _operation_timeout(10s) {}
 
@@ -300,6 +305,19 @@ frontend::get_migration(id migration_id) {
     });
 }
 
+ss::future<frontend::list_unmounted_topics_result>
+frontend::list_unmounted_topics() {
+    if (!_topic_mount_handler.has_value()) {
+        vlog(dm_log.warn, "topic mount handler is not initialized");
+        co_return co_await ssx::now<list_unmounted_topics_result>(
+          errc::feature_disabled);
+    }
+
+    auto rtc = retry_chain_node{_as.local(), 30s, 100ms};
+
+    co_return co_await _topic_mount_handler->get().list_unmounted_topics(rtc);
+}
+
 ss::future<std::error_code> frontend::insert_barrier() {
     const auto barrier_deadline = _operation_timeout
                                   + model::timeout_clock::now();
diff --git a/src/v/cluster/data_migration_frontend.h b/src/v/cluster/data_migration_frontend.h
index 9b4746e888122..ee9b024ed8f69 100644
--- a/src/v/cluster/data_migration_frontend.h
+++ b/src/v/cluster/data_migration_frontend.h
@@ -11,6 +11,7 @@
 #pragma once
 
 #include "base/outcome.h"
+#include "cloud_storage/remote_label.h"
 #include "cluster/data_migration_types.h"
 #include "cluster/fwd.h"
 #include "features/fwd.h"
@@ -18,6 +19,10 @@
 
 #include 
 
+namespace cloud_storage {
+class topic_mount_handler;
+}
+
 namespace cluster::data_migrations {
 
 class frontend : public ss::peering_sharded_service<frontend> {
@@ -32,6 +37,7 @@ class frontend : public ss::peering_sharded_service<frontend> {
       ss::sharded<controller_stm>&,
       ss::sharded<partition_leaders_table>&,
       ss::sharded<rpc::connection_cache>&,
+      std::optional<std::reference_wrapper<cloud_storage::topic_mount_handler>>,
       ss::sharded<ss::abort_source>&);
 
     ss::future<result<id>> create_migration(
@@ -50,6 +56,10 @@ class frontend : public ss::peering_sharded_service<frontend> {
     ss::future<result<migration_metadata>> get_migration(id);
     ss::future<chunked_vector<migration_metadata>> list_migrations();
 
+    using list_unmounted_topics_result
+      = result<chunked_vector<cloud_storage::labeled_namespaced_topic>>;
+    ss::future<list_unmounted_topics_result> list_unmounted_topics();
+
 private:
     /**
      * Must be executed on data migrations shard
@@ -90,6 +100,8 @@ class frontend : public ss::peering_sharded_service<frontend> {
     ss::sharded<controller_stm>& _controller;
     ss::sharded<partition_leaders_table>& _leaders_table;
     ss::sharded<rpc::connection_cache>& _connections;
+    std::optional<std::reference_wrapper<cloud_storage::topic_mount_handler>>
+      _topic_mount_handler;
     ss::sharded<ss::abort_source>& _as;
     std::chrono::milliseconds _operation_timeout;
 };
diff --git a/src/v/cluster/data_migration_rpc.json b/src/v/cluster/data_migration_rpc.json
index 455a08202e0ad..8f98344a4d68b 100644
--- a/src/v/cluster/data_migration_rpc.json
+++ b/src/v/cluster/data_migration_rpc.json
@@ -26,4 +26,4 @@
       "output_type": "check_ntp_states_reply"
     }
   ]
-}
\ No newline at end of file
+}
diff --git a/src/v/redpanda/admin/api-doc/migration.def.json b/src/v/redpanda/admin/api-doc/migration.def.json
index 0d4eea7f6165d..7863cd54e608b 100644
--- a/src/v/redpanda/admin/api-doc/migration.def.json
+++ b/src/v/redpanda/admin/api-doc/migration.def.json
@@ -14,6 +14,37 @@
         }
     }
 },
+"labeled_namespaced_topic": {
+    "type": "object",
+    "required": [
+        "topic"
+    ],
+    "properties": {
+        "label": {
+            "type": "string",
+            "description": "Cluster UUID label of the topic"
+        },
+        "topic": {
+            "type": "string",
+            "description": "Topic name"
+        },
+        "ns": {
+            "type": "string",
+            "description": "Topic namespace. If not present, the topic is assumed to be in the kafka namespace"
+        }
+    }
+},
+"unmounted_topics_response": {
+    "type": "object",
+    "properties": {
+        "topics": {
+            "type": "array",
+            "items": {
+                "$ref": "labeled_namespaced_topic"
+            }
+        }
+    }
+},
 "outbound_migration": {
     "type": "object",
     "required": [
@@ -183,4 +214,4 @@
         "type": "int"
     }
 }
-}
\ No newline at end of file
+}
diff --git a/src/v/redpanda/admin/api-doc/migration.json b/src/v/redpanda/admin/api-doc/migration.json
index a574f76f24ea9..8b9fd98b45d29 100644
--- a/src/v/redpanda/admin/api-doc/migration.json
+++ b/src/v/redpanda/admin/api-doc/migration.json
@@ -90,6 +90,20 @@
         ]
     }
 },
+"/v1/topics/unmounted": {
+    "get": {
+        "operationId": "list_unmounted_topics",
+        "summary": "List all unmounted topics",
+        "responses": {
+            "200": {
+                "description": "List of unmounted topics",
+                "schema": {
+                    "$ref": "unmounted_topics_response"
+                }
+            }
+        }
+    }
+},
 "/v1/topics/mount": {
     "post": {
         "operationId": "mount_topics",
@@ -136,4 +150,4 @@
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/src/v/redpanda/admin/server.h b/src/v/redpanda/admin/server.h
index 59a03e7e7558a..1d6115ed430db 100644
--- a/src/v/redpanda/admin/server.h
+++ b/src/v/redpanda/admin/server.h
@@ -665,6 +665,8 @@ class admin_server {
     delete_migration(std::unique_ptr<ss::http::request>);
 
     // Topic routes
+    ss::future<std::unique_ptr<ss::http::reply>> list_unmounted_topics(
+      std::unique_ptr<ss::http::request>, std::unique_ptr<ss::http::reply>);
     ss::future<ss::json::json_return_type>
     mount_topics(std::unique_ptr<ss::http::request>);
     ss::future<ss::json::json_return_type>
diff --git a/src/v/redpanda/admin/topics.cc b/src/v/redpanda/admin/topics.cc
index 7d3b98a8a7a18..6324459f6f4b5 100644
--- a/src/v/redpanda/admin/topics.cc
+++ b/src/v/redpanda/admin/topics.cc
@@ -13,11 +13,18 @@
 #include "cluster/data_migration_frontend.h"
 #include "cluster/data_migration_types.h"
 #include "container/fragmented_vector.h"
+#include "json/chunked_buffer.h"
 #include "json/validator.h"
 #include "redpanda/admin/api-doc/migration.json.hh"
 #include "redpanda/admin/data_migration_utils.h"
 #include "redpanda/admin/server.h"
 #include "redpanda/admin/util.h"
+#include "ssx/async_algorithm.h"
+
+#include 
+#include 
+#include 
+#include 
 
 using admin::apply_validator;
 
@@ -120,9 +127,38 @@ json::validator make_unmount_array_validator() {
     return json::validator(schema);
 }
 
+seastar::httpd::migration_json::labeled_namespaced_topic
+to_admin_type(const cloud_storage::labeled_namespaced_topic& topic) {
+    seastar::httpd::migration_json::labeled_namespaced_topic ret;
+    ret.label = ss::sstring(topic.label.cluster_uuid());
+    ret.topic = topic.tp_ns.tp();
+    ret.ns = topic.tp_ns.ns();
+    return ret;
+}
+
+// Note: a similar helper exists in pandaproxy::json; consider extracting it
+// to `src/v/bytes/iostream.h`.
+auto iobuf_body_writer(iobuf buf) {
+    return [buf = std::move(buf)](ss::output_stream<char> out) mutable {
+        return ss::do_with(
+          std::move(out),
+          [buf = std::move(buf)](ss::output_stream<char>& out) mutable {
+              return write_iobuf_to_output_stream(std::move(buf), out)
+                .finally([&out] { return out.close(); });
+          });
+    };
+}
+
 } // namespace
 
 void admin_server::register_topic_routes() {
+    register_route_raw_async<superuser>(
+      ss::httpd::migration_json::list_unmounted_topics,
+      [this](
+        std::unique_ptr<ss::http::request> req,
+        std::unique_ptr<ss::http::reply> reply) {
+          return list_unmounted_topics(std::move(req), std::move(reply));
+      });
     register_route<superuser>(
       ss::httpd::migration_json::mount_topics,
      [this](std::unique_ptr<ss::http::request> req) {
@@ -135,6 +171,44 @@ void admin_server::register_topic_routes() {
       });
 }
 
+ss::future<std::unique_ptr<ss::http::reply>>
+admin_server::list_unmounted_topics(
+  std::unique_ptr<ss::http::request> req,
+  std::unique_ptr<ss::http::reply> reply) {
+    auto result = co_await _controller->get_data_migration_frontend()
+                    .local()
+                    .list_unmounted_topics();
+    if (!result) {
+        vlog(
+          adminlog.warn,
+          "unable to list unmounted topics - error: {}",
+          result.error());
+        co_await throw_on_error(*req, result.error(), model::controller_ntp);
+        throw ss::httpd::server_error_exception(
+          "unknown error when listing unmounted topics");
+    }
+
+    json::chunked_buffer buf;
+    json::Writer<json::chunked_buffer> writer(buf);
+    writer.StartObject();
+    writer.Key("topics");
+    writer.StartArray();
+    co_await ssx::async_for_each(
+      result.value().begin(),
+      result.value().end(),
+      [&writer](const cloud_storage::labeled_namespaced_topic& topic) {
+          auto json_str = to_admin_type(topic).to_json();
+          writer.RawValue(
+            json_str.c_str(), json_str.size(), rapidjson::Type::kObjectType);
+      });
+    writer.EndArray();
+    writer.EndObject();
+
+    reply->write_body("json", iobuf_body_writer(std::move(buf).as_iobuf()));
+
+    co_return std::move(reply);
+}
+
 ss::future<ss::json::json_return_type>
 admin_server::mount_topics(std::unique_ptr<ss::http::request> req) {
     static thread_local json::validator validator
diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py
index 7d2086c6d7d50..f9e6638e7e04e 100644
--- a/tests/rptest/services/admin.py
+++ b/tests/rptest/services/admin.py
@@ -1766,6 +1766,10 @@ def delete_data_migration(self,
         path = f"migrations/{migration_id}"
         return self._request("DELETE", path, node=node)
 
+    def list_unmounted_topics(self, node: Optional[ClusterNode] = None):
+        path = "topics/unmounted"
+        return self._request("GET", path, node=node)
+
     def unmount_topics(self,
                        topics: list[NamespacedTopic],
                        node: Optional[ClusterNode] = None):
diff --git a/tests/rptest/tests/data_migrations_api_test.py b/tests/rptest/tests/data_migrations_api_test.py
index d8224267107f9..1d873bd01901b 100644
--- a/tests/rptest/tests/data_migrations_api_test.py
+++ b/tests/rptest/tests/data_migrations_api_test.py
@@ -806,3 +806,51 @@ def test_migrated_topic_data_integrity(self, transfer_leadership: bool,
                            read_blocked=False,
                            produce_blocked=False)
             remounted = True
+
+    @cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
+    def test_list_unmounted_topics(self):
+        topics = [TopicSpec(partition_count=3) for _ in range(5)]
+
+        for t in topics:
+            self.client().create_topic(t)
+
+        admin = Admin(self.redpanda)
+        unmounted_response = admin.list_unmounted_topics().json()
+        assert len(unmounted_response["topics"]
+                   ) == 0, "There should be no unmounted topics"
+
+        outbound_topics = [make_namespaced_topic(t.name) for t in topics]
+        reply = self.admin.unmount_topics(outbound_topics).json()
+        self.logger.info(f"unmount topics (create migration) reply: {reply}")
+
+        self.logger.info('waiting for partitions to be deleted')
+        self.wait_partitions_disappear(topics)
+
+        unmounted_response = admin.list_unmounted_topics().json()
+        assert len(unmounted_response["topics"]) == len(
+            topics), "all unmounted topics should be listed"
+
+        # Mount 3 topics based on the unmounted topics response. This ensures
+        # that the response is correct/usable.
+        inbound_topics = [
+            InboundTopic(NamespacedTopic(topic=t["topic"], namespace=t["ns"]),
+                         alias=None) for t in unmounted_response["topics"][:3]
+        ]
+        mount_resp = self.admin.mount_topics(inbound_topics).json()
+
+        # Build expectations based on original topic specs that match the
+        # unmounted topics response.
+        expected_topic_specs = []
+        for t in unmounted_response["topics"][:3]:
+            expected_topic_specs.append(
+                TopicSpec(name=t["topic"], partition_count=3))
+
+        self.wait_partitions_appear(expected_topic_specs)
+
+        # Wait for the migration to complete. This guarantees that the mount
+        # manifests are deleted.
+        self.wait_migration_disappear(mount_resp["id"])
+
+        unmounted_response = admin.list_unmounted_topics().json()
+        assert len(unmounted_response["topics"]
+                   ) == 2, "There should be 2 unmounted topics"
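
Usage sketch (illustrative, appended after the patch body): the new endpoint
can be exercised through the `Admin` wrapper added above. This assumes a
running cluster; `redpanda` below stands in for a test's cluster fixture, and
the response shape follows `unmounted_topics_response` in migration.def.json.

    from rptest.services.admin import Admin

    admin = Admin(redpanda)
    resp = admin.list_unmounted_topics().json()
    # e.g. {"topics": [{"label": "<cluster uuid>", "topic": "tp1", "ns": "kafka"}]}
    for t in resp["topics"]:
        # "ns" may be absent, in which case the kafka namespace is implied
        print(t.get("ns", "kafka"), t["topic"], t["label"])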