Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v/cloud_io/remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ int remote::delete_objects_max_keys() const {
// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch
return 256;
case model::cloud_storage_backend::google_s3_compat:
[[fallthrough]];
return 100;
case model::cloud_storage_backend::unknown:
return 1;
}
Expand Down
126 changes: 118 additions & 8 deletions src/v/cloud_io/tests/s3_imposter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,68 @@ struct s3_imposter_fixture::content_handler {
}

return R"xml(<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"></DeleteResult>)xml";
} else if (
request._method == "POST" && request._url.contains("batch")) {
vlog(fixt_log.trace, "Received batch request to {}", request._url);
if (
expect_iter != expectations.end()
&& expect_iter->second.body.has_value()) {
return expect_iter->second.body.value();
}
auto keys_to_delete = keys_from_batch_delete_request(ri);
vlog(
fixt_log.trace,
"Parsed batched DELETE request with {} keys",
keys_to_delete.size());

constexpr std::string_view boundary = "response_boundary";

ss::sstring response;
for (size_t i = 0; i < keys_to_delete.size(); ++i) {
const auto& [path, key] = keys_to_delete[i];
// ss::sstring to_delete = fmt::format("/{}", key().string());
auto expect_iter = expectations.find(path);
bool obj_present = expect_iter != expectations.end()
&& expect_iter->second.body.has_value();

if (!obj_present) {
// Missing objects are assumed to be not an error (e.g.
// caused by a delete retry).
vlog(
fixt_log.debug,
"Requested DELETE request of {}, not found",
path);
} else {
vlog(fixt_log.trace, "Batched DELETE request of {}", path);
expect_iter->second.body = std::nullopt;
}

std::string_view code = [&key, obj_present]() {
if (key == "failme") {
return "500 Internal Server Error";
}
return obj_present ? "204 No Content" : "404 Not Found";
}();

response += fmt::format(
"--{}\r\n"
"Content-Type: application/http\r\n"
"Content-ID: response-{}\r\n\r\n"
"HTTP/1.1 {}\r\n"
"X-GUploader-UploadID: test-upload-id-{}\r\n\r\n",
boundary,
i,
code,
i);
}

response += fmt::format("--{}--\r\n", boundary);

repl.set_status(reply::status_type::ok);
repl.set_content_type(
fmt::format("multipart/mixed; boundary={}", boundary));

return response;
} else {
vunreachable("Unhandled request method {}", request._method);
}
Expand Down Expand Up @@ -434,18 +496,25 @@ s3_imposter_fixture::get_targets() const {

void s3_imposter_fixture::set_expectations_and_listen(
std::vector<s3_imposter_fixture::expectation> expectations,
std::optional<absl::flat_hash_set<ss::sstring>> headers_to_store) {
std::optional<absl::flat_hash_set<ss::sstring>> headers_to_store,
std::set<ss::sstring> content_type_overrides) {
const ss::sstring url_prefix = "/" + url_base();
for (auto& expectation : expectations) {
expectation.url.insert(
expectation.url.begin(), url_prefix.begin(), url_prefix.end());
}
_server
->set_routes(
[this, &expectations, headers_to_store = std::move(headers_to_store)](
ss::httpd::routes& r) mutable {
set_routes(r, expectations, std::move(headers_to_store));
})
->set_routes([this,
&expectations,
headers_to_store = std::move(headers_to_store),
ct_overrides = std::move(content_type_overrides)](
ss::httpd::routes& r) mutable {
set_routes(
r,
expectations,
std::move(headers_to_store),
std::move(ct_overrides));
})
.get();
_server->listen(_server_addr).get();
}
Expand Down Expand Up @@ -491,7 +560,8 @@ ss::sstring s3_imposter_fixture::url_base() const {
void s3_imposter_fixture::set_routes(
ss::httpd::routes& r,
const std::vector<s3_imposter_fixture::expectation>& expectations,
std::optional<absl::flat_hash_set<ss::sstring>> headers_to_store) {
std::optional<absl::flat_hash_set<ss::sstring>> headers_to_store,
std::set<ss::sstring> content_type_overrides) {
using namespace ss::httpd;
using reply = ss::http::reply;
_content_handler = ss::make_shared<content_handler>(
Expand All @@ -500,7 +570,8 @@ void s3_imposter_fixture::set_routes(
[this](const_req req, reply& repl, [[maybe_unused]] ss::sstring& type) {
return _content_handler->handle(req, repl);
},
"xml");
"xml",
std::move(content_type_overrides));
r.add_default_handler(_handler.get());
}

Expand Down Expand Up @@ -569,3 +640,42 @@ keys_from_delete_objects_request(const http_test_utils::request_info& req) {

return keys;
}

std::vector<std::pair<ss::sstring, cloud_storage_clients::object_key>>
keys_from_batch_delete_request(const http_test_utils::request_info& req) {
std::vector<std::pair<ss::sstring, cloud_storage_clients::object_key>> keys;
auto buffer_stream = std::istringstream{std::string{req.content}};

// crudely iterate over request lines, stripping out object keys
constexpr std::string_view method{"DELETE "};

std::string line;
while (std::getline(buffer_stream, line)) {
auto pos = line.find(method);
if (pos != 0) {
continue;
}

pos += method.size();

auto ver_pos = line.find('\r');
if (ver_pos == line.npos) {
continue;
}

auto path = std::string_view{line}.substr(pos, ver_pos - pos);
auto last_slash_pos = path.find_last_of('/');
if (last_slash_pos == path.npos) {
continue;
}

auto key_pos = last_slash_pos + 1;
if (key_pos >= path.size()) {
continue;
}

auto key = path.substr(key_pos);
keys.emplace_back(path, cloud_storage_clients::object_key{key});
}
return keys;
}
9 changes: 7 additions & 2 deletions src/v/cloud_io/tests/s3_imposter.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class s3_imposter_fixture {
void set_expectations_and_listen(
std::vector<expectation> expectations,
std::optional<absl::flat_hash_set<ss::sstring>> headers_to_store
= std::nullopt);
= std::nullopt,
std::set<ss::sstring> content_type_overrides = {});

/// Update expectations for the REST API.
void add_expectations(std::vector<expectation> expectations);
Expand Down Expand Up @@ -128,7 +129,8 @@ class s3_imposter_fixture {
ss::httpd::routes& r,
const std::vector<expectation>& expectations,
std::optional<absl::flat_hash_set<ss::sstring>> headers_to_store
= std::nullopt);
= std::nullopt,
std::set<ss::sstring> content_type_overrides = {});

ss::socket_address _server_addr;
ss::shared_ptr<ss::httpd::http_server_control> _server;
Expand Down Expand Up @@ -163,3 +165,6 @@ cloud_storage_clients::http_byte_range parse_byte_header(std::string_view s);

std::vector<cloud_storage_clients::object_key>
keys_from_delete_objects_request(const http_test_utils::request_info&);

std::vector<std::pair<ss::sstring, cloud_storage_clients::object_key>>
keys_from_batch_delete_request(const http_test_utils::request_info&);
59 changes: 34 additions & 25 deletions src/v/cloud_storage/tests/remote_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,12 @@ TEST_P(all_types_remote_fixture, test_delete_objects_failure_handling) {
}

TEST_P(all_types_gcs_remote_fixture, test_delete_objects_on_unknown_backend) {
set_expectations_and_listen({});
set_expectations_and_listen(
{} /* expectations */,
std::nullopt /* headers_to_store */,
{"multipart/mixed; boundary=response_boundary"}
/* content_type_overrides */
);

retry_chain_node fib(never_abort, 60s, 20ms);

Expand Down Expand Up @@ -945,24 +950,28 @@ TEST_P(all_types_gcs_remote_fixture, test_delete_objects_on_unknown_backend) {
= remote.local().delete_objects(bucket_name, to_delete, fib).get();
ASSERT_EQ(cloud_storage::upload_result::success, result);

ASSERT_EQ(get_requests().size(), 4);
auto first_delete = get_requests()[2];
ASSERT_EQ(get_requests().size(), 3);
auto batch_delete = get_requests()[2];

std::unordered_set<ss::sstring> expected_urls{
"/" + url_base() + "p", "/" + url_base() + "q"};
ASSERT_EQ(first_delete.method, "DELETE");
ASSERT_TRUE(expected_urls.contains(first_delete.url));
// TODO(oren): if the url format was the issue here, we should convert them
// somehow in the s3 imposter backend so we can still simulate the lookup
// behavior. otoh, maybe that's just tetsing the fixture
auto json_api_url = [this](std::string_view key) {
return ssx::sformat("/storage/v1/b/{}/o/{}", bucket_name, key);
};

expected_urls.erase(first_delete.url);
auto second_delete = get_requests()[3];
ASSERT_EQ(second_delete.method, "DELETE");
ASSERT_TRUE(expected_urls.contains(second_delete.url));
std::vector<ss::sstring> expected_urls{
json_api_url("p"), json_api_url("q")};
ASSERT_EQ(batch_delete.method, "POST");
ASSERT_TRUE(batch_delete.content.contains(expected_urls[0]));
ASSERT_TRUE(batch_delete.content.contains(expected_urls[1]));
}

TEST_P(
all_types_gcs_remote_fixture,
test_delete_objects_on_unknown_backend_result_reduction) {
set_expectations_and_listen({});
set_expectations_and_listen(
{}, std::nullopt, {"multipart/mixed; boundary=response_boundary"});

retry_chain_node fib(never_abort, 5s, 20ms);

Expand All @@ -981,17 +990,17 @@ TEST_P(
// will time out
cloud_storage_clients::object_key{"failme"}};

// key 'failme' will produce a 500 error on the corresponding subrequest in
// s3_imposter. by design, this should fail the entire 'delete_objects'
// operations, though key 'p' may have been deleted in the process
auto result
= remote.local().delete_objects(bucket_name, to_delete, fib).get();
if (conf.url_style == cloud_storage_clients::s3_url_style::virtual_host) {
// Due to virtual-host style addressing, this will timeout as DNS tries
// to resolve the request with the provided bucket name.
ASSERT_EQ(cloud_storage::upload_result::timedout, result);
} else {
// But, if we have path style addressing, the object won't be found, a
// warning will be issued, and the request will return success instead.
ASSERT_EQ(cloud_storage::upload_result::success, result);
}
ASSERT_EQ(cloud_storage::upload_result::failed, result);

// drop the poison object key
to_delete.pop_back();
result = remote.local().delete_objects(bucket_name, to_delete, fib).get();
ASSERT_EQ(cloud_storage::upload_result::success, result);
}

TEST_P(all_types_remote_fixture, test_filter_by_source) { // NOLINT
Expand Down Expand Up @@ -1038,8 +1047,8 @@ TEST_P(all_types_remote_fixture, test_filter_by_source) { // NOLINT
ASSERT_TRUE(
subscription.get().type == api_activity_type::manifest_download);

// Remove the rtc node from the filter and re-subscribe. This time we should
// receive the notification.
// Remove the rtc node from the filter and re-subscribe. This time we
// should receive the notification.
flt.remove_source_to_ignore(&root_rtc);
subscription = remote.local().subscribe(flt);
res = remote.local()
Expand Down Expand Up @@ -1099,8 +1108,8 @@ TEST_P(all_types_remote_fixture, test_filter_lifetime_1) { // NOLINT
bucket_name, json_manifest_format_path, actual, child_rtc)
.get();
flt.reset();
// Notification should be received despite the fact that the filter object
// is destroyed.
// Notification should be received despite the fact that the filter
// object is destroyed.
ASSERT_TRUE(res == download_result::success);
ASSERT_TRUE(subscription.available());
ASSERT_TRUE(
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage_clients/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ redpanda_cc_library(
"@boost//:beast",
"@boost//:lexical_cast",
"@boost//:property_tree",
"@rapidjson",
"@seastar",
],
)
Loading