Skip to content

Commit

Permalink
cloud_storage: add partition manifest downloader
Browse files Browse the repository at this point in the history
There are various calls to download partition manifests (e.g. read replicas,
partition recovery, Raft snapshot recovery) where the application
doesn't necessarily care about the format or path of the manifest. These
callers typically use remote::try_download_partition_manifest(), which
checks for both the binary and JSON manifest.

Since the naming scheme of objects is changing, this introduces a
downloader that performs this same operation, but also takes into
account the naming scheme.

A later commit will plug this into the various users to replace
try_download_partition_manifest().

I considered moving this logic into the remote, but generally the remote
seems to have grown crowded with business logic. Naming scheme seems
like it should fall outside of the remote's concern, considering most
other methods in remote rely on callers to pass a path.
  • Loading branch information
andrwng committed Jun 27, 2024
1 parent 9663701 commit add60ce
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/v/cloud_storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ v_cc_library(
cache_probe.cc
download_exception.cc
partition_manifest.cc
partition_manifest_downloader.cc
partition_path_utils.cc
recursive_directory_walker.cc
remote.cc
Expand Down
67 changes: 67 additions & 0 deletions src/v/cloud_storage/partition_manifest_downloader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#include "cloud_storage/partition_manifest_downloader.h"

#include "cloud_storage/types.h"

namespace cloud_storage {

partition_manifest_downloader::partition_manifest_downloader(
const cloud_storage_clients::bucket_name bucket,
const remote_path_provider& path_provider,
const model::ntp& ntp,
model::initial_revision_id rev,
remote& remote)
: bucket_(bucket)
, remote_path_provider_(path_provider)
, ntp_(ntp)
, rev_(rev)
, remote_(remote) {}

// Convenience overload: builds a retry node from `deadline`, `backoff` and a
// pointer to `parent_retry`, then delegates to the two-argument overload.
//
// Returns whatever the delegated call returns.
ss::future<result<find_partition_manifest_outcome, error_outcome>>
partition_manifest_downloader::download_manifest(
  retry_chain_node& parent_retry,
  ss::lowres_clock::time_point deadline,
  model::timestamp_clock::duration backoff,
  partition_manifest* manifest) {
    // The child node lives in the coroutine frame for the whole download.
    retry_chain_node child_rtc(deadline, backoff, &parent_retry);
    auto outcome = co_await download_manifest(child_rtc, manifest);
    co_return outcome;
}

// Downloads the partition manifest into `*manifest`.
//
// The binary manifest at the path given by the path provider is tried first.
// Only when that object is reported as not found is the JSON path consulted,
// and only if the path provider supplies one.
//
// Returns:
//  - find_partition_manifest_outcome::success if either download succeeded;
//  - find_partition_manifest_outcome::no_matching_manifest if the binary
//    manifest was not found and either there is no JSON path or the JSON
//    manifest was not found as well;
//  - error_outcome::manifest_download_error for any other download result.
ss::future<result<find_partition_manifest_outcome, error_outcome>>
partition_manifest_downloader::download_manifest(
  retry_chain_node& retry_node, partition_manifest* manifest) {
    // Step 1: binary manifest.
    remote_manifest_path bin_key{
      remote_path_provider_.partition_manifest_path(ntp_, rev_)};
    const auto bin_outcome = co_await remote_.download_manifest_bin(
      bucket_, bin_key, *manifest, retry_node);
    if (bin_outcome != download_result::notfound) {
        // Anything other than "not found" is final: success or a hard error.
        if (bin_outcome == download_result::success) {
            co_return find_partition_manifest_outcome::success;
        }
        co_return error_outcome::manifest_download_error;
    }

    // Step 2: JSON manifest, if the path provider offers a JSON path.
    const auto maybe_json_key
      = remote_path_provider_.partition_manifest_path_json(ntp_, rev_);
    if (!maybe_json_key.has_value()) {
        co_return find_partition_manifest_outcome::no_matching_manifest;
    }
    remote_manifest_path json_key{maybe_json_key.value()};
    const auto json_outcome = co_await remote_.download_manifest_json(
      bucket_, json_key, *manifest, retry_node);
    if (json_outcome == download_result::notfound) {
        co_return find_partition_manifest_outcome::no_matching_manifest;
    }
    if (json_outcome != download_result::success) {
        co_return error_outcome::manifest_download_error;
    }
    co_return find_partition_manifest_outcome::success;
}

} // namespace cloud_storage
65 changes: 65 additions & 0 deletions src/v/cloud_storage/partition_manifest_downloader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#pragma once

#include "base/outcome.h"
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/remote.h"
#include "cloud_storage/remote_label.h"
#include "cloud_storage/remote_path_provider.h"
#include "model/fundamental.h"

namespace cloud_storage {

// Non-error outcomes of looking up a partition manifest with
// partition_manifest_downloader::download_manifest().
enum class find_partition_manifest_outcome {
// A manifest was downloaded, from either the binary or the JSON path.
success = 0,

// There is no partition manifest for the given partition.
no_matching_manifest,
};

// Encapsulates downloading manifests for a given partition.
//
// Partition manifests have gone through a few format/naming schemes:
// - cluster-uuid-labeled, binary format
// - hash-prefixed, binary format
// - hash-prefixed, JSON format
//
// Naming scheme evolution is handled by the path provider; this class focuses
// primarily on deserializing the manifests from the correct format.
class partition_manifest_downloader {
public:
// Creates a downloader for the partition `ntp` at revision `rev` in `bucket`.
//
// `bucket`, `ntp` and `rev` are copied. `path_provider` and `remote` are
// stored as references, so both must outlive this object.
partition_manifest_downloader(
const cloud_storage_clients::bucket_name bucket,
const remote_path_provider& path_provider,
const model::ntp& ntp,
model::initial_revision_id rev,
remote& remote);

// Attempts to download the partition manifest, transparently checking for
// both the binary and JSON format (if supported by the path provider).
//
// On success the downloaded manifest is written through the
// `partition_manifest*` argument, which is dereferenced unconditionally and
// therefore must not be null.
//
// This overload creates its own retry node from `deadline`, `backoff` and
// `parent_retry` before delegating to the overload below.
ss::future<result<find_partition_manifest_outcome, error_outcome>>
download_manifest(
retry_chain_node& parent_retry,
ss::lowres_clock::time_point deadline,
model::timestamp_clock::duration backoff,
partition_manifest*);
// Same as above, but uses the caller-supplied `retry_node` directly for the
// download requests.
ss::future<result<find_partition_manifest_outcome, error_outcome>>
download_manifest(retry_chain_node& retry_node, partition_manifest*);

private:
// Bucket the manifests are downloaded from.
const cloud_storage_clients::bucket_name bucket_;
// Produces the binary (and, optionally, JSON) manifest paths.
const remote_path_provider& remote_path_provider_;
// Partition whose manifest is downloaded, and its initial revision.
const model::ntp ntp_;
const model::initial_revision_id rev_;
// Used to issue the download requests.
remote& remote_;
};

} // namespace cloud_storage
1 change: 1 addition & 0 deletions src/v/cloud_storage/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ rp_test(
TIMEOUT 1000
BINARY_NAME gtest_cloud_storage
SOURCES
partition_manifest_downloader_test.cc
remote_path_provider_test.cc
remote_test.cc
s3_imposter.cc
Expand Down
221 changes: 221 additions & 0 deletions src/v/cloud_storage/tests/partition_manifest_downloader_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "bytes/iostream.h"
#include "bytes/streambuf.h"
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/partition_manifest_downloader.h"
#include "cloud_storage/partition_path_utils.h"
#include "cloud_storage/remote.h"
#include "cloud_storage/tests/s3_imposter.h"
#include "cloud_storage_clients/client_pool.h"
#include "model/fundamental.h"

#include <seastar/core/lowres_clock.hh>

#include <gtest/gtest.h>

using namespace cloud_storage;
using namespace std::chrono_literals;

namespace {

// Abort source handed to the retry chain nodes in these tests; nothing in
// this file requests an abort on it.
ss::abort_source never_abort{};

// Credentials source passed to the remote when the fixture starts it.
constexpr model::cloud_credentials_source config_file{
model::cloud_credentials_source::config_file};

// Cluster UUID and the remote label derived from it, used to build labeled
// manifest paths.
const ss::sstring test_uuid_str = "deadbeef-0000-0000-0000-000000000000";
const model::cluster_uuid test_uuid{uuid_t::from_string(test_uuid_str)};
const remote_label test_label{test_uuid};
// Partition and revision used by every test; they match the fields of
// empty_manifest_json below.
const model::ntp test_ntp{
model::ns{"test-ns"}, model::topic{"test-topic"}, model::partition_id{42}};
const model::initial_revision_id test_rev{0};

// JSON manifest for test_ntp/test_rev that lists no segments.
constexpr std::string_view empty_manifest_json = R"json({
"version": 1,
"namespace": "test-ns",
"topic": "test-topic",
"partition": 42,
"revision": 0,
"insync_offset": 0,
"last_offset": 0
})json";

// Builds a partition manifest by parsing `empty_manifest_json`.
partition_manifest dummy_partition_manifest() {
    partition_manifest manifest;
    manifest
      .update(
        manifest_format::json,
        make_iobuf_input_stream(iobuf::from(empty_manifest_json)))
      .get();
    return manifest;
}
} // namespace

