Skip to content

Commit

Permalink
c/migrations: list unmounted topics
Browse files Browse the repository at this point in the history
  • Loading branch information
nvartolomei committed Oct 2, 2024
1 parent 94e07bc commit be9dc1b
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 3 deletions.
3 changes: 3 additions & 0 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ ss::future<> controller::start(
std::ref(_stm),
std::ref(_partition_leaders),
std::ref(_connections),
_cloud_storage_api.local_is_initialized()
? std::make_optional(std::ref(_cloud_storage_api.local()))
: std::nullopt,
std::ref(_as));

co_await _data_migration_worker.start(
Expand Down
71 changes: 71 additions & 0 deletions src/v/cluster/data_migration_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
*/
#include "cluster/data_migration_frontend.h"

#include "cloud_storage/remote.h"
#include "cloud_storage/remote_path_provider.h"
#include "cloud_storage_clients/types.h"
#include "cluster/cluster_utils.h"
#include "cluster/commands.h"
#include "cluster/controller_stm.h"
Expand All @@ -25,9 +28,12 @@
#include "partition_leaders_table.h"
#include "rpc/connection_cache.h"
#include "ssx/future-util.h"
#include "utils/retry_chain_node.h"

#include <fmt/ostream.h>

#include <stdexcept>

namespace cluster::data_migrations {

frontend::frontend(
Expand All @@ -38,6 +44,8 @@ frontend::frontend(
ss::sharded<controller_stm>& stm,
ss::sharded<partition_leaders_table>& leaders,
ss::sharded<rpc::connection_cache>& connections,
std::optional<std::reference_wrapper<cloud_storage::remote>>
cloud_storage_api,
ss::sharded<ss::abort_source>& as)
: _self(self)
, _cloud_storage_api_initialized(cloud_storage_api_initialized)
Expand All @@ -46,6 +54,7 @@ frontend::frontend(
, _controller(stm)
, _leaders_table(leaders)
, _connections(connections)
, _cloud_storage_api(cloud_storage_api)
, _as(as)
, _operation_timeout(10s) {}

Expand Down Expand Up @@ -120,6 +129,68 @@ bool frontend::data_migrations_active() const {
&& _cloud_storage_api_initialized;
}

ss::future<frontend::list_unmounted_topics_result>
frontend::list_unmounted_topics() {
if (!data_migrations_active()) {
return ssx::now<list_unmounted_topics_result>(errc::feature_disabled);
}

vlog(dm_log.debug, "listing unmounted topics");

return container().invoke_on(
data_migrations_shard,
[](frontend& local) -> ss::future<list_unmounted_topics_result> {
auto rtc = retry_chain_node{local._as.local(), 10s, 100ms};
auto log = retry_chain_logger(dm_log, rtc);

auto maybe_bucket
= cloud_storage::configuration::get_bucket_config()();
vassert(
maybe_bucket, "cloud_storage_api active but no bucket configured");
cloud_storage_clients::bucket_name bucket{*maybe_bucket};

cloud_storage_clients::object_key prefix{"migration/"};
auto list_result
= co_await local._cloud_storage_api->get().list_objects(
bucket, rtc, prefix);
if (!list_result) {
vlog(
log.error,
"failed to list objects in bucket {}: {}",
bucket,
list_result.error());
co_return errc::timeout;
}

chunked_vector<labeled_namespaced_topic> ret;
for (const auto& item : list_result.assume_value().contents) {
vlog(log.trace, "parsing object {}", item.key);

auto path_parse_result
= cloud_storage::topic_mount_manifest_path::from_string(
std::string(item.key));
if (!path_parse_result) {
vlog(
log.error,
"failed to parse object key {} in bucket {}",
item.key,
bucket);
continue;
}

ret.emplace_back(labeled_namespaced_topic{
.label = cloud_storage::remote_label(
path_parse_result->cluster_uuid()),
.topic = path_parse_result->tp_ns(),
});
}

vlog(log.trace, "found {} unmounted topics", ret.size());

co_return ret;
});
}

ss::future<result<id>> frontend::create_migration(
data_migration migration, can_dispatch_to_leader can_dispatch) {
if (!data_migrations_active()) {
Expand Down
10 changes: 10 additions & 0 deletions src/v/cluster/data_migration_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
#pragma once

#include "base/outcome.h"
#include "cloud_storage/fwd.h"
#include "cluster/data_migration_types.h"
#include "cluster/fwd.h"
#include "container/fragmented_vector.h"
#include "features/fwd.h"
#include "rpc/fwd.h"

Expand All @@ -32,8 +34,14 @@ class frontend : public ss::peering_sharded_service<frontend> {
ss::sharded<controller_stm>&,
ss::sharded<partition_leaders_table>&,
ss::sharded<rpc::connection_cache>&,
std::optional<std::reference_wrapper<cloud_storage::remote>>
_cloud_storage_api,
ss::sharded<ss::abort_source>&);

using list_unmounted_topics_result
= result<chunked_vector<labeled_namespaced_topic>>;
ss::future<list_unmounted_topics_result> list_unmounted_topics();

ss::future<result<id>> create_migration(
data_migration migration,
can_dispatch_to_leader dispatch = can_dispatch_to_leader::yes);
Expand Down Expand Up @@ -88,6 +96,8 @@ class frontend : public ss::peering_sharded_service<frontend> {
ss::sharded<controller_stm>& _controller;
ss::sharded<partition_leaders_table>& _leaders_table;
ss::sharded<rpc::connection_cache>& _connections;
std::optional<std::reference_wrapper<cloud_storage::remote>>
_cloud_storage_api;
ss::sharded<ss::abort_source>& _as;
std::chrono::milliseconds _operation_timeout;
};
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/data_migration_rpc.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
"output_type": "check_ntp_states_reply"
}
]
}
}
5 changes: 5 additions & 0 deletions src/v/cluster/data_migration_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,9 @@ std::ostream& operator<<(std::ostream& o, const check_ntp_states_reply& r) {
return o;
}

std::ostream& operator<<(std::ostream& o, const labeled_namespaced_topic& v) {
fmt::print(o, "{{label: {}, topic: {}}}", v.label, v.topic);
return o;
}

} // namespace cluster::data_migrations
21 changes: 21 additions & 0 deletions src/v/cluster/data_migration_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
#pragma once

#include "cloud_storage/remote_label.h"
#include "cluster/errc.h"
#include "container/fragmented_vector.h"
#include "model/metadata.h"
Expand Down Expand Up @@ -517,4 +518,24 @@ struct check_ntp_states_reply
friend std::ostream& operator<<(std::ostream&, const self&);
};

