Skip to content

Commit

Permalink
c/migrations: list unmounted admin endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
nvartolomei committed Oct 9, 2024
1 parent fc855fe commit 61420e9
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,15 @@ ss::future<> controller::start(
std::ref(_stm),
std::ref(_partition_leaders),
std::ref(_connections),
ss::sharded_parameter(
[this]() -> std::optional<
std::reference_wrapper<cloud_storage::topic_mount_handler>> {
if (_topic_mount_handler.local_is_initialized()) {
return std::ref(_topic_mount_handler.local());
} else {
return {};
}
}),
std::ref(_as));

co_await _data_migration_worker.start(
Expand Down
18 changes: 18 additions & 0 deletions src/v/cluster/data_migration_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
#include "cluster/data_migration_frontend.h"

#include "cloud_storage/topic_mount_handler.h"
#include "cluster/cluster_utils.h"
#include "cluster/commands.h"
#include "cluster/controller_stm.h"
Expand All @@ -25,6 +26,7 @@
#include "partition_leaders_table.h"
#include "rpc/connection_cache.h"
#include "ssx/future-util.h"
#include "utils/retry_chain_node.h"

#include <fmt/ostream.h>

Expand All @@ -38,6 +40,8 @@ frontend::frontend(
ss::sharded<controller_stm>& stm,
ss::sharded<partition_leaders_table>& leaders,
ss::sharded<rpc::connection_cache>& connections,
std::optional<std::reference_wrapper<cloud_storage::topic_mount_handler>>
topic_mount_handler,
ss::sharded<ss::abort_source>& as)
: _self(self)
, _cloud_storage_api_initialized(cloud_storage_api_initialized)
Expand All @@ -46,6 +50,7 @@ frontend::frontend(
, _controller(stm)
, _leaders_table(leaders)
, _connections(connections)
, _topic_mount_handler(topic_mount_handler)
, _as(as)
, _operation_timeout(10s) {}

Expand Down Expand Up @@ -300,6 +305,19 @@ frontend::get_migration(id migration_id) {
});
}

ss::future<frontend::list_unmounted_topics_result>
frontend::list_unmounted_topics() {
if (!_topic_mount_handler.has_value()) {
vlog(dm_log.warn, "topic mount handler is not initialized");
co_return co_await ssx::now<frontend::list_unmounted_topics_result>(
errc::feature_disabled);
}

auto rtc = retry_chain_node{_as.local(), 30s, 100ms};

co_return co_await _topic_mount_handler->get().list_unmounted_topics(rtc);
}

