From c5f5f80e73b85a795c07bf8cf5e098931aedcf7f Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Tue, 18 Jun 2024 14:01:40 -0700 Subject: [PATCH] cloud_storage: add manifest exists to downloader Topic recovery validation checks if partition manifests exist, and use a method that's built into the cloud_storage::remote. In the effort to change our object naming scheme, this means this needs to be updated to support the new naming. Rather than adding more business logic into the remote, this adds a method to the new partition_manifest_downloader to check if a given partition's manifest exists. A later commit will plumb this into topic recovery validation. --- .../partition_manifest_downloader.cc | 36 ++++++ .../partition_manifest_downloader.h | 3 + .../partition_manifest_downloader_test.cc | 117 +++++++++++++++--- 3 files changed, 137 insertions(+), 19 deletions(-) diff --git a/src/v/cloud_storage/partition_manifest_downloader.cc b/src/v/cloud_storage/partition_manifest_downloader.cc index 77d5dab0e642..7096c8155182 100644 --- a/src/v/cloud_storage/partition_manifest_downloader.cc +++ b/src/v/cloud_storage/partition_manifest_downloader.cc @@ -64,4 +64,40 @@ partition_manifest_downloader::download_manifest( co_return error_outcome::manifest_download_error; } +ss::future> +partition_manifest_downloader::manifest_exists(retry_chain_node& retry_node) { + const auto bin_path = remote_manifest_path{ + remote_path_provider_.partition_manifest_path(ntp_, rev_)}; + auto bin_res = co_await remote_.object_exists( + bucket_, + cloud_storage_clients::object_key{bin_path}, + retry_node, + existence_check_type::manifest); + if (bin_res == download_result::success) { + co_return find_partition_manifest_outcome::success; + } + if (bin_res != download_result::notfound) { + co_return error_outcome::manifest_download_error; + } + + const auto json_path_str + = remote_path_provider_.partition_manifest_path_json(ntp_, rev_); + if (!json_path_str.has_value()) { + co_return find_partition_manifest_outcome::no_matching_manifest; + } + const auto json_path = remote_manifest_path{json_path_str.value()}; + auto json_res = co_await remote_.object_exists( + bucket_, + cloud_storage_clients::object_key{json_path}, + retry_node, + existence_check_type::manifest); + if (json_res == download_result::success) { + co_return find_partition_manifest_outcome::success; + } + if (json_res != download_result::notfound) { + co_return error_outcome::manifest_download_error; + } + co_return find_partition_manifest_outcome::no_matching_manifest; +} + } // namespace cloud_storage diff --git a/src/v/cloud_storage/partition_manifest_downloader.h b/src/v/cloud_storage/partition_manifest_downloader.h index 5c7c40aa4b47..5bf06e6ac911 100644 --- a/src/v/cloud_storage/partition_manifest_downloader.h +++ b/src/v/cloud_storage/partition_manifest_downloader.h @@ -54,6 +54,9 @@ class partition_manifest_downloader { ss::future> download_manifest(retry_chain_node& retry_node, partition_manifest*); + ss::future> + manifest_exists(retry_chain_node& retry_node); + private: const cloud_storage_clients::bucket_name bucket_; const remote_path_provider& remote_path_provider_; diff --git a/src/v/cloud_storage/tests/partition_manifest_downloader_test.cc b/src/v/cloud_storage/tests/partition_manifest_downloader_test.cc index 58d11eea71bd..dc57764964e1 100644 --- a/src/v/cloud_storage/tests/partition_manifest_downloader_test.cc +++ b/src/v/cloud_storage/tests/partition_manifest_downloader_test.cc @@ -79,7 +79,31 @@ class PartitionManifestDownloaderTest pool_.stop().get(); } - void upload_json_manifest(const partition_manifest& pm) { + void upload_labeled_bin_manifest(const partition_manifest& pm) { + retry_chain_node retry(never_abort, 1s, 10ms); + auto labeled_path = labeled_partition_manifest_path( + test_label, test_ntp, test_rev); + auto upload_res + = remote_.local() + .upload_manifest( + bucket_name, pm, remote_manifest_path{labeled_path}, retry) + .get(); + ASSERT_EQ(upload_result::success, upload_res); + } + + void upload_prefixed_bin_manifest(const partition_manifest& pm) { + retry_chain_node retry(never_abort, 1s, 10ms); + auto hash_path = prefixed_partition_manifest_bin_path( + test_ntp, test_rev); + auto upload_res + = remote_.local() + .upload_manifest( + bucket_name, pm, remote_manifest_path{hash_path}, retry) + .get(); + ASSERT_EQ(upload_result::success, upload_res); + } + + void upload_prefixed_json_manifest(const partition_manifest& pm) { retry_chain_node retry(never_abort, 1s, 10ms); iobuf buf; iobuf_ostreambuf obuf(buf); @@ -106,16 +130,8 @@ class PartitionManifestDownloaderTest TEST_F(PartitionManifestDownloaderTest, TestDownloadLabeledManifest) { auto pm = dummy_partition_manifest(); - auto labeled_path = labeled_partition_manifest_path( - test_label, test_ntp, test_rev); + ASSERT_NO_FATAL_FAILURE(upload_labeled_bin_manifest(pm)); retry_chain_node retry(never_abort, 1s, 10ms); - auto upload_res - = remote_.local() - .upload_manifest( - bucket_name, pm, remote_manifest_path{labeled_path}, retry) - .get(); - ASSERT_EQ(upload_result::success, upload_res); - { remote_path_provider path_provider(test_label); partition_manifest_downloader dl( @@ -143,15 +159,8 @@ TEST_F(PartitionManifestDownloaderTest, TestDownloadLabeledManifest) { TEST_F(PartitionManifestDownloaderTest, TestDownloadPrefixedManifest) { auto pm = dummy_partition_manifest(); - auto hash_path = prefixed_partition_manifest_bin_path(test_ntp, test_rev); + ASSERT_NO_FATAL_FAILURE(upload_prefixed_bin_manifest(pm)); retry_chain_node retry(never_abort, 1s, 10ms); - auto upload_res - = remote_.local() - .upload_manifest( - bucket_name, pm, remote_manifest_path{hash_path}, retry) - .get(); - ASSERT_EQ(upload_result::success, upload_res); - { remote_path_provider path_provider(std::nullopt); partition_manifest_downloader dl( @@ -179,7 +188,7 @@ TEST_F(PartitionManifestDownloaderTest, TestDownloadPrefixedManifest) { TEST_F(PartitionManifestDownloaderTest, TestDownloadJsonManifest) { auto pm = dummy_partition_manifest(); - ASSERT_NO_FATAL_FAILURE(upload_json_manifest(pm)); + ASSERT_NO_FATAL_FAILURE(upload_prefixed_json_manifest(pm)); retry_chain_node retry(never_abort, 1s, 10ms); remote_path_provider path_provider(std::nullopt); @@ -219,3 +228,73 @@ TEST_F(PartitionManifestDownloaderTest, TestDownloadJsonManifest) { new_last_req.url.c_str(), "/20000000/meta/test-ns/test-topic/42_0/manifest.bin"); } + +TEST_F(PartitionManifestDownloaderTest, TestLabeledManifestExists) { + remote_path_provider path_provider(test_label); + partition_manifest_downloader dl( + bucket_name, path_provider, test_ntp, test_rev, remote_.local()); + + // Upload both a binary and JSON manifest with the hash prefix scheme. + auto pm = dummy_partition_manifest(); + ASSERT_NO_FATAL_FAILURE(upload_prefixed_bin_manifest(pm)); + ASSERT_NO_FATAL_FAILURE(upload_prefixed_json_manifest(pm)); + + // Since the path provider is expecting a labeled manifest, it will not see + // either hash-prefixed manifest. + retry_chain_node retry(never_abort, 1s, 10ms); + auto exists_res = dl.manifest_exists(retry).get(); + ASSERT_FALSE(exists_res.has_error()); + ASSERT_EQ( + exists_res.value(), + find_partition_manifest_outcome::no_matching_manifest); + + // Once a labeled manifest exists, we're good. + ASSERT_NO_FATAL_FAILURE(upload_labeled_bin_manifest(pm)); + exists_res = dl.manifest_exists(retry).get(); + ASSERT_FALSE(exists_res.has_error()); + ASSERT_EQ(exists_res.value(), find_partition_manifest_outcome::success); +} + +TEST_F(PartitionManifestDownloaderTest, TestPrefixedJSONManifestExists) { + remote_path_provider path_provider(std::nullopt); + partition_manifest_downloader dl( + bucket_name, path_provider, test_ntp, test_rev, remote_.local()); + auto pm = dummy_partition_manifest(); + ASSERT_NO_FATAL_FAILURE(upload_labeled_bin_manifest(pm)); + + // With the hash-prefixed path provider, we won't find labeled manifests. + retry_chain_node retry(never_abort, 1s, 10ms); + auto exists_res = dl.manifest_exists(retry).get(); + ASSERT_FALSE(exists_res.has_error()); + ASSERT_EQ( + exists_res.value(), + find_partition_manifest_outcome::no_matching_manifest); + + // Once a JSON manifest exists, we're good. + ASSERT_NO_FATAL_FAILURE(upload_prefixed_json_manifest(pm)); + exists_res = dl.manifest_exists(retry).get(); + ASSERT_FALSE(exists_res.has_error()); + ASSERT_EQ(exists_res.value(), find_partition_manifest_outcome::success); +} + +TEST_F(PartitionManifestDownloaderTest, TestPrefixedBinaryManifestExists) { + remote_path_provider path_provider(std::nullopt); + partition_manifest_downloader dl( + bucket_name, path_provider, test_ntp, test_rev, remote_.local()); + auto pm = dummy_partition_manifest(); + ASSERT_NO_FATAL_FAILURE(upload_labeled_bin_manifest(pm)); + + // With the hash-prefixed path provider, we won't find labeled manifests. + retry_chain_node retry(never_abort, 1s, 10ms); + auto exists_res = dl.manifest_exists(retry).get(); + ASSERT_FALSE(exists_res.has_error()); + ASSERT_EQ( + exists_res.value(), + find_partition_manifest_outcome::no_matching_manifest); + + // Once a hash-prefixed binary manifest exists, we're good. + ASSERT_NO_FATAL_FAILURE(upload_prefixed_bin_manifest(pm)); + exists_res = dl.manifest_exists(retry).get(); + ASSERT_FALSE(exists_res.has_error()); + ASSERT_EQ(exists_res.value(), find_partition_manifest_outcome::success); +}