Skip to content

Commit

Permalink
cloud_storage: add manifest exists to downloader
Browse files Browse the repository at this point in the history
Topic recovery validation checks if partition manifests exist, and use a
method that's built into the cloud_storage::remote. In the effort to
change our object naming scheme, this means this needs to be updated to
support the new naming.

Rather than adding more business logic into the remote, this adds a
method to the new partition_manifest_downloader to check if a given
partition's manifest exists.

A later commit will plumb this into topic recovery validation.
  • Loading branch information
andrwng committed Jun 27, 2024
1 parent add60ce commit aa96693
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 19 deletions.
36 changes: 36 additions & 0 deletions src/v/cloud_storage/partition_manifest_downloader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,40 @@ partition_manifest_downloader::download_manifest(
co_return error_outcome::manifest_download_error;
}

ss::future<result<find_partition_manifest_outcome, error_outcome>>
partition_manifest_downloader::manifest_exists(retry_chain_node& retry_node) {
const auto bin_path = remote_manifest_path{
remote_path_provider_.partition_manifest_path(ntp_, rev_)};
auto bin_res = co_await remote_.object_exists(
bucket_,
cloud_storage_clients::object_key{bin_path},
retry_node,
existence_check_type::manifest);
if (bin_res == download_result::success) {
co_return find_partition_manifest_outcome::success;
}
if (bin_res != download_result::notfound) {
co_return error_outcome::manifest_download_error;
}

const auto json_path_str
= remote_path_provider_.partition_manifest_path_json(ntp_, rev_);
if (!json_path_str.has_value()) {
co_return find_partition_manifest_outcome::no_matching_manifest;
}
const auto json_path = remote_manifest_path{json_path_str.value()};
auto json_res = co_await remote_.object_exists(
bucket_,
cloud_storage_clients::object_key{json_path},
retry_node,
existence_check_type::manifest);
if (json_res == download_result::success) {
co_return find_partition_manifest_outcome::success;
}
if (json_res != download_result::notfound) {
co_return error_outcome::manifest_download_error;
}
co_return find_partition_manifest_outcome::no_matching_manifest;
}

} // namespace cloud_storage
3 changes: 3 additions & 0 deletions src/v/cloud_storage/partition_manifest_downloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class partition_manifest_downloader {
ss::future<result<find_partition_manifest_outcome, error_outcome>>
download_manifest(retry_chain_node& retry_node, partition_manifest*);

ss::future<result<find_partition_manifest_outcome, error_outcome>>
manifest_exists(retry_chain_node& retry_node);

private:
const cloud_storage_clients::bucket_name bucket_;
const remote_path_provider& remote_path_provider_;
Expand Down
117 changes: 98 additions & 19 deletions src/v/cloud_storage/tests/partition_manifest_downloader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,31 @@ class PartitionManifestDownloaderTest
pool_.stop().get();
}