ss::future<std::error_code> frontend::insert_barrier() {
const auto barrier_deadline = _operation_timeout
+ model::timeout_clock::now();
Expand Down
12 changes: 12 additions & 0 deletions src/v/cluster/data_migration_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@
#pragma once

#include "base/outcome.h"
#include "cloud_storage/remote_label.h"
#include "cluster/data_migration_types.h"
#include "cluster/fwd.h"
#include "features/fwd.h"
#include "rpc/fwd.h"

#include <seastar/core/sharded.hh>

namespace cloud_storage {
class topic_mount_handler;
}

namespace cluster::data_migrations {

class frontend : public ss::peering_sharded_service<frontend> {
Expand All @@ -32,6 +37,7 @@ class frontend : public ss::peering_sharded_service<frontend> {
ss::sharded<controller_stm>&,
ss::sharded<partition_leaders_table>&,
ss::sharded<rpc::connection_cache>&,
std::optional<std::reference_wrapper<cloud_storage::topic_mount_handler>>,
ss::sharded<ss::abort_source>&);

ss::future<result<id>> create_migration(
Expand All @@ -50,6 +56,10 @@ class frontend : public ss::peering_sharded_service<frontend> {
ss::future<result<migration_metadata>> get_migration(id);
ss::future<chunked_vector<migration_metadata>> list_migrations();

using list_unmounted_topics_result
= result<chunked_vector<cloud_storage::labeled_namespaced_topic>>;
ss::future<list_unmounted_topics_result> list_unmounted_topics();

private:
/**
* Must be executed on data migrations shard
Expand Down Expand Up @@ -90,6 +100,8 @@ class frontend : public ss::peering_sharded_service<frontend> {
ss::sharded<controller_stm>& _controller;
ss::sharded<partition_leaders_table>& _leaders_table;
ss::sharded<rpc::connection_cache>& _connections;
std::optional<std::reference_wrapper<cloud_storage::topic_mount_handler>>
_topic_mount_handler;
ss::sharded<ss::abort_source>& _as;
std::chrono::milliseconds _operation_timeout;
};
Expand Down
31 changes: 31 additions & 0 deletions src/v/redpanda/admin/api-doc/migration.def.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,37 @@
}
}
},
"labeled_namespaced_topic": {
"type": "object",
"required": [
"topic"
],
"properties": {
"label": {
"type": "string",
"description": "Label of the topic"
},
"topic": {
"type": "string",
"description": "Topic name"
},
"ns": {
"type": "string",
"description": "Topic namespace, if not present it is assumed that topic is in kafka namespace"
}
}
},
"unmounted_topics_response": {
"type": "object",
"properties": {
"topics": {
"type": "array",
"items": {
"$ref": "labeled_namespaced_topic"
}
}
}
},
"outbound_migration": {
"type": "object",
"required": [
Expand Down
14 changes: 14 additions & 0 deletions src/v/redpanda/admin/api-doc/migration.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@
]
}
},
"/v1/topics/unmounted": {
"get": {
"operationId": "list_unmounted_topics",
"summary": "List all unmounted topics",
"responses": {
"200": {
"description": "List of unmounted topics",
"schema": {
"$ref": "unmounted_topics_response"
}
}
}
}
},
"/v1/topics/mount": {
"post": {
"operationId": "mount_topics",
Expand Down
2 changes: 2 additions & 0 deletions src/v/redpanda/admin/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ class admin_server {
delete_migration(std::unique_ptr<ss::http::request>);

// Topic routes
ss::future<std::unique_ptr<ss::http::reply>> list_unmounted_topics(
std::unique_ptr<ss::http::request>, std::unique_ptr<ss::http::reply>);
ss::future<ss::json::json_return_type>
mount_topics(std::unique_ptr<ss::http::request>);
ss::future<ss::json::json_return_type>
Expand Down
74 changes: 74 additions & 0 deletions src/v/redpanda/admin/topics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@
#include "cluster/data_migration_frontend.h"
#include "cluster/data_migration_types.h"
#include "container/fragmented_vector.h"
#include "json/chunked_buffer.h"
#include "json/validator.h"
#include "redpanda/admin/api-doc/migration.json.hh"
#include "redpanda/admin/data_migration_utils.h"
#include "redpanda/admin/server.h"
#include "redpanda/admin/util.h"
#include "ssx/async_algorithm.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/iostream.hh>
#include <seastar/json/json_elements.hh>

using admin::apply_validator;

Expand Down Expand Up @@ -120,9 +127,38 @@ json::validator make_unmount_array_validator() {
return json::validator(schema);
}

seastar::httpd::migration_json::labeled_namespaced_topic
to_admin_type(const cloud_storage::labeled_namespaced_topic& topic) {
seastar::httpd::migration_json::labeled_namespaced_topic ret;
ret.label = ss::sstring(topic.label.cluster_uuid());
ret.topic = topic.tp_ns.tp();
ret.ns = topic.tp_ns.ns();
return ret;
}

// Note: A similar method exists in pandaproxy::json. Extract it to
// `src/v/bytes/iostream.h`.
auto iobuf_body_writer(iobuf buf) {
return [buf = std::move(buf)](ss::output_stream<char> out) mutable {
return ss::do_with(
std::move(out),
[buf = std::move(buf)](ss::output_stream<char>& out) mutable {
return write_iobuf_to_output_stream(std::move(buf), out)
.finally([&out] { return out.close(); });
});
};
}

} // namespace

void admin_server::register_topic_routes() {
register_route_raw_async<superuser>(
ss::httpd::migration_json::list_unmounted_topics,
[this](
std::unique_ptr<ss::http::request> req,
std::unique_ptr<ss::http::reply> reply) {
return list_unmounted_topics(std::move(req), std::move(reply));
});
register_route<superuser>(
ss::httpd::migration_json::mount_topics,
[this](std::unique_ptr<ss::http::request> req) {
Expand All @@ -135,6 +171,44 @@ void admin_server::register_topic_routes() {
});
}

ss::future<std::unique_ptr<ss::http::reply>>
admin_server::list_unmounted_topics(
std::unique_ptr<ss::http::request> req,
std::unique_ptr<ss::http::reply> reply) {
auto result = co_await _controller->get_data_migration_frontend()
.local()
.list_unmounted_topics();
if (!result) {
vlog(
adminlog.warn,
"unable list unmounted topics - error: {}",
result.error());
co_await throw_on_error(*req, result.error(), model::controller_ntp);
throw ss::httpd::server_error_exception(
"unknown error when listing unmounted topics");
}

json::chunked_buffer buf;
json::Writer<json::chunked_buffer> writer(buf);
writer.StartObject();
writer.Key("topics");
writer.StartArray();
co_await ssx::async_for_each(
result.value().begin(),
result.value().end(),
[&writer](const cloud_storage::labeled_namespaced_topic& topic) {
auto json_str = to_admin_type(topic).to_json();
writer.RawValue(
json_str.c_str(), json_str.size(), rapidjson::Type::kObjectType);
});
writer.EndArray();
writer.EndObject();

reply->write_body("json", iobuf_body_writer(std::move(buf).as_iobuf()));

co_return std::move(reply);
}

ss::future<ss::json::json_return_type>
admin_server::mount_topics(std::unique_ptr<ss::http::request> req) {
static thread_local json::validator validator
Expand Down
4 changes: 4 additions & 0 deletions tests/rptest/services/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1766,6 +1766,10 @@ def delete_data_migration(self,
path = f"migrations/{migration_id}"
return self._request("DELETE", path, node=node)

def list_unmounted_topics(self, node: Optional[ClusterNode] = None):
path = "topics/unmounted"
return self._request("GET", path, node=node)

def unmount_topics(self,
topics: list[NamespacedTopic],
node: Optional[ClusterNode] = None):
Expand Down
48 changes: 48 additions & 0 deletions tests/rptest/tests/data_migrations_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,3 +799,51 @@ def test_migrated_topic_data_integrity(self, transfer_leadership: bool,
read_blocked=False,
produce_blocked=False)
remounted = True

@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
def test_list_unmounted_topics(self):
topics = [TopicSpec(partition_count=3) for i in range(5)]

for t in topics:
self.client().create_topic(t)

admin = Admin(self.redpanda)
unmounted_response = admin.list_unmounted_topics().json()
assert len(unmounted_response["topics"]
) == 0, "There should be no unmounted topics"

outbound_topics = [make_namespaced_topic(t.name) for t in topics]
reply = self.admin.unmount_topics(outbound_topics).json()
self.logger.info(f"create migration reply: {reply}")

self.logger.info('waiting for partitions be deleted')
self.wait_partitions_disappear(topics)

unmounted_response = admin.list_unmounted_topics().json()
assert len(unmounted_response["topics"]) == len(
topics), "There should be unmounted topics"

# Mount 3 topics based on the unmounted topics response. This ensures
# that the response is correct/usable.
inbound_topics = [
InboundTopic(NamespacedTopic(topic=t["topic"], namespace=t["ns"]),
alias=None) for t in unmounted_response["topics"][:3]
]
mount_resp = self.admin.mount_topics(inbound_topics).json()

# Build expectations based on original topic specs that match the
# unmounted topics response.
expected_topic_specs = []
for t in unmounted_response["topics"][:3]:
expected_topic_specs.append(
TopicSpec(name=t["topic"], partition_count=3))

self.wait_partitions_appear(expected_topic_specs)

# Wait for the migration to complete. This guarantees that the mount manifests
# are deleted.
self.wait_migration_disappear(mount_resp["id"])

unmounted_response = admin.list_unmounted_topics().json()
assert len(unmounted_response["topics"]
) == 2, "There should be 2 unmounted topics"

0 comments on commit 61420e9

Please sign in to comment.