Skip to content

Commit

Permalink
c/migrations: list unmounted topics
Browse files Browse the repository at this point in the history
  • Loading branch information
nvartolomei committed Oct 4, 2024
1 parent 3cf8b53 commit b288d47
Show file tree
Hide file tree
Showing 19 changed files with 395 additions and 26 deletions.
21 changes: 21 additions & 0 deletions src/v/cloud_storage/remote_label.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#pragma once

#include "model/fundamental.h"
#include "model/metadata.h"
#include "serde/rw/envelope.h"
#include "serde/rw/uuid.h"

Expand Down Expand Up @@ -36,4 +37,24 @@ struct remote_label
}
};

// Pairs a cloud storage remote label with a topic namespace/name.
// topic_mount_handler::list_unmounted_topics() returns one of these per
// topic mount manifest it finds in the bucket.
//
// Serialized as a serde envelope at version 0 / compat version 0.
struct labeled_namespaced_topic
: serde::envelope<
labeled_namespaced_topic,
serde::version<0>,
serde::compat_version<0>> {
// NOTE(review): presumably opts this type out of the legacy ADL-based RPC
// serialization in favour of serde -- confirm against the rpc layer.
using rpc_adl_exempt = std::true_type;

// Remote label of the topic.
cloud_storage::remote_label label;
// Namespace and name of the topic.
model::topic_namespace tp_ns;

// Fields handed to serde, in order: label first, then tp_ns. Reordering
// the tuple changes what is written/read for this envelope.
auto serde_fields() { return std::tie(label, tp_ns); }

// Member-wise equality over label and tp_ns.
friend bool
operator==(const labeled_namespaced_topic&, const labeled_namespaced_topic&)
= default;

// Stream output; only declared here, the definition is not in this header.
friend std::ostream&
operator<<(std::ostream&, const labeled_namespaced_topic&);
};

} // namespace cloud_storage
1 change: 1 addition & 0 deletions src/v/cloud_storage/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ redpanda_cc_gtest(
deps = [
":s3_imposter_gtest",
"//src/v/cloud_storage",
"//src/v/cloud_storage:remote_label",
"//src/v/cloud_storage_clients",
"//src/v/cluster:topic_configuration",
"//src/v/config",
Expand Down
3 changes: 3 additions & 0 deletions src/v/cloud_storage/tests/remote_path_provider_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,9 @@ TEST(BogusManifestPath, TestTopicMountManifestPath) {
"migration/foo",
"migration/foo/bar",
"migration/foo/bar/baz",
"migration/foo/bar/baz/qux",
// Valid label/uuid but too many path components.
"migration/deadbeef-0000-0000-0000-000000000000/kafka/tp/qux",
}) {
const auto tmp = topic_mount_manifest_path::parse(path);
ASSERT_FALSE(tmp.has_value())
Expand Down
89 changes: 80 additions & 9 deletions src/v/cloud_storage/tests/topic_mount_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "cloud_storage/remote.h"
#include "cloud_storage/remote_label.h"
#include "cloud_storage/tests/s3_imposter.h"
#include "cloud_storage/topic_mount_handler.h"
#include "cloud_storage/types.h"
Expand All @@ -23,6 +24,8 @@

#include <seastar/core/abort_source.hh>

#include <gtest/gtest.h>

#include <chrono>

using namespace cloud_storage;
Expand Down Expand Up @@ -52,9 +55,7 @@ get_topic_configuration(cluster::topic_properties topic_props) {

} // namespace

struct TopicMountHandlerFixture
: public s3_imposter_fixture
, public testing::TestWithParam<std::tuple<bool, bool>> {
struct TopicMountHandlerFixture : public s3_imposter_fixture {
TopicMountHandlerFixture() {
pool.start(10, ss::sharded_parameter([this] { return conf; })).get();
io.start(
Expand All @@ -79,7 +80,15 @@ struct TopicMountHandlerFixture
ss::sharded<remote> remote;
};

TEST_P(TopicMountHandlerFixture, TestMountTopicManifestDoesNotExist) {
// Value-parameterized suite built on TopicMountHandlerFixture. Each test is
// handed a std::tuple<bool, bool> parameter.
struct TopicMountHandlerSuite
  : TopicMountHandlerFixture
  , testing::TestWithParam<std::tuple<bool, bool>> {};

// Plain (non-parameterized) suite built on TopicMountHandlerFixture, used by
// the tests that do not need a test parameter.
struct TopicMountHandlerListSuite
  : TopicMountHandlerFixture
  , ::testing::Test {};

TEST_P(TopicMountHandlerSuite, TestMountTopicManifestDoesNotExist) {
set_expectations_and_listen({});

auto topic_props = cluster::topic_properties{};
Expand All @@ -105,7 +114,7 @@ TEST_P(TopicMountHandlerFixture, TestMountTopicManifestDoesNotExist) {
confirm_result, topic_mount_result::mount_manifest_does_not_exist);
}

TEST_P(TopicMountHandlerFixture, TestMountTopicManifestNotDeleted) {
TEST_P(TopicMountHandlerSuite, TestMountTopicManifestNotDeleted) {
set_expectations_and_listen({});
retry_chain_node rtc(never_abort, 10s, 20ms);

Expand Down Expand Up @@ -169,7 +178,7 @@ TEST_P(TopicMountHandlerFixture, TestMountTopicManifestNotDeleted) {
ASSERT_EQ(exists_result, download_result::success);
}

TEST_P(TopicMountHandlerFixture, TestMountTopicSuccess) {
TEST_P(TopicMountHandlerSuite, TestMountTopicSuccess) {
set_expectations_and_listen({});
retry_chain_node rtc(never_abort, 10s, 20ms);

Expand Down Expand Up @@ -217,7 +226,7 @@ TEST_P(TopicMountHandlerFixture, TestMountTopicSuccess) {
ASSERT_EQ(exists_result, download_result::notfound);
}

TEST_P(TopicMountHandlerFixture, TestUnmountTopicManifestNotCreated) {
TEST_P(TopicMountHandlerSuite, TestUnmountTopicManifestNotCreated) {
set_expectations_and_listen({});
retry_chain_node rtc(never_abort, 10s, 20ms);

Expand Down Expand Up @@ -269,7 +278,7 @@ TEST_P(TopicMountHandlerFixture, TestUnmountTopicManifestNotCreated) {
ASSERT_EQ(exists_result, download_result::notfound);
}

TEST_P(TopicMountHandlerFixture, TestUnmountTopicSuccess) {
TEST_P(TopicMountHandlerSuite, TestUnmountTopicSuccess) {
set_expectations_and_listen({});
retry_chain_node rtc(never_abort, 10s, 20ms);

Expand Down Expand Up @@ -305,7 +314,69 @@ TEST_P(TopicMountHandlerFixture, TestUnmountTopicSuccess) {
ASSERT_EQ(exists_result, download_result::success);
}

// Checks topic_mount_handler::list_unmounted_topics(): the listing is empty
// before any topic has been unmounted, and afterwards contains exactly one
// entry (remote label + topic namespace) per unmounted topic.
TEST_F(TopicMountHandlerListSuite, TestListUnmountedTopics) {
    set_expectations_and_listen({});
    retry_chain_node rtc(never_abort, 10s, 20ms);

    topic_mount_handler handler(bucket_name, remote.local());
    auto listing = handler.list_unmounted_topics(rtc).get();

    // Register a handful of dummy keys so that the listing further down has
    // to cope with objects that are not topic mount manifests.
    add_expectations({
      expectation{.url = "foobar"},
      expectation{.url = "migration/foo"},
      expectation{.url = "migration/foo/bar"},
      expectation{.url = "migration/foo/bar/baz"},
    });

    // The first listing was taken before anything was unmounted.
    ASSERT_TRUE(listing);
    ASSERT_TRUE(listing.value().empty());

    // Two plain topics ...
    std::vector<cluster::topic_configuration> topics{
      cluster::topic_configuration(
        model::ns("kafka"), model::topic("tp1"), 1, 1),
      cluster::topic_configuration(
        model::ns("kafka"), model::topic("tp2"), 1, 1),
    };

    // ... plus every combination of {default, custom} label and
    // {no override, namespace override}.
    for (auto label : {model::default_cluster_uuid, test_uuid}) {
        for (const auto& tp_ns : {test_tp_ns, test_tp_ns_override}) {
            cluster::topic_configuration cfg(tp_ns.ns, tp_ns.tp, 1, 1);

            const bool has_custom_label = label != model::default_cluster_uuid;
            if (has_custom_label) {
                cfg.properties.remote_label = remote_label{label};
            }

            const bool has_override = tp_ns != test_tp_ns;
            if (has_override) {
                cfg.properties.remote_topic_namespace_override = tp_ns;
            }

            topics.push_back(cfg);
        }
    }

    // Unmount each topic and record what the listing should report for it.
    std::vector<labeled_namespaced_topic> expected;
    for (const auto& cfg : topics) {
        const auto unmount_result = handler.unmount_topic(cfg, rtc).get();
        ASSERT_EQ(unmount_result, topic_unmount_result::success);

        expected.emplace_back(labeled_namespaced_topic{
          .label = cfg.properties.remote_label.value_or(
            remote_label{model::default_cluster_uuid}),
          .tp_ns = cfg.tp_ns,
        });
    }

    listing = handler.list_unmounted_topics(rtc).get();
    ASSERT_TRUE(listing);
    ASSERT_EQ(listing.value().size(), topics.size());

    // Every listed topic must match a distinct expected entry; a matched
    // entry is removed so that it cannot satisfy a second listed topic.
    for (const auto& found : listing.value()) {
        auto match = std::find(expected.begin(), expected.end(), found);
        ASSERT_NE(match, expected.end());
        expected.erase(match);
    }
}

INSTANTIATE_TEST_SUITE_P(
TopicMountHandlerOverride,
TopicMountHandlerFixture,
TopicMountHandlerSuite,
testing::Combine(testing::Bool(), testing::Bool()));
41 changes: 41 additions & 0 deletions src/v/cloud_storage/topic_mount_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#include "cloud_storage/remote.h"
#include "cloud_storage/remote_path_provider.h"
#include "cloud_storage/topic_mount_manifest.h"
#include "cloud_storage/types.h"
#include "model/metadata.h"
#include "utils/retry_chain_node.h"

namespace cloud_storage {

Expand Down Expand Up @@ -155,6 +157,45 @@ ss::future<topic_unmount_result> topic_mount_handler::unmount_topic(
co_return topic_unmount_result::success;
}

// Lists the objects stored under the topic mount manifest prefix and turns
// every key that parses as a topic mount manifest path into a
// labeled_namespaced_topic.
//
// Returns the listing error if the bucket cannot be listed. Keys that do not
// parse are logged at error level and left out of the result.
ss::future<result<chunked_vector<labeled_namespaced_topic>>>
topic_mount_handler::list_unmounted_topics(retry_chain_node& parent) {
    retry_chain_logger ctxlog(cst_log, parent);

    cloud_storage_clients::object_key manifests_prefix{
      topic_mount_manifest_path::prefix()};

    vlog(
      ctxlog.debug,
      "listing unmounted topics with prefix {}",
      manifests_prefix);

    auto listing = co_await _remote.list_objects(
      _bucket, parent, manifests_prefix);
    if (!listing) {
        vlog(ctxlog.error, "failed to list objects: {}", listing.error());
        co_return listing.error();
    }

    chunked_vector<labeled_namespaced_topic> unmounted;
    for (const auto& object : listing.assume_value().contents) {
        vlog(ctxlog.trace, "parsing object {}", object.key);

        auto parsed = topic_mount_manifest_path::parse(
          std::string(object.key));
        if (parsed) {
            unmounted.emplace_back(labeled_namespaced_topic{
              .label = remote_label(parsed->cluster_uuid()),
              .tp_ns = parsed->tp_ns(),
            });
        } else {
            // Not a topic mount manifest; report it and move on.
            vlog(ctxlog.error, "failed to parse object key {}", object.key);
        }
    }

    vlog(ctxlog.trace, "found {} unmounted topics", unmounted.size());

    co_return unmounted;
}

ss::future<topic_mount_result> topic_mount_handler::prepare_mount_topic(
const cluster::topic_configuration& topic_cfg, retry_chain_node& parent) {
return mount_topic(topic_cfg, true, parent);
Expand Down
6 changes: 6 additions & 0 deletions src/v/cloud_storage/topic_mount_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
*/
#pragma once

#include "base/outcome.h"
#include "cloud_storage/fwd.h"
#include "cloud_storage/remote_label.h"
#include "cloud_storage_clients/types.h"
#include "cluster/topic_configuration.h"
#include "container/fragmented_vector.h"
#include "model/fundamental.h"
#include "utils/retry_chain_node.h"

Expand Down Expand Up @@ -63,6 +65,10 @@ class topic_mount_handler {
ss::future<topic_unmount_result> unmount_topic(
const cluster::topic_configuration& topic_cfg, retry_chain_node& parent);

// List from cloud storage all topics that are not mounted, i.e. the topics
// for which a topic mount manifest object is present in the bucket. Each
// entry carries the remote label and topic namespace taken from the
// manifest's object key.
//
// Returns an error if listing the bucket fails. Object keys under the mount
// manifest prefix that cannot be parsed are logged and skipped.
ss::future<result<chunked_vector<labeled_namespaced_topic>>>
list_unmounted_topics(retry_chain_node& parent);

private:
// Perform the mounting process by deleting the topic mount manifest.
// topic_cfg should be the recovered topic configuration from a topic
Expand Down
21 changes: 21 additions & 0 deletions src/v/cluster/controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "cluster/controller.h"

#include "base/likely.h"
#include "cloud_storage/topic_mount_handler.h"
#include "cluster/bootstrap_backend.h"
#include "cluster/client_quota_backend.h"
#include "cluster/client_quota_frontend.h"
Expand Down Expand Up @@ -305,6 +306,13 @@ ss::future<> controller::start(
std::ref(_members_table),
std::ref(_as));

if (auto bucket_opt = get_configured_bucket(); bucket_opt.has_value()) {
co_await _topic_mount_handler.start(
bucket_opt.value(), ss::sharded_parameter([this] {
return std::ref(_cloud_storage_api.local());
}));
}

co_await _data_migration_frontend.start(
_raft0->self().id(),
_cloud_storage_api.local_is_initialized(),
Expand All @@ -313,6 +321,15 @@ ss::future<> controller::start(
std::ref(_stm),
std::ref(_partition_leaders),
std::ref(_connections),
ss::sharded_parameter(
[this]() -> std::optional<
std::reference_wrapper<cloud_storage::topic_mount_handler>> {
if (_topic_mount_handler.local_is_initialized()) {
return std::ref(_topic_mount_handler.local());
} else {
return {};
}
}),
std::ref(_as));

co_await _data_migration_worker.start(
Expand Down Expand Up @@ -782,6 +799,9 @@ ss::future<> controller::start(
_cloud_storage_api.local_is_initialized()
? std::make_optional(std::ref(_cloud_storage_api.local()))
: std::nullopt,
_topic_mount_handler.local_is_initialized()
? std::make_optional(std::ref(_topic_mount_handler.local()))
: std::nullopt,
std::ref(_as.local()));
co_await _data_migration_backend.invoke_on_instance(
&data_migrations::backend::start);
Expand Down Expand Up @@ -846,6 +866,7 @@ ss::future<> controller::stop() {
co_await _members_backend.stop();
co_await _data_migration_worker.stop();
co_await _data_migration_frontend.stop();
co_await _topic_mount_handler.stop();
co_await _config_manager.stop();
co_await _api.stop();
co_await _shard_balancer.stop();
Expand Down
6 changes: 5 additions & 1 deletion src/v/cluster/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

#pragma once

#include "cloud_storage/fwd.h"
#include "cluster/cloud_metadata/producer_id_recovery_manager.h"
#include "cluster/controller_probe.h"
#include "cluster/controller_stm.h"
Expand All @@ -34,6 +33,10 @@
#include <chrono>
#include <vector>

// Forward declaration only: controller.cc includes
// cloud_storage/topic_mount_handler.h for the full definition.
namespace cloud_storage {
class topic_mount_handler;
} // namespace cloud_storage

namespace cluster {

class cluster_discovery;
Expand Down Expand Up @@ -325,6 +328,7 @@ class controller {
ss::sharded<client_quota::store> _quota_store; // instance per core
ss::sharded<client_quota::backend> _quota_backend; // single instance
ss::sharded<data_migrations::worker> _data_migration_worker;
ss::sharded<cloud_storage::topic_mount_handler> _topic_mount_handler;
ssx::single_sharded<data_migrations::backend> _data_migration_backend;
ss::sharded<data_migrations::irpc_frontend> _data_migration_irpc_frontend;
ss::gate _gate;
Expand Down
Loading

0 comments on commit b288d47

Please sign in to comment.