// Test fixture that runs a client pool and a `remote` against the S3
// imposter, so that tests can upload and download real objects.
class PartitionManifestDownloaderTest
  : public ::testing::Test
  , public s3_imposter_fixture {
public:
    // Starts the client pool and the remote, then starts the imposter with no
    // canned responses.
    void SetUp() override {
        auto make_conf = [this] { return conf; };
        pool_.start(10, ss::sharded_parameter(make_conf)).get();
        remote_
          .start(
            std::ref(pool_),
            ss::sharded_parameter(make_conf),
            ss::sharded_parameter([] { return config_file; }))
          .get();
        // Tests will use the remote API, no hard coded responses.
        set_expectations_and_listen({});
    }

    // Shuts down connections first, then stops the remote before the pool it
    // was started with.
    void TearDown() override {
        pool_.local().shutdown_connections();
        remote_.stop().get();
        pool_.stop().get();
    }

    // Serializes `pm` as JSON and uploads it to the hash-prefixed JSON
    // manifest path of test_ntp/test_rev. Raises a fatal gtest failure if the
    // upload does not succeed.
    void upload_json_manifest(const partition_manifest& pm) {
        retry_chain_node rtc(never_abort, 1s, 10ms);

        // Render the manifest as JSON into an in-memory buffer.
        iobuf serialized;
        iobuf_ostreambuf sink(serialized);
        std::ostream json_out(&sink);
        pm.serialize_json(json_out);

        cloud_storage_clients::object_key json_key{
          prefixed_partition_manifest_json_path(test_ntp, test_rev)};
        upload_request request{
          .transfer_details = transfer_details{
            .bucket = bucket_name,
            .key = std::move(json_key),
            .parent_rtc = rtc,
          },
          .type = cloud_storage::upload_type::manifest,
          .payload = std::move(serialized),
        };
        auto outcome = remote_.local().upload_object(std::move(request)).get();
        ASSERT_EQ(upload_result::success, outcome);
    }

protected:
    ss::sharded<cloud_storage_clients::client_pool> pool_;
    ss::sharded<remote> remote_;
};