struct labeled_namespaced_topic
: serde::envelope<
labeled_namespaced_topic,
serde::version<0>,
serde::compat_version<0>> {
using rpc_adl_exempt = std::true_type;

cloud_storage::remote_label label;
model::topic_namespace topic;

auto serde_fields() { return std::tie(label, topic); }

friend bool
operator==(const labeled_namespaced_topic&, const labeled_namespaced_topic&)
= default;

friend std::ostream&
operator<<(std::ostream&, const labeled_namespaced_topic&);
};

} // namespace cluster::data_migrations
33 changes: 32 additions & 1 deletion src/v/redpanda/admin/api-doc/migration.def.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,37 @@
}
}
},
"labeled_namespaced_topic": {
"type": "object",
"required": [
"topic"
],
"properties": {
"label": {
"type": "string",
"description": "Label of the topic"
},
"topic": {
"type": "string",
"description": "Topic name"
},
"ns": {
"type": "string",
"description": "Topic namespace, if not present it is assumed that topic is in kafka namespace"
}
}
},
"unmounted_topics_response": {
"type": "object",
"properties": {
"topics": {
"type": "array",
"items": {
"$ref": "labeled_namespaced_topic"
}
}
}
},
"outbound_migration": {
"type": "object",
"required": [
Expand Down Expand Up @@ -183,4 +214,4 @@
"type": "int"
}
}
}
}
16 changes: 15 additions & 1 deletion src/v/redpanda/admin/api-doc/migration.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@
]
}
},
"/v1/topics/unmounted": {
"get": {
"operationId": "list_unmounted_topics",
"summary": "List all unmounted topics",
"responses": {
"200": {
"description": "List of unmounted topics",
"schema": {
"$ref": "unmounted_topics_response"
}
}
}
}
},
"/v1/topics/mount": {
"post": {
"operationId": "mount_topics",
Expand Down Expand Up @@ -136,4 +150,4 @@
}
}
}
}
}
2 changes: 2 additions & 0 deletions src/v/redpanda/admin/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,8 @@ class admin_server {
delete_migration(std::unique_ptr<ss::http::request>);

// Topic routes
ss::future<ss::json::json_return_type>
list_unmounted_topics(std::unique_ptr<ss::http::request>);
ss::future<ss::json::json_return_type>
mount_topics(std::unique_ptr<ss::http::request>);
ss::future<ss::json::json_return_type>
Expand Down
41 changes: 41 additions & 0 deletions src/v/redpanda/admin/topics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "redpanda/admin/server.h"
#include "redpanda/admin/util.h"

#include <seastar/json/json_elements.hh>

using admin::apply_validator;

namespace {
Expand Down Expand Up @@ -120,9 +122,23 @@ json::validator make_unmount_array_validator() {
return json::validator(schema);
}

seastar::httpd::migration_json::labeled_namespaced_topic
to_admin_type(const cluster::data_migrations::labeled_namespaced_topic& topic) {
seastar::httpd::migration_json::labeled_namespaced_topic ret;
ret.label = ss::sstring(topic.label.cluster_uuid());
ret.topic = topic.topic.tp();
ret.ns = topic.topic.ns();
return ret;
}

} // namespace

void admin_server::register_topic_routes() {
register_route<superuser>(
ss::httpd::migration_json::list_unmounted_topics,
[this](std::unique_ptr<ss::http::request> req) {
return list_unmounted_topics(std::move(req));
});
register_route<superuser>(
ss::httpd::migration_json::mount_topics,
[this](std::unique_ptr<ss::http::request> req) {
Expand All @@ -135,6 +151,31 @@ void admin_server::register_topic_routes() {
});
}

ss::future<ss::json::json_return_type>
admin_server::list_unmounted_topics(std::unique_ptr<ss::http::request> req) {
auto result = co_await _controller->get_data_migration_frontend()
.local()
.list_unmounted_topics();
if (!result) {
vlog(
adminlog.warn,
"unable list unmounted topics - error: {}",
result.error());
co_await throw_on_error(*req, result.error(), model::controller_ntp);
throw ss::httpd::server_error_exception(
"unknown error when listing unmounted topics");
}

ss::httpd::migration_json::unmounted_topics_response reply;

// TODO(nv): register_route_raw_async
for (auto& topic : result.value()) {
reply.topics.push(to_admin_type(topic));
}

co_return std::move(reply);
}

ss::future<ss::json::json_return_type>
admin_server::mount_topics(std::unique_ptr<ss::http::request> req) {
static thread_local json::validator validator
Expand Down
4 changes: 4 additions & 0 deletions tests/rptest/services/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1697,6 +1697,10 @@ def delete_data_migration(self,
path = f"migrations/{migration_id}"
return self._request("DELETE", path, node=node)

def list_unmounted_topics(self, node: Optional[ClusterNode] = None):
path = "topics/unmounted"
return self._request("GET", path, node=node)

def unmount_topics(self,
topics: list[NamespacedTopic],
node: Optional[ClusterNode] = None):
Expand Down
Loading

0 comments on commit be9dc1b

Please sign in to comment.