void upload_json_manifest(const partition_manifest& pm) {
void upload_labeled_bin_manifest(const partition_manifest& pm) {
retry_chain_node retry(never_abort, 1s, 10ms);
auto labeled_path = labeled_partition_manifest_path(
test_label, test_ntp, test_rev);
auto upload_res
= remote_.local()
.upload_manifest(
bucket_name, pm, remote_manifest_path{labeled_path}, retry)
.get();
ASSERT_EQ(upload_result::success, upload_res);
}

void upload_prefixed_bin_manifest(const partition_manifest& pm) {
retry_chain_node retry(never_abort, 1s, 10ms);
auto hash_path = prefixed_partition_manifest_bin_path(
test_ntp, test_rev);
auto upload_res
= remote_.local()
.upload_manifest(
bucket_name, pm, remote_manifest_path{hash_path}, retry)
.get();
ASSERT_EQ(upload_result::success, upload_res);
}

void upload_prefixed_json_manifest(const partition_manifest& pm) {
retry_chain_node retry(never_abort, 1s, 10ms);
iobuf buf;
iobuf_ostreambuf obuf(buf);
Expand All @@ -106,16 +130,8 @@ class PartitionManifestDownloaderTest

TEST_F(PartitionManifestDownloaderTest, TestDownloadLabeledManifest) {
auto pm = dummy_partition_manifest();
auto labeled_path = labeled_partition_manifest_path(
test_label, test_ntp, test_rev);
ASSERT_NO_FATAL_FAILURE(upload_labeled_bin_manifest(pm));
retry_chain_node retry(never_abort, 1s, 10ms);
auto upload_res
= remote_.local()
.upload_manifest(
bucket_name, pm, remote_manifest_path{labeled_path}, retry)
.get();
ASSERT_EQ(upload_result::success, upload_res);

{
remote_path_provider path_provider(test_label);
partition_manifest_downloader dl(
Expand Down Expand Up @@ -143,15 +159,8 @@ TEST_F(PartitionManifestDownloaderTest, TestDownloadLabeledManifest) {

TEST_F(PartitionManifestDownloaderTest, TestDownloadPrefixedManifest) {
auto pm = dummy_partition_manifest();
auto hash_path = prefixed_partition_manifest_bin_path(test_ntp, test_rev);
ASSERT_NO_FATAL_FAILURE(upload_prefixed_bin_manifest(pm));
retry_chain_node retry(never_abort, 1s, 10ms);
auto upload_res
= remote_.local()
.upload_manifest(
bucket_name, pm, remote_manifest_path{hash_path}, retry)
.get();
ASSERT_EQ(upload_result::success, upload_res);

{
remote_path_provider path_provider(std::nullopt);
partition_manifest_downloader dl(
Expand Down Expand Up @@ -179,7 +188,7 @@ TEST_F(PartitionManifestDownloaderTest, TestDownloadPrefixedManifest) {

TEST_F(PartitionManifestDownloaderTest, TestDownloadJsonManifest) {
auto pm = dummy_partition_manifest();
ASSERT_NO_FATAL_FAILURE(upload_json_manifest(pm));
ASSERT_NO_FATAL_FAILURE(upload_prefixed_json_manifest(pm));
retry_chain_node retry(never_abort, 1s, 10ms);

remote_path_provider path_provider(std::nullopt);
Expand Down Expand Up @@ -219,3 +228,73 @@ TEST_F(PartitionManifestDownloaderTest, TestDownloadJsonManifest) {
new_last_req.url.c_str(),
"/20000000/meta/test-ns/test-topic/42_0/manifest.bin");
}

TEST_F(PartitionManifestDownloaderTest, TestLabeledManifestExists) {
remote_path_provider path_provider(test_label);
partition_manifest_downloader dl(
bucket_name, path_provider, test_ntp, test_rev, remote_.local());

// Upload both a binary and JSON manifest with the hash prefix scheme.
auto pm = dummy_partition_manifest();
ASSERT_NO_FATAL_FAILURE(upload_prefixed_bin_manifest(pm));
ASSERT_NO_FATAL_FAILURE(upload_prefixed_json_manifest(pm));

// Since the path provider is expecting a labeled manifest, it will not see
// either hash-prefixed manifest.
retry_chain_node retry(never_abort, 1s, 10ms);
auto exists_res = dl.manifest_exists(retry).get();
ASSERT_FALSE(exists_res.has_error());
ASSERT_EQ(
exists_res.value(),
find_partition_manifest_outcome::no_matching_manifest);

// Once a labeled manifest exists, we're good.
ASSERT_NO_FATAL_FAILURE(upload_labeled_bin_manifest(pm));
exists_res = dl.manifest_exists(retry).get();
ASSERT_FALSE(exists_res.has_error());
ASSERT_EQ(exists_res.value(), find_partition_manifest_outcome::success);
}

TEST_F(PartitionManifestDownloaderTest, TestPrefixedJSONManifestExists) {
remote_path_provider path_provider(std::nullopt);
partition_manifest_downloader dl(
bucket_name, path_provider, test_ntp, test_rev, remote_.local());
auto pm = dummy_partition_manifest();
ASSERT_NO_FATAL_FAILURE(upload_labeled_bin_manifest(pm));

// With the hash-prefixed path provider, we won't find labeled manifests.
retry_chain_node retry(never_abort, 1s, 10ms);
auto exists_res = dl.manifest_exists(retry).get();
ASSERT_FALSE(exists_res.has_error());
ASSERT_EQ(
exists_res.value(),
find_partition_manifest_outcome::no_matching_manifest);

// Once a JSON manifest exists, we're good.
ASSERT_NO_FATAL_FAILURE(upload_prefixed_json_manifest(pm));
exists_res = dl.manifest_exists(retry).get();
ASSERT_FALSE(exists_res.has_error());
ASSERT_EQ(exists_res.value(), find_partition_manifest_outcome::success);
}

TEST_F(PartitionManifestDownloaderTest, TestPrefixedBinaryManifestExists) {
remote_path_provider path_provider(std::nullopt);
partition_manifest_downloader dl(
bucket_name, path_provider, test_ntp, test_rev, remote_.local());
auto pm = dummy_partition_manifest();
ASSERT_NO_FATAL_FAILURE(upload_labeled_bin_manifest(pm));

// With the hash-prefixed path provider, we won't find labeled manifests.
retry_chain_node retry(never_abort, 1s, 10ms);
auto exists_res = dl.manifest_exists(retry).get();
ASSERT_FALSE(exists_res.has_error());
ASSERT_EQ(
exists_res.value(),
find_partition_manifest_outcome::no_matching_manifest);

// Once a hash-prefixed binary manifest exists, we're good.
ASSERT_NO_FATAL_FAILURE(upload_prefixed_bin_manifest(pm));
exists_res = dl.manifest_exists(retry).get();
ASSERT_FALSE(exists_res.has_error());
ASSERT_EQ(exists_res.value(), find_partition_manifest_outcome::success);
}

0 comments on commit aa96693

Please sign in to comment.