Skip to content

Commit ece6224

Browse files
Merge pull request #11212 from andrwng/cluster-metadata-plugin
cloud_metadata: plug upload loop into application
2 parents ea167b6 + 60926a3 commit ece6224

File tree

16 files changed

+707
-46
lines changed

16 files changed

+707
-46
lines changed

src/v/cluster/cloud_metadata/manifest_downloads.cc

+92-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ namespace {
2222
const std::regex cluster_metadata_manifest_prefix_expr{
2323
R"REGEX(cluster_metadata/[a-z0-9-]+/manifests/(\d+)/)REGEX"};
2424

25+
const std::regex cluster_metadata_manifest_expr{
26+
R"REGEX(cluster_metadata/([a-z0-9-]+)/manifests/(\d+)/cluster_manifest.json)REGEX"};
27+
2528
} // anonymous namespace
2629

2730
namespace cluster::cloud_metadata {
@@ -72,10 +75,14 @@ ss::future<cluster_manifest_result> download_highest_manifest_for_cluster(
7275
if (!matches_manifest_expr) {
7376
continue;
7477
}
78+
vassert(
79+
matches.size() >= 2,
80+
"Unexpected size of regex match",
81+
matches.size());
7582
const auto& meta_id_str = matches[1].str();
7683
cluster_metadata_id meta_id;
7784
try {
78-
meta_id = cluster_metadata_id(std::stoi(meta_id_str.c_str()));
85+
meta_id = cluster_metadata_id(std::stol(meta_id_str.c_str()));
7986
} catch (...) {
8087
vlog(
8188
clusterlog.debug,
@@ -145,4 +152,88 @@ ss::future<std::list<ss::sstring>> list_orphaned_by_manifest(
145152
co_return ret;
146153
}
147154

155+
// Looks through every cluster metadata manifest key in the bucket, regardless
// of which cluster it belongs to, and downloads the manifest with the highest
// metadata ID.
//
// Returns the downloaded manifest on success, or:
// - list_failed: listing objects under "cluster_metadata/" returned an error.
// - no_matching_metadata: the listing was empty, or no listed key yielded a
//   parseable cluster UUID together with a metadata ID greater than a
//   default-constructed cluster_metadata_id.
// - download_failed: the selected manifest could not be downloaded.
ss::future<cluster_manifest_result> download_highest_manifest_in_bucket(
  cloud_storage::remote& remote,
  const cloud_storage_clients::bucket_name& bucket,
  retry_chain_node& retry_node) {
    // Look for unique cluster UUIDs for which we have metadata.
    constexpr auto cluster_prefix = "cluster_metadata/";
    vlog(clusterlog.trace, "Listing objects with prefix {}", cluster_prefix);
    auto list_res = co_await remote.list_objects(
      bucket,
      retry_node,
      cloud_storage_clients::object_key(cluster_prefix),
      std::nullopt);
    if (list_res.has_error()) {
        // The placeholder is required for the error to actually be logged.
        vlog(
          clusterlog.debug,
          "Error listing cluster metadata: {}",
          list_res.error());
        co_return error_outcome::list_failed;
    }
    // Examine all cluster metadata in this bucket.
    auto& cluster_metadata_items = list_res.value().contents;
    if (cluster_metadata_items.empty()) {
        vlog(clusterlog.debug, "No manifests found in bucket {}", bucket());
        co_return error_outcome::no_matching_metadata;
    }

    // Look through those that look like cluster metadata manifests and find
    // the one with the highest metadata ID. This will be the one returned to
    // the caller.
    model::cluster_uuid uuid_with_highest_meta_id{};
    cluster_metadata_id highest_meta_id{};
    for (const auto& item : cluster_metadata_items) {
        std::smatch matches;
        std::string k = item.key;
        const auto matches_manifest_expr = std::regex_match(
          k.cbegin(), k.cend(), matches, cluster_metadata_manifest_expr);
        if (!matches_manifest_expr) {
            continue;
        }
        // Capture group 1 is the cluster UUID, group 2 is the metadata ID.
        const auto& cluster_uuid_str = matches[1].str();
        const auto& meta_id_str = matches[2].str();
        cluster_metadata_id meta_id{};
        model::cluster_uuid cluster_uuid{};
        try {
            // Parse with std::stol, matching
            // download_highest_manifest_for_cluster, so that IDs that do not
            // fit in an int are not rejected as invalid.
            meta_id = cluster_metadata_id(std::stol(meta_id_str));
        } catch (...) {
            vlog(
              clusterlog.debug,
              "Ignoring invalid metadata ID: {}",
              meta_id_str);
            continue;
        }
        try {
            auto u = boost::lexical_cast<uuid_t::underlying_t>(
              cluster_uuid_str);
            std::vector<uint8_t> uuid_vec{u.begin(), u.end()};
            cluster_uuid = model::cluster_uuid(std::move(uuid_vec));
        } catch (...) {
            vlog(
              clusterlog.debug,
              "Ignoring invalid cluster UUID: {}",
              cluster_uuid_str);
            continue;
        }
        if (meta_id > highest_meta_id) {
            highest_meta_id = meta_id;
            uuid_with_highest_meta_id = cluster_uuid;
        }
    }
    // Nothing in the listing advanced the highest ID past its initial value.
    if (highest_meta_id == cluster_metadata_id{}) {
        vlog(clusterlog.debug, "No valid manifests in bucket {}", bucket());
        co_return error_outcome::no_matching_metadata;
    }
    cluster_metadata_manifest manifest;
    auto manifest_res = co_await remote.download_manifest(
      bucket,
      cluster_manifest_key(uuid_with_highest_meta_id, highest_meta_id),
      manifest,
      retry_node);
    if (manifest_res != cloud_storage::download_result::success) {
        vlog(
          clusterlog.debug, "Manifest download failed with {}", manifest_res);
        co_return error_outcome::download_failed;
    }
    co_return manifest;
}
238+
148239
} // namespace cluster::cloud_metadata

src/v/cluster/cloud_metadata/manifest_downloads.h

+11
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,15 @@ ss::future<std::list<ss::sstring>> list_orphaned_by_manifest(
4848
const cluster_metadata_manifest& manifest,
4949
retry_chain_node& retry_node);
5050

51+
// Looks through the given bucket for cluster metadata with the highest
52+
// metadata ID.
53+
// - list_failed/download_failed: there was a physical error sending requests
54+
// to remote storage, preventing us from returning an accurate result.
55+
// - no_matching_metadata: we were able to list and sift through the bucket, but
56+
// no cluster metadata manifest exists.
57+
ss::future<cluster_manifest_result> download_highest_manifest_in_bucket(
58+
cloud_storage::remote& remote,
59+
const cloud_storage_clients::bucket_name& bucket,
60+
retry_chain_node& retry_node);
61+
5162
} // namespace cluster::cloud_metadata

src/v/cluster/cloud_metadata/tests/manifest_downloads_test.cc

+55
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,58 @@ FIXTURE_TEST(test_download_manifest, cluster_metadata_fixture) {
9595
BOOST_CHECK(m_res.has_error());
9696
BOOST_CHECK_EQUAL(error_outcome::list_failed, m_res.error());
9797
}
98+
99+
// Exercises download_highest_manifest_in_bucket against a bucket that is
// first empty, then holds one manifest, then holds manifests for two clusters.
FIXTURE_TEST(
  test_download_highest_manifest_in_bucket, cluster_metadata_fixture) {
    retry_chain_node rtc(
      never_abort, ss::lowres_clock::time_point::max(), 10ms);

    // Small helpers so each step of the scenario reads as a single call.
    auto highest_in_bucket = [&] {
        return download_highest_manifest_in_bucket(remote, bucket, rtc).get();
    };
    auto highest_for_cluster = [&](const model::cluster_uuid& uuid) {
        return download_highest_manifest_for_cluster(
                 remote, uuid, bucket, rtc)
          .get();
    };
    auto upload = [&](const cluster_metadata_manifest& m) {
        remote
          .upload_manifest(
            cloud_storage_clients::bucket_name("test-bucket"), m, rtc)
          .get();
    };

    // Nothing has been uploaded yet, so there is no metadata to find.
    auto res = highest_in_bucket();
    BOOST_REQUIRE(res.has_error());
    BOOST_REQUIRE_EQUAL(res.error(), error_outcome::no_matching_metadata);

    // With a single manifest in the bucket, that manifest is the highest.
    cluster_metadata_manifest to_upload;
    to_upload.cluster_uuid = cluster_uuid;
    to_upload.metadata_id = cluster_metadata_id(10);
    upload(to_upload);

    res = highest_in_bucket();
    BOOST_REQUIRE(res.has_value());
    BOOST_CHECK_EQUAL(cluster_uuid, res.value().cluster_uuid);
    BOOST_CHECK_EQUAL(10, res.value().metadata_id());

    // Upload a new manifest with a higher metadata ID for a new cluster.
    auto other_uuid = model::cluster_uuid(uuid_t::create());
    to_upload.cluster_uuid = other_uuid;
    to_upload.metadata_id = cluster_metadata_id(15);
    upload(to_upload);

    res = highest_in_bucket();
    BOOST_REQUIRE(res.has_value());
    BOOST_REQUIRE_EQUAL(15, res.value().metadata_id());
    BOOST_REQUIRE_EQUAL(other_uuid, res.value().cluster_uuid);

    // Sanity check that searching by the cluster UUIDs returns the expected
    // manifests.
    res = highest_for_cluster(cluster_uuid);
    BOOST_REQUIRE(res.has_value());
    BOOST_REQUIRE_EQUAL(10, res.value().metadata_id());
    BOOST_REQUIRE_EQUAL(cluster_uuid, res.value().cluster_uuid);

    res = highest_for_cluster(other_uuid);
    BOOST_REQUIRE(res.has_value());
    BOOST_REQUIRE_EQUAL(15, res.value().metadata_id());
    BOOST_REQUIRE_EQUAL(other_uuid, res.value().cluster_uuid);
}

src/v/cluster/cloud_metadata/tests/uploader_test.cc

+52-31
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
#include "cluster/config_frontend.h"
2121
#include "cluster/controller_snapshot.h"
2222
#include "cluster/types.h"
23+
#include "config/configuration.h"
2324
#include "model/fundamental.h"
2425
#include "redpanda/application.h"
2526
#include "redpanda/tests/fixture.h"
2627
#include "storage/snapshot.h"
2728
#include "test_utils/async.h"
29+
#include "test_utils/scoped_config.h"
2830

2931
#include <seastar/core/io_priority_class.hh>
3032
#include <seastar/core/lowres_clock.hh>
@@ -50,9 +52,9 @@ class cluster_metadata_uploader_fixture
5052
, bucket(cloud_storage_clients::bucket_name("test-bucket")) {
5153
set_expectations_and_listen({});
5254
wait_for_controller_leadership().get();
53-
tests::cooperative_spin_wait_with_timeout(5s, [this] {
55+
RPTEST_REQUIRE_EVENTUALLY(5s, [this] {
5456
return app.storage.local().get_cluster_uuid().has_value();
55-
}).get();
57+
});
5658
cluster_uuid = app.storage.local().get_cluster_uuid().value();
5759
}
5860

@@ -66,9 +68,23 @@ class cluster_metadata_uploader_fixture
6668
auto m_res = co_await download_highest_manifest_for_cluster(
6769
remote, cluster_uuid, bucket, retry_node);
6870
if (!m_res.has_value()) {
71+
vlog(
72+
logger.debug,
73+
"Current manifest has id {}, waiting for > {}",
74+
-1,
75+
initial_meta_id);
6976
co_return false;
7077
}
7178
if (m_res.value().metadata_id <= initial_meta_id) {
79+
vlog(
80+
logger.debug,
81+
"Current manifest has id {}, waiting for > {}",
82+
m_res.value(),
83+
initial_meta_id);
84+
co_return false;
85+
}
86+
if (m_res.value().controller_snapshot_path.empty()) {
87+
vlog(logger.debug, "Missing controller snapshot");
7288
co_return false;
7389
}
7490
*downloaded_manifest = std::move(m_res.value());
@@ -106,6 +122,7 @@ class cluster_metadata_uploader_fixture
106122
}
107123

108124
protected:
125+
scoped_config test_local_cfg;
109126
cluster::consensus_ptr raft0;
110127
cluster::controller_stm& controller_stm;
111128
cloud_storage::remote& remote;
@@ -115,8 +132,7 @@ class cluster_metadata_uploader_fixture
115132

116133
FIXTURE_TEST(
117134
test_download_highest_manifest, cluster_metadata_uploader_fixture) {
118-
cluster::cloud_metadata::uploader uploader(
119-
cluster_uuid, bucket, remote, raft0);
135+
auto& uploader = app.controller->metadata_uploader();
120136
retry_chain_node retry_node(
121137
never_abort, ss::lowres_clock::time_point::max(), 10ms);
122138

@@ -155,8 +171,7 @@ FIXTURE_TEST(
155171

156172
FIXTURE_TEST(
157173
test_download_highest_manifest_errors, cluster_metadata_uploader_fixture) {
158-
cluster::cloud_metadata::uploader uploader(
159-
cluster_uuid, bucket, remote, raft0);
174+
auto& uploader = app.controller->metadata_uploader();
160175
retry_chain_node retry_node(
161176
never_abort, ss::lowres_clock::time_point::min(), 10ms);
162177
auto down_res
@@ -166,8 +181,7 @@ FIXTURE_TEST(
166181
}
167182

168183
FIXTURE_TEST(test_upload_next_metadata, cluster_metadata_uploader_fixture) {
169-
cluster::cloud_metadata::uploader uploader(
170-
cluster_uuid, bucket, remote, raft0);
184+
auto& uploader = app.controller->metadata_uploader();
171185
retry_chain_node retry_node(
172186
never_abort, ss::lowres_clock::time_point::max(), 10ms);
173187
RPTEST_REQUIRE_EVENTUALLY(5s, [this] { return raft0->is_leader(); });
@@ -264,10 +278,9 @@ FIXTURE_TEST(test_upload_in_term, cluster_metadata_uploader_fixture) {
264278
};
265279
const auto snap_offset = get_local_snap_offset();
266280

267-
config::shard_local_cfg()
268-
.cloud_storage_cluster_metadata_upload_interval_ms.set_value(1000ms);
269-
cluster::cloud_metadata::uploader uploader(
270-
cluster_uuid, bucket, remote, raft0);
281+
test_local_cfg.get("cloud_storage_cluster_metadata_upload_interval_ms")
282+
.set_value(1000ms);
283+
auto& uploader = app.controller->metadata_uploader();
271284
cluster::cloud_metadata::cluster_metadata_id highest_meta_id{0};
272285

273286
// Checks that metadata is uploaded in a new term, stepping down in between
@@ -280,13 +293,6 @@ FIXTURE_TEST(test_upload_in_term, cluster_metadata_uploader_fixture) {
280293

281294
// Start uploading in this term.
282295
auto upload_in_term = uploader.upload_until_term_change();
283-
auto defer = ss::defer([&] {
284-
uploader.stop_and_wait().get();
285-
try {
286-
upload_in_term.get();
287-
} catch (...) {
288-
}
289-
});
290296

291297
// Keep checking the latest manifest for whether the metadata ID is
292298
// some non-zero value (indicating we've uploaded multiple manifests);
@@ -305,7 +311,6 @@ FIXTURE_TEST(test_upload_in_term, cluster_metadata_uploader_fixture) {
305311
// Stop the upload loop and continue in a new term.
306312
raft0->step_down("forced stepdown").get();
307313
upload_in_term.get();
308-
defer.cancel();
309314
};
310315
for (int i = 0; i < 3; ++i) {
311316
check_uploads_in_term_and_stepdown(snap_offset);
@@ -333,20 +338,12 @@ FIXTURE_TEST(
333338
// Write a snapshot and begin the upload loop.
334339
RPTEST_REQUIRE_EVENTUALLY(
335340
5s, [this] { return controller_stm.maybe_write_snapshot(); });
336-
config::shard_local_cfg()
337-
.cloud_storage_cluster_metadata_upload_interval_ms.set_value(1000ms);
338-
cluster::cloud_metadata::uploader uploader(
339-
cluster_uuid, bucket, remote, raft0);
341+
test_local_cfg.get("cloud_storage_cluster_metadata_upload_interval_ms")
342+
.set_value(1000ms);
343+
auto& uploader = app.controller->metadata_uploader();
340344
RPTEST_REQUIRE_EVENTUALLY(5s, [this] { return raft0->is_leader(); });
341345

342346
auto upload_in_term = uploader.upload_until_term_change();
343-
auto defer = ss::defer([&] {
344-
uploader.stop_and_wait().get();
345-
try {
346-
upload_in_term.get();
347-
} catch (...) {
348-
}
349-
});
350347
// Wait for some valid metadata to show up.
351348
cluster::cloud_metadata::cluster_metadata_manifest manifest;
352349
RPTEST_REQUIRE_EVENTUALLY(5s, [this, &manifest] {
@@ -380,3 +377,27 @@ FIXTURE_TEST(
380377
return num_deletes >= 2;
381378
});
382379
}
380+
381+
// Starts the controller's metadata uploader and checks that, across several
// forced leadership step-downs, manifests with ever-increasing metadata IDs
// keep being uploaded.
FIXTURE_TEST(test_run_loop, cluster_metadata_uploader_fixture) {
    namespace cm = cluster::cloud_metadata;
    constexpr int num_terms = 3;

    RPTEST_REQUIRE_EVENTUALLY(
      5s, [this] { return controller_stm.maybe_write_snapshot(); });
    test_local_cfg.get("cloud_storage_cluster_metadata_upload_interval_ms")
      .set_value(1000ms);

    // Run the upload loop and make sure that new leaders continue to upload.
    auto& metadata_uploader = app.controller->metadata_uploader();
    metadata_uploader.start();

    cm::cluster_metadata_id last_seen_id{-1};
    for (int term = 0; term < num_terms; ++term) {
        // Wait until a manifest newer than the last one observed is
        // downloadable.
        auto floor_id = last_seen_id;
        cm::cluster_metadata_manifest latest;
        RPTEST_REQUIRE_EVENTUALLY(10s, [&]() -> ss::future<bool> {
            return downloaded_manifest_has_higher_id(floor_id, &latest);
        });
        BOOST_REQUIRE_GT(latest.metadata_id, last_seen_id);
        last_seen_id = latest.metadata_id;

        // Stop the upload loop and continue in a new term.
        raft0->step_down("forced stepdown").get();
    }
}

0 commit comments

Comments
 (0)