Skip to content

Commit

Permalink
c/migrations: list mountable admin endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
nvartolomei committed Oct 23, 2024
1 parent 763685d commit a702401
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/v/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ redpanda_cc_library(
"//src/v/bytes:streambuf",
"//src/v/cloud_storage",
"//src/v/cloud_storage:remote_label",
"//src/v/cloud_storage:topic_mount_manifest_path",
"//src/v/cloud_storage_clients",
"//src/v/config",
"//src/v/container:chunked_hash_map",
Expand Down
9 changes: 9 additions & 0 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,15 @@ ss::future<> controller::start(
std::ref(_stm),
std::ref(_partition_leaders),
std::ref(_connections),
ss::sharded_parameter(
[this]() -> std::optional<
std::reference_wrapper<cloud_storage::topic_mount_handler>> {
if (_topic_mount_handler.local_is_initialized()) {
return std::ref(_topic_mount_handler.local());
} else {
return {};
}
}),
std::ref(_as));

co_await _data_migration_worker.start(
Expand Down
18 changes: 18 additions & 0 deletions src/v/cluster/data_migration_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
#include "cluster/data_migration_frontend.h"

#include "cloud_storage/topic_mount_handler.h"
#include "cluster/cluster_utils.h"
#include "cluster/commands.h"
#include "cluster/controller_stm.h"
Expand All @@ -26,6 +27,7 @@
#include "rpc/connection_cache.h"
#include "ssx/future-util.h"
#include "ssx/single_sharded.h"
#include "utils/retry_chain_node.h"

#include <fmt/ostream.h>

Expand All @@ -39,6 +41,8 @@ frontend::frontend(
ss::sharded<controller_stm>& stm,
ss::sharded<partition_leaders_table>& leaders,
ss::sharded<rpc::connection_cache>& connections,
std::optional<std::reference_wrapper<cloud_storage::topic_mount_handler>>
topic_mount_handler,
ss::sharded<ss::abort_source>& as)
: _self(self)
, _cloud_storage_api_initialized(cloud_storage_api_initialized)
Expand All @@ -47,6 +51,7 @@ frontend::frontend(
, _controller(stm)
, _leaders_table(leaders)
, _connections(connections)
, _topic_mount_handler(topic_mount_handler)
, _as(as)
, _operation_timeout(10s) {}

Expand Down Expand Up @@ -303,6 +308,19 @@ frontend::get_migration(id migration_id) {
});
}

ss::future<frontend::list_mountable_topics_result>
frontend::list_mountable_topics() {
if (!_topic_mount_handler.has_value()) {
vlog(dm_log.warn, "topic mount handler is not initialized");
co_return co_await ssx::now<frontend::list_mountable_topics_result>(
errc::feature_disabled);
}

auto rtc = retry_chain_node{_as.local(), 30s, 100ms};

co_return co_await _topic_mount_handler->get().list_mountable_topics(rtc);
}

ss::future<std::error_code> frontend::insert_barrier() {
const auto barrier_deadline = _operation_timeout
+ model::timeout_clock::now();
Expand Down
13 changes: 13 additions & 0 deletions src/v/cluster/data_migration_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#pragma once

#include "base/outcome.h"
#include "cloud_storage/remote_label.h"
#include "cloud_storage/topic_mount_manifest_path.h"
#include "cluster/data_migration_types.h"
#include "cluster/fwd.h"
#include "features/fwd.h"
Expand All @@ -19,6 +21,10 @@

#include <seastar/core/sharded.hh>

namespace cloud_storage {
class topic_mount_handler;
}

namespace cluster::data_migrations {

class frontend : public ss::peering_sharded_service<frontend> {
Expand All @@ -33,6 +39,7 @@ class frontend : public ss::peering_sharded_service<frontend> {
ss::sharded<controller_stm>&,
ss::sharded<partition_leaders_table>&,
ss::sharded<rpc::connection_cache>&,
std::optional<std::reference_wrapper<cloud_storage::topic_mount_handler>>,
ss::sharded<ss::abort_source>&);

ss::future<result<id>> create_migration(
Expand All @@ -51,6 +58,10 @@ class frontend : public ss::peering_sharded_service<frontend> {
ss::future<result<migration_metadata>> get_migration(id);
ss::future<chunked_vector<migration_metadata>> list_migrations();

using list_mountable_topics_result
= result<chunked_vector<cloud_storage::topic_mount_manifest_path>>;
ss::future<list_mountable_topics_result> list_mountable_topics();

private:
/**
* Must be executed on data migrations shard
Expand Down Expand Up @@ -91,6 +102,8 @@ class frontend : public ss::peering_sharded_service<frontend> {
ss::sharded<controller_stm>& _controller;
ss::sharded<partition_leaders_table>& _leaders_table;
ss::sharded<rpc::connection_cache>& _connections;
std::optional<std::reference_wrapper<cloud_storage::topic_mount_handler>>
_topic_mount_handler;
ss::sharded<ss::abort_source>& _as;
std::chrono::milliseconds _operation_timeout;
};
Expand Down
32 changes: 30 additions & 2 deletions src/v/redpanda/admin/api-doc/migration.def.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,35 @@
},
"ns": {
"type": "string",
"description": "Topic namespace, if not present it is assumed that topic is in kafka namespace"
"description": "Topic namespace. If not present it is assumed that topic is in the \"kafka\" namespace"
}
}
},
"mountable_topic": {
"type": "object",
"properties": {
"topic_location": {
"type": "string",
"description": "Unique topic location in cloud storage with the format <topic name>/<cluster_uuid>/<initial_revision>"
},
"topic": {
"type": "string",
"description": "Topic name"
},
"ns": {
"type": "string",
"description": "Topic namespace. If not present it is assumed that topic is in the \"kafka\" namespace"
}
}
},
"list_mountable_topics_response": {
"type": "object",
"properties": {
"topics": {
"type": "array",
"items": {
"$ref": "mountable_topic"
}
}
}
},
Expand Down Expand Up @@ -183,4 +211,4 @@
"type": "int"
}
}
}
}
14 changes: 14 additions & 0 deletions src/v/redpanda/admin/api-doc/migration.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@
]
}
},
"/v1/topics/mountable": {
"get": {
"operationId": "list_mountable_topics",
"summary": "List mountable topics",
"responses": {
"200": {
"description": "List of mountable topics",
"schema": {
"$ref": "list_mountable_topics_response"
}
}
}
}
},
"/v1/topics/mount": {
"post": {
"operationId": "mount_topics",
Expand Down
2 changes: 2 additions & 0 deletions src/v/redpanda/admin/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ class admin_server {
delete_migration(std::unique_ptr<ss::http::request>);

// Topic routes
ss::future<std::unique_ptr<ss::http::reply>> list_mountable_topics(
std::unique_ptr<ss::http::request>, std::unique_ptr<ss::http::reply>);
ss::future<ss::json::json_return_type>
mount_topics(std::unique_ptr<ss::http::request>);
ss::future<ss::json::json_return_type>
Expand Down
80 changes: 80 additions & 0 deletions src/v/redpanda/admin/topics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,20 @@
#include "cluster/data_migration_frontend.h"
#include "cluster/data_migration_types.h"
#include "container/fragmented_vector.h"
#include "json/chunked_buffer.h"
#include "json/validator.h"
#include "redpanda/admin/api-doc/migration.json.hh"
#include "redpanda/admin/data_migration_utils.h"
#include "redpanda/admin/server.h"
#include "redpanda/admin/util.h"
#include "ssx/async_algorithm.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/iostream.hh>
#include <seastar/json/json_elements.hh>

#include <optional>

using admin::apply_validator;

Expand Down Expand Up @@ -120,9 +129,42 @@ json::validator make_unmount_array_validator() {
return json::validator(schema);
}

seastar::httpd::migration_json::mountable_topic to_admin_type(
const cloud_storage::topic_mount_manifest_path& mount_manifest_path) {
seastar::httpd::migration_json::mountable_topic ret;
ret.topic_location = fmt::format(
"{}/{}/{}",
mount_manifest_path.tp_ns().tp(),
mount_manifest_path.cluster_uuid(),
mount_manifest_path.rev());
ret.topic = mount_manifest_path.tp_ns().tp();
ret.ns = mount_manifest_path.tp_ns().ns();
return ret;
}

// Note: A similar method exists in pandaproxy::json. Extract it to
// `src/v/bytes/iostream.h`.
auto iobuf_body_writer(iobuf buf) {
return [buf = std::move(buf)](ss::output_stream<char> out) mutable {
return ss::do_with(
std::move(out),
[buf = std::move(buf)](ss::output_stream<char>& out) mutable {
return write_iobuf_to_output_stream(std::move(buf), out)
.finally([&out] { return out.close(); });
});
};
}

} // namespace

void admin_server::register_topic_routes() {
register_route_raw_async<superuser>(
ss::httpd::migration_json::list_mountable_topics,
[this](
std::unique_ptr<ss::http::request> req,
std::unique_ptr<ss::http::reply> reply) {
return list_mountable_topics(std::move(req), std::move(reply));
});
register_route<superuser>(
ss::httpd::migration_json::mount_topics,
[this](std::unique_ptr<ss::http::request> req) {
Expand All @@ -135,6 +177,44 @@ void admin_server::register_topic_routes() {
});
}

ss::future<std::unique_ptr<ss::http::reply>>
admin_server::list_mountable_topics(
std::unique_ptr<ss::http::request> req,
std::unique_ptr<ss::http::reply> reply) {
auto result = co_await _controller->get_data_migration_frontend()
.local()
.list_mountable_topics();
if (!result) {
vlog(
adminlog.warn,
"unable list mountable topics - error: {}",
result.error());
co_await throw_on_error(*req, result.error(), model::controller_ntp);
throw ss::httpd::server_error_exception(
"unknown error when listing mountable topics");
}

json::chunked_buffer buf;
json::Writer<json::chunked_buffer> writer(buf);
writer.StartObject();
writer.Key("topics");
writer.StartArray();
co_await ssx::async_for_each(
result.value().begin(),
result.value().end(),
[&writer](const cloud_storage::topic_mount_manifest_path& topic) {
auto json_str = to_admin_type(topic).to_json();
writer.RawValue(
json_str.c_str(), json_str.size(), rapidjson::Type::kObjectType);
});
writer.EndArray();
writer.EndObject();

reply->write_body("json", iobuf_body_writer(std::move(buf).as_iobuf()));

co_return std::move(reply);
}

ss::future<ss::json::json_return_type>
admin_server::mount_topics(std::unique_ptr<ss::http::request> req) {
static thread_local json::validator validator
Expand Down
4 changes: 4 additions & 0 deletions tests/rptest/services/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1782,6 +1782,10 @@ def delete_data_migration(self,
path = f"migrations/{migration_id}"
return self._request("DELETE", path, node=node)

def list_mountable_topics(self, node: Optional[ClusterNode] = None):
path = "topics/mountable"
return self._request("GET", path, node=node)

def unmount_topics(self,
topics: list[NamespacedTopic],
node: Optional[ClusterNode] = None):
Expand Down
66 changes: 66 additions & 0 deletions tests/rptest/tests/data_migrations_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,3 +942,69 @@ def test_migrated_topic_data_integrity(self, transfer_leadership: bool,
read_blocked=False,
produce_blocked=False)
remounted = True

@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
def test_list_mountable_topics(self):
topics = [TopicSpec(partition_count=3) for i in range(5)]

for t in topics:
self.client().create_topic(t)

admin = Admin(self.redpanda)
list_mountable_res = admin.list_mountable_topics().json()
assert len(list_mountable_res["topics"]
) == 0, "There should be no mountable topics"

outbound_topics = [make_namespaced_topic(t.name) for t in topics]
reply = self.admin.unmount_topics(outbound_topics).json()
self.logger.info(f"create migration reply: {reply}")

self.logger.info('waiting for partitions be deleted')
self.wait_partitions_disappear(topics)

list_mountable_res = admin.list_mountable_topics().json()
assert len(list_mountable_res["topics"]) == len(
topics), "There should be mountable topics"

initial_names = [t.name for t in topics]
mountable_topic_names = [
t["topic"] for t in list_mountable_res["topics"]
]
assert set(initial_names) == set(mountable_topic_names), \
f"Initial topics: {initial_names} should match mountable topics: {mountable_topic_names}"

for t in list_mountable_res["topics"]:
assert t["ns"] == "kafka", f"namespace is not set correctly: {t}"
assert t["topic_location"] != t["topic"] and "/" in t[
"topic_location"], f"topic location is not set correctly: {t}"

# Mount 3 topics based on the mountable topics response. This ensures
# that the response is correct/usable.
# The first 2 are mounted by name, the third by location.
inbound_topics = [
InboundTopic(NamespacedTopic(topic=t["topic"], namespace=t["ns"]))
for t in list_mountable_res["topics"][:2]
] + [
InboundTopic(
NamespacedTopic(
topic=list_mountable_res["topics"][2]["topic_location"],
namespace=list_mountable_res["topics"][2]["ns"]))
]
mount_resp = self.admin.mount_topics(inbound_topics).json()

# Build expectations based on original topic specs that match the
# mountable topics response.
expected_topic_specs = []
for t in list_mountable_res["topics"][:3]:
expected_topic_specs.append(
TopicSpec(name=t["topic"], partition_count=3))

self.wait_partitions_appear(expected_topic_specs)

# Wait for the migration to complete. This guarantees that the mount manifests
# are deleted.
self.wait_migration_disappear(mount_resp["id"])

list_mountable_res = admin.list_mountable_topics().json()
assert len(list_mountable_res["topics"]
) == 2, "There should be 2 mountable topics"

0 comments on commit a702401

Please sign in to comment.