// A manifest uploaded at the cluster-uuid-labeled path is found by a
// downloader whose path provider carries that label, and is not found by one
// whose path provider has no label.
TEST_F(PartitionManifestDownloaderTest, TestDownloadLabeledManifest) {
    auto expected = dummy_partition_manifest();
    retry_chain_node rtc(never_abort, 1s, 10ms);
    auto upload_outcome
      = remote_.local()
          .upload_manifest(
            bucket_name,
            expected,
            remote_manifest_path{labeled_partition_manifest_path(
              test_label, test_ntp, test_rev)},
            rtc)
          .get();
    ASSERT_EQ(upload_result::success, upload_outcome);

    {
        // Matching label: the manifest is found and round-trips.
        remote_path_provider labeled_provider(test_label);
        partition_manifest_downloader downloader(
          bucket_name, labeled_provider, test_ntp, test_rev, remote_.local());
        partition_manifest fetched;
        auto outcome = downloader.download_manifest(rtc, &fetched).get();
        ASSERT_FALSE(outcome.has_error());
        ASSERT_EQ(outcome.value(), find_partition_manifest_outcome::success);
        ASSERT_EQ(expected, fetched);
    }
    {
        // The downloader can only look for what has been allowed by the path
        // provider, i.e. only those without any label.
        remote_path_provider unlabeled_provider(std::nullopt);
        partition_manifest_downloader downloader(
          bucket_name, unlabeled_provider, test_ntp, test_rev, remote_.local());
        partition_manifest fetched;
        auto outcome = downloader.download_manifest(rtc, &fetched).get();
        ASSERT_FALSE(outcome.has_error());
        ASSERT_EQ(
          outcome.value(),
          find_partition_manifest_outcome::no_matching_manifest);
    }
}

// A manifest uploaded at the hash-prefixed binary path is found by a
// downloader whose path provider has no label, and is not found by one whose
// path provider carries a label.
TEST_F(PartitionManifestDownloaderTest, TestDownloadPrefixedManifest) {
    auto expected = dummy_partition_manifest();
    retry_chain_node rtc(never_abort, 1s, 10ms);
    auto upload_outcome
      = remote_.local()
          .upload_manifest(
            bucket_name,
            expected,
            remote_manifest_path{
              prefixed_partition_manifest_bin_path(test_ntp, test_rev)},
            rtc)
          .get();
    ASSERT_EQ(upload_result::success, upload_outcome);

    {
        // No label: the hash-prefixed manifest is found and round-trips.
        remote_path_provider unlabeled_provider(std::nullopt);
        partition_manifest_downloader downloader(
          bucket_name, unlabeled_provider, test_ntp, test_rev, remote_.local());
        partition_manifest fetched;
        auto outcome = downloader.download_manifest(rtc, &fetched).get();
        ASSERT_FALSE(outcome.has_error());
        ASSERT_EQ(outcome.value(), find_partition_manifest_outcome::success);
        ASSERT_EQ(expected, fetched);
    }
    {
        // The downloader can only look for what has been allowed by the path
        // provider, i.e. only those with the supplied remote label.
        remote_path_provider labeled_provider(test_label);
        partition_manifest_downloader downloader(
          bucket_name, labeled_provider, test_ntp, test_rev, remote_.local());
        partition_manifest fetched;
        auto outcome = downloader.download_manifest(rtc, &fetched).get();
        ASSERT_FALSE(outcome.has_error());
        ASSERT_EQ(
          outcome.value(),
          find_partition_manifest_outcome::no_matching_manifest);
    }
}

// With only a JSON manifest present, the downloader falls back to it; once a
// binary manifest also exists, the last request issued is for the binary one.
TEST_F(PartitionManifestDownloaderTest, TestDownloadJsonManifest) {
    auto expected = dummy_partition_manifest();
    ASSERT_NO_FATAL_FAILURE(upload_json_manifest(expected));
    retry_chain_node rtc(never_abort, 1s, 10ms);

    remote_path_provider unlabeled_provider(std::nullopt);
    partition_manifest_downloader downloader(
      bucket_name, unlabeled_provider, test_ntp, test_rev, remote_.local());
    partition_manifest fetched;

    // Phase 1: only the JSON manifest exists.
    auto outcome = downloader.download_manifest(rtc, &fetched).get();
    ASSERT_FALSE(outcome.has_error());
    ASSERT_EQ(outcome.value(), find_partition_manifest_outcome::success);
    ASSERT_EQ(expected, fetched);

    ASSERT_FALSE(get_requests().empty());
    const auto& json_request = get_requests().back();
    EXPECT_STREQ(json_request.method.c_str(), "GET");
    EXPECT_STREQ(
      json_request.url.c_str(),
      "/20000000/meta/test-ns/test-topic/42_0/manifest.json");

    // Phase 2: once there is both a binary and a JSON manifest, the binary
    // one should be preferred.
    auto bin_upload_outcome
      = remote_.local()
          .upload_manifest(
            bucket_name,
            expected,
            remote_manifest_path{
              prefixed_partition_manifest_bin_path(test_ntp, test_rev)},
            rtc)
          .get();
    ASSERT_EQ(bin_upload_outcome, upload_result::success);

    outcome = downloader.download_manifest(rtc, &fetched).get();
    ASSERT_FALSE(outcome.has_error());
    ASSERT_EQ(outcome.value(), find_partition_manifest_outcome::success);
    ASSERT_EQ(expected, fetched);

    ASSERT_FALSE(get_requests().empty());
    const auto& bin_request = get_requests().back();
    EXPECT_STREQ(bin_request.method.c_str(), "GET");
    EXPECT_STREQ(
      bin_request.url.c_str(),
      "/20000000/meta/test-ns/test-topic/42_0/manifest.bin");
}

0 comments on commit add60ce

Please sign in to comment.