diff --git a/src/v/cloud_io/remote.cc b/src/v/cloud_io/remote.cc index 79c3772ee0603..b7aa3b63163f7 100644 --- a/src/v/cloud_io/remote.cc +++ b/src/v/cloud_io/remote.cc @@ -165,11 +165,11 @@ int remote::delete_objects_max_keys() const { case model::cloud_storage_backend::minio: // https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html return 1000; - case model::cloud_storage_backend::google_s3_compat: - [[fallthrough]]; case model::cloud_storage_backend::azure: - // Will be supported once azurite supports batch blob delete - [[fallthrough]]; + // Azure Blob Batch accepts at most 256 subrequests per batch + return 256; + case model::cloud_storage_backend::google_s3_compat: + return 100; case model::cloud_storage_backend::unknown: return 1; } diff --git a/src/v/cloud_io/tests/s3_imposter.cc b/src/v/cloud_io/tests/s3_imposter.cc index d3036d08636b6..f0bccfe6223a4 100644 --- a/src/v/cloud_io/tests/s3_imposter.cc +++ b/src/v/cloud_io/tests/s3_imposter.cc @@ -369,6 +369,68 @@ struct s3_imposter_fixture::content_handler { } return R"xml()xml"; + } else if ( + request._method == "POST" && request._url.contains("batch")) { + vlog(fixt_log.trace, "Received batch request to {}", request._url); + if ( + expect_iter != expectations.end() + && expect_iter->second.body.has_value()) { + return expect_iter->second.body.value(); + } + auto keys_to_delete = keys_from_batch_delete_request(ri); + vlog( + fixt_log.trace, + "Parsed batched DELETE request with {} keys", + keys_to_delete.size()); + + constexpr std::string_view boundary = "response_boundary"; + + ss::sstring response; + for (size_t i = 0; i < keys_to_delete.size(); ++i) { + const auto& [path, key] = keys_to_delete[i]; + // ss::sstring to_delete = fmt::format("/{}", key().string()); + auto expect_iter = expectations.find(path); + bool obj_present = expect_iter != expectations.end() + && expect_iter->second.body.has_value(); + + if (!obj_present) { + // Missing objects are assumed to be not an error (e.g. + // caused by a delete retry).
+ vlog( + fixt_log.debug, + "Requested DELETE request of {}, not found", + path); + } else { + vlog(fixt_log.trace, "Batched DELETE request of {}", path); + expect_iter->second.body = std::nullopt; + } + + std::string_view code = [&key, obj_present]() { + if (key == "failme") { + return "500 Internal Server Error"; + } + return obj_present ? "204 No Content" : "404 Not Found"; + }(); + + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: response-{}\r\n\r\n" + "HTTP/1.1 {}\r\n" + "X-GUploader-UploadID: test-upload-id-{}\r\n\r\n", + boundary, + i, + code, + i); + } + + response += fmt::format("--{}--\r\n", boundary); + + repl.set_status(reply::status_type::ok); + repl.set_content_type( + fmt::format("multipart/mixed; boundary={}", boundary)); + + return response; } else { vunreachable("Unhandled request method {}", request._method); } @@ -434,18 +496,25 @@ s3_imposter_fixture::get_targets() const { void s3_imposter_fixture::set_expectations_and_listen( std::vector expectations, - std::optional> headers_to_store) { + std::optional> headers_to_store, + std::set content_type_overrides) { const ss::sstring url_prefix = "/" + url_base(); for (auto& expectation : expectations) { expectation.url.insert( expectation.url.begin(), url_prefix.begin(), url_prefix.end()); } _server - ->set_routes( - [this, &expectations, headers_to_store = std::move(headers_to_store)]( - ss::httpd::routes& r) mutable { - set_routes(r, expectations, std::move(headers_to_store)); - }) + ->set_routes([this, + &expectations, + headers_to_store = std::move(headers_to_store), + ct_overrides = std::move(content_type_overrides)]( + ss::httpd::routes& r) mutable { + set_routes( + r, + expectations, + std::move(headers_to_store), + std::move(ct_overrides)); + }) .get(); _server->listen(_server_addr).get(); } @@ -491,7 +560,8 @@ ss::sstring s3_imposter_fixture::url_base() const { void s3_imposter_fixture::set_routes( ss::httpd::routes& r, const std::vector& 
expectations, - std::optional> headers_to_store) { + std::optional> headers_to_store, + std::set content_type_overrides) { using namespace ss::httpd; using reply = ss::http::reply; _content_handler = ss::make_shared( @@ -500,7 +570,8 @@ void s3_imposter_fixture::set_routes( [this](const_req req, reply& repl, [[maybe_unused]] ss::sstring& type) { return _content_handler->handle(req, repl); }, - "xml"); + "xml", + std::move(content_type_overrides)); r.add_default_handler(_handler.get()); } @@ -569,3 +640,42 @@ keys_from_delete_objects_request(const http_test_utils::request_info& req) { return keys; } + +std::vector> +keys_from_batch_delete_request(const http_test_utils::request_info& req) { + std::vector> keys; + auto buffer_stream = std::istringstream{std::string{req.content}}; + + // crudely iterate over request lines, stripping out object keys + constexpr std::string_view method{"DELETE "}; + + std::string line; + while (std::getline(buffer_stream, line)) { + auto pos = line.find(method); + if (pos != 0) { + continue; + } + + pos += method.size(); + + auto ver_pos = line.find(" HTTP/"); + if (ver_pos == line.npos) { + continue; + } + + auto path = std::string_view{line}.substr(pos, ver_pos - pos); + auto last_slash_pos = path.find_last_of('/'); + if (last_slash_pos == path.npos) { + continue; + } + + auto key_pos = last_slash_pos + 1; + if (key_pos >= path.size()) { + continue; + } + + auto key = path.substr(key_pos); + keys.emplace_back(path, cloud_storage_clients::object_key{key}); + } + return keys; +} diff --git a/src/v/cloud_io/tests/s3_imposter.h b/src/v/cloud_io/tests/s3_imposter.h index 4875810cb9911..e17fb881f2d86 100644 --- a/src/v/cloud_io/tests/s3_imposter.h +++ b/src/v/cloud_io/tests/s3_imposter.h @@ -77,7 +77,8 @@ class s3_imposter_fixture { void set_expectations_and_listen( std::vector expectations, std::optional> headers_to_store - = std::nullopt); + = std::nullopt, + std::set content_type_overrides = {}); /// Update expectations for the REST API. 
void add_expectations(std::vector expectations); @@ -122,7 +123,8 @@ class s3_imposter_fixture { ss::httpd::routes& r, const std::vector& expectations, std::optional> headers_to_store - = std::nullopt); + = std::nullopt, + std::set content_type_overrides = {}); ss::socket_address _server_addr; ss::shared_ptr _server; @@ -157,3 +159,6 @@ cloud_storage_clients::http_byte_range parse_byte_header(std::string_view s); std::vector keys_from_delete_objects_request(const http_test_utils::request_info&); + +std::vector> +keys_from_batch_delete_request(const http_test_utils::request_info&); diff --git a/src/v/cloud_storage/tests/remote_test.cc b/src/v/cloud_storage/tests/remote_test.cc index 653b794ec8142..ed461ee44512d 100644 --- a/src/v/cloud_storage/tests/remote_test.cc +++ b/src/v/cloud_storage/tests/remote_test.cc @@ -917,7 +917,12 @@ TEST_P(all_types_remote_fixture, test_delete_objects_failure_handling) { } TEST_P(all_types_gcs_remote_fixture, test_delete_objects_on_unknown_backend) { - set_expectations_and_listen({}); + set_expectations_and_listen( + {} /* expectations */, + std::nullopt /* headers_to_store */, + {"multipart/mixed; boundary=response_boundary"} + /* content_type_overrides */ + ); retry_chain_node fib(never_abort, 60s, 20ms); @@ -945,24 +950,21 @@ TEST_P(all_types_gcs_remote_fixture, test_delete_objects_on_unknown_backend) { = remote.local().delete_objects(bucket_name, to_delete, fib).get(); ASSERT_EQ(cloud_storage::upload_result::success, result); - ASSERT_EQ(get_requests().size(), 4); - auto first_delete = get_requests()[2]; + ASSERT_EQ(get_requests().size(), 3); + auto batch_delete = get_requests()[2]; - std::unordered_set expected_urls{ + std::vector expected_urls{ "/" + url_base() + "p", "/" + url_base() + "q"}; - ASSERT_EQ(first_delete.method, "DELETE"); - ASSERT_TRUE(expected_urls.contains(first_delete.url)); - - expected_urls.erase(first_delete.url); - auto second_delete = get_requests()[3]; - ASSERT_EQ(second_delete.method, "DELETE"); - 
ASSERT_TRUE(expected_urls.contains(second_delete.url)); + ASSERT_EQ(batch_delete.method, "POST"); + ASSERT_TRUE(batch_delete.content.contains(expected_urls[0])); + ASSERT_TRUE(batch_delete.content.contains(expected_urls[1])); } TEST_P( all_types_gcs_remote_fixture, test_delete_objects_on_unknown_backend_result_reduction) { - set_expectations_and_listen({}); + set_expectations_and_listen( + {}, std::nullopt, {"multipart/mixed; boundary=response_boundary"}); retry_chain_node fib(never_abort, 5s, 20ms); @@ -981,17 +983,17 @@ TEST_P( // will time out cloud_storage_clients::object_key{"failme"}}; + // key 'failme' will produce a 500 error on the corresponding subrequest in + // s3_imposter. by design, this should fail the entire 'delete_objects' + // operation, though key 'p' may have been deleted in the process auto result = remote.local().delete_objects(bucket_name, to_delete, fib).get(); - if (conf.url_style == cloud_storage_clients::s3_url_style::virtual_host) { - // Due to virtual-host style addressing, this will timeout as DNS tries - // to resolve the request with the provided bucket name. - ASSERT_EQ(cloud_storage::upload_result::timedout, result); - } else { - // But, if we have path style addressing, the object won't be found, a - // warning will be issued, and the request will return success instead. - ASSERT_EQ(cloud_storage::upload_result::success, result); - } + ASSERT_EQ(cloud_storage::upload_result::failed, result); + + // drop the poison object key + to_delete.pop_back(); + result = remote.local().delete_objects(bucket_name, to_delete, fib).get(); + ASSERT_EQ(cloud_storage::upload_result::success, result); } TEST_P(all_types_remote_fixture, test_filter_by_source) { // NOLINT @@ -1038,8 +1040,8 @@ TEST_P(all_types_remote_fixture, test_filter_by_source) { // NOLINT ASSERT_TRUE( subscription.get().type == api_activity_type::manifest_download); - // Remove the rtc node from the filter and re-subscribe. This time we should - // receive the notification.
+ // Remove the rtc node from the filter and re-subscribe. This time we + // should receive the notification. flt.remove_source_to_ignore(&root_rtc); subscription = remote.local().subscribe(flt); res = remote.local() @@ -1099,8 +1101,8 @@ TEST_P(all_types_remote_fixture, test_filter_lifetime_1) { // NOLINT bucket_name, json_manifest_format_path, actual, child_rtc) .get(); flt.reset(); - // Notification should be received despite the fact that the filter object - // is destroyed. + // Notification should be received despite the fact that the filter + // object is destroyed. ASSERT_TRUE(res == download_result::success); ASSERT_TRUE(subscription.available()); ASSERT_TRUE( diff --git a/src/v/cloud_storage_clients/BUILD b/src/v/cloud_storage_clients/BUILD index 0141910bbe29d..5dc1d2a60da81 100644 --- a/src/v/cloud_storage_clients/BUILD +++ b/src/v/cloud_storage_clients/BUILD @@ -41,6 +41,7 @@ redpanda_cc_library( "//src/v/cloud_roles:auth_refresh_bg_op", "//src/v/cloud_roles:types", "//src/v/config", + "//src/v/container:chunked_hash_map", "//src/v/container:chunked_vector", "//src/v/container:intrusive", "//src/v/crash_tracker", @@ -64,9 +65,11 @@ redpanda_cc_library( "//src/v/utils:named_type", "//src/v/utils:retry_chain_node", "//src/v/utils:stop_signal", + "//src/v/utils:uuid", "@boost//:beast", "@boost//:lexical_cast", "@boost//:property_tree", + "@rapidjson", "@seastar", ], ) diff --git a/src/v/cloud_storage_clients/abs_client.cc b/src/v/cloud_storage_clients/abs_client.cc index ba9c8ad32946b..7339cb1d5d6bc 100644 --- a/src/v/cloud_storage_clients/abs_client.cc +++ b/src/v/cloud_storage_clients/abs_client.cc @@ -11,6 +11,7 @@ #include "cloud_storage_clients/abs_client.h" #include "base/vlog.h" +#include "bytes/iostream.h" #include "bytes/streambuf.h" #include "cloud_storage_clients/abs_error.h" #include "cloud_storage_clients/client_pool.h" @@ -20,10 +21,15 @@ #include "cloud_storage_clients/util.h" #include "cloud_storage_clients/xml_sax_parser.h" #include 
"config/configuration.h" +#include "container/chunked_hash_map.h" +#include "http/iobuf_body.h" #include "http/utils.h" #include "json/document.h" #include "json/istreamwrapper.h" +#include "utils/uuid.h" +#include +#include #include namespace { @@ -56,6 +62,9 @@ constexpr boost::beast::string_view expiry_option_name = "x-ms-expiry-option"; constexpr boost::beast::string_view expiry_option_value = "RelativeToNow"; constexpr boost::beast::string_view expiry_time_name = "x-ms-expiry-time"; +constexpr boost::beast::string_view content_type_multipart_val + = "multipart/mixed"; + constexpr boost::beast::string_view hierarchical_namespace_not_enabled_error_code = "HierarchicalNamespaceNotEnabled"; @@ -185,6 +194,72 @@ parse_header_error_response(const http::http_response::header_type& hdr) { return {code, message, hdr.result()}; } +/// Parse multipart/mixed batch delete response +/// The response format is: +/// --boundary +/// Content-Type: application/http +/// Content-ID: 0 +/// +/// HTTP/1.1 202 Accepted +/// x-ms-request-id: ... +/// ... +/// +/// --boundary +/// ... (more responses) +/// --boundary-- +static cloud_storage_clients::client::delete_objects_result +parse_batch_delete_response( + iobuf buf, + const ss::sstring& boundary, + const chunked_vector& keys) { + cloud_storage_clients::client::delete_objects_result result; + + // Simple multipart parser - split by boundary + auto boundary_delim = ssx::sformat("--{}", boundary); + util::multipart_response_parser parts{std::move(buf), boundary_delim}; + + constexpr auto convert_content_id = + [](std::string_view raw) -> std::optional { + size_t v{}; + auto res = std::from_chars(raw.data(), raw.data() + raw.size(), v); + return res.ec == std::errc{} ? 
std::make_optional(v) : std::nullopt; + }; + + chunked_hash_set content_ids_seen; + std::optional part; + while ((part = parts.get_part()).has_value()) { + iobuf_parser part_parser{std::move(part).value()}; + auto mime = util::mime_header::from(part_parser); + auto maybe_content_id = mime.content_id(convert_content_id); + if (!maybe_content_id.has_value() || maybe_content_id.value() >= keys.size()) { + throw std::runtime_error( + "ABS batch delete response part has missing or out-of-range Content-ID"); + } + content_ids_seen.insert(maybe_content_id.value()); + // MIME headers are consumed; the rest of part_parser should be the embedded + // HTTP response (Content-ID was range-checked before indexing keys) + auto subrequest = util::multipart_subresponse::from(part_parser); + if (auto maybe_error = subrequest.error(error_code_name); + maybe_error.has_value()) { + result.undeleted_keys.push_back({ + .key = keys[maybe_content_id.value()], + .reason = std::move(maybe_error).value(), + }); + } + } + + for (auto id : std::views::iota(0ul, keys.size())) { + if (!content_ids_seen.contains(id)) { + result.undeleted_keys.push_back({ + .key = keys[id], + .reason = "Object missing from batch response", + }); + } + } + + return result; +} + abs_request_creator::abs_request_creator( const abs_configuration& conf, ss::lw_shared_ptr apply_credentials) @@ -304,6 +379,137 @@ abs_request_creator::make_delete_blob_request( return header; } +result>> +abs_request_creator::make_batch_delete_request( + const bucket_name& name, const chunked_vector& keys) { + // Azure Blob Storage Batch API + // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch + // + // POST /?comp=batch HTTP/1.1 + // Host: {storage-account-id}.blob.core.windows.net + // Content-Type: multipart/mixed; boundary=batch_ + // Content-Length: <...> + // x-ms-date: {req-datetime in RFC9110} # added by 'add_auth' + // x-ms-version: 2023-01-23 # added by 'add_auth' + // Authorization:{signature} # added by 'add_auth' + // + // Body structure: + // --batch_ + // Content-ID: 0 + // + // DELETE /{container-id}/{blob-id}
HTTP/1.1 + // Content-Type: application/http + // Content-Transfer-Encoding: binary + // x-ms-delete-snapshots: include + // x-ms-date: {req-datetime in RFC9110} + // x-ms-version: 2023-01-23 # added by 'add_auth' + // Authorization:{signature} + // + // --batch_ + // ... (repeat for each blob) + // --batch_-- + + // Generate a unique boundary from a freshly created UUID + auto boundary = fmt::format("batch_{}", uuid_t::create()); + + // Build the multipart body using iobuf and stream + iobuf body; + iobuf_ostreambuf obuf(body); + std::ostream out(&obuf); + + for (size_t i = 0; i < keys.size(); ++i) { + const auto& key = keys[i]; + + // Boundary line + fmt::print(out, "--{}\r\n", boundary); + + http::client::request_header part_header{}; + part_header.insert( + boost::beast::http::field::content_type, "application/http"); + part_header.insert( + boost::beast::http::field::content_transfer_encoding, "binary"); + part_header.insert( + boost::beast::http::field::content_id, fmt::to_string(i)); + + // Create individual delete request for this blob + + // Create a temporary header to get auth headers for this subrequest + // NOTE: Per + // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id#request-body + // - The subrequests should not have the x-ms-version header. + // - The subrequest URL should only have the path of the URL (without + // the host). + // - Each subrequest is authorized separately, with the provided + // information in the subrequest.
+ http::client::request_header subrequest_header{}; + subrequest_header.method(boost::beast::http::verb::delete_); + subrequest_header.target(fmt::format("/{}/{}", name(), key().string())); + subrequest_header.insert(delete_snapshot_name, delete_snapshot_value); + auto error_code = _apply_credentials->add_auth(subrequest_header); + if (error_code) { + return error_code; + } + + // Content-Length for DELETE is 0 + subrequest_header.insert( + boost::beast::http::field::content_length, fmt::to_string(0)); + + for (const auto& f : part_header) { + fmt::print(out, "{}: {}\r\n", f.name_string(), f.value()); + } + fmt::print(out, "\r\n"); + + fmt::print( + out, + "{} {} HTTP/1.1\r\n", + subrequest_header.method_string(), + subrequest_header.target()); + + // Add auth headers to body (excluding Host, as per Azure Batch API) + for (const auto& field : subrequest_header) { + // Emit every field; request line and Content-ID were written above + fmt::print(out, "{}: {}\r\n", field.name_string(), field.value()); + } + + fmt::print(out, "\r\n"); + } + + // Final boundary + fmt::print(out, "--{}--\r\n", boundary); + + if (!out.good()) { + throw std::runtime_error( + fmt::format( + "failed to create batch delete request, state: {}", out.rdstate())); + } + + // Create the main request header + http::client::request_header header{}; + header.method(boost::beast::http::verb::post); + header.target("/?comp=batch"); + header.insert( + boost::beast::http::field::host, + boost::beast::string_view{_ap().data(), _ap().length()}); + header.insert( + boost::beast::http::field::content_type, + fmt::format("{}; boundary={}", content_type_multipart_val, boundary)); + header.insert( + boost::beast::http::field::content_length, + std::to_string(body.size_bytes())); + + auto error_code = _apply_credentials->add_auth(header); + if (error_code) { + return error_code; + } + + util::url_encode_target(header); + + // Convert iobuf to input_stream + auto stream = make_iobuf_input_stream(std::move(body));
+ + return std::make_tuple(std::move(header), std::move(stream)); +} + result abs_request_creator::make_list_blobs_request( const bucket_name& name, @@ -857,24 +1063,68 @@ ss::future<> abs_client::do_delete_object( } } -ss::future> -abs_client::delete_objects( +ss::future +abs_client::do_batch_delete_objects( const bucket_name& bucket, const chunked_vector& keys, ss::lowres_clock::duration timeout) { - abs_client::delete_objects_result delete_objects_result; - for (const auto& key : keys) { - try { - auto res = co_await delete_object(bucket, key, timeout); - if (res.has_error()) { - delete_objects_result.undeleted_keys.push_back( - {key, fmt::format("{}", res.error())}); + auto request = _requestor.make_batch_delete_request(bucket, keys); + if (!request) { + co_return ss::coroutine::exception( + std::make_exception_ptr(std::system_error(request.error()))); + } + auto& [header, body] = request.value(); + vlog(abs_log.trace, "send batch delete request:\n{}", header); + + auto response_stream = co_await _client.request( + std::move(header), body, timeout); + + co_await response_stream->prefetch_headers(); + vassert(response_stream->is_header_done(), "Header is not received"); + + const auto status = response_stream->get_headers().result(); + if (status != boost::beast::http::status::accepted) { + const auto content_type = util::get_response_content_type( + response_stream->get_headers()); + auto buf = co_await http::drain(std::move(response_stream)); + throw parse_rest_error_response(content_type, status, std::move(buf)); + } + + // Extract boundary from Content-Type header + const auto& headers = response_stream->get_headers(); + auto content_type_it = headers.find( + boost::beast::http::field::content_type); + ss::sstring boundary; + if (content_type_it != headers.end()) { + ss::sstring content_type{ + content_type_it->value().data(), content_type_it->value().size()}; + auto boundary_pos = content_type.find("boundary="); + if (boundary_pos != ss::sstring::npos) { + 
boundary = content_type.substr(boundary_pos + 9); + // Remove quotes if present + if (!boundary.empty() && boundary.front() == '"') { + boundary = boundary.substr(1); + } + if (!boundary.empty() && boundary.back() == '"') { + boundary = boundary.substr(0, boundary.size() - 1); } - } catch (const std::exception& ex) { - delete_objects_result.undeleted_keys.push_back({key, ex.what()}); } } - co_return delete_objects_result; + + auto response_buf = co_await http::drain(std::move(response_stream)); + co_return parse_batch_delete_response( + std::move(response_buf), boundary, keys); +} + +ss::future> +abs_client::delete_objects( + const bucket_name& bucket, + const chunked_vector& keys, + ss::lowres_clock::duration timeout) { + // Use batch API for efficiency + const object_key dummy{""}; + co_return co_await send_request( + do_batch_delete_objects(bucket, keys, timeout), dummy); } ss::future> diff --git a/src/v/cloud_storage_clients/abs_client.h b/src/v/cloud_storage_clients/abs_client.h index c2e798a313735..e6e00c8b851f9 100644 --- a/src/v/cloud_storage_clients/abs_client.h +++ b/src/v/cloud_storage_clients/abs_client.h @@ -65,6 +65,19 @@ class abs_request_creator { result make_delete_blob_request(const bucket_name& name, const object_key& key); + /// \brief Create a 'Batch Delete' request header and body + /// + /// Uses the Azure Blob Storage Batch API to delete multiple blobs + /// in a single request. The request uses multipart/mixed encoding. 
+ /// + /// \param name is a container + /// \param keys is a vector of blob names to delete + /// \return initialized and signed http header and body as input_stream or + /// error + result>> + make_batch_delete_request( + const bucket_name& name, const chunked_vector& keys); + // clang-format off /// \brief Initialize http header for 'List Blobs' request /// @@ -282,6 +295,11 @@ class abs_client : public client { const object_key& key, ss::lowres_clock::duration timeout); + ss::future do_batch_delete_objects( + const bucket_name& bucket, + const chunked_vector& keys, + ss::lowres_clock::duration timeout); + ss::future do_list_objects( const bucket_name& name, std::optional prefix, diff --git a/src/v/cloud_storage_clients/s3_client.cc b/src/v/cloud_storage_clients/s3_client.cc index 452b9037e536d..af51cb12ef9e2 100644 --- a/src/v/cloud_storage_clients/s3_client.cc +++ b/src/v/cloud_storage_clients/s3_client.cc @@ -25,9 +25,12 @@ #include "config/configuration.h" #include "config/node_config.h" #include "config/types.h" +#include "container/chunked_hash_map.h" #include "hashing/secure.h" #include "http/client.h" #include "http/utils.h" +#include "json/istreamwrapper.h" +#include "json/reader.h" #include "utils/base64.h" #include @@ -47,6 +50,7 @@ #include #include +#include #include #include #include @@ -382,6 +386,128 @@ request_creator::make_delete_objects_request( return {std::move(header), make_iobuf_input_stream(std::move(body))}; } +result>> +request_creator::make_gcs_batch_delete_request( + const bucket_name& name, const chunked_vector& keys) { + // Google Cloud Storage Batch API + // https://cloud.google.com/storage/docs/batch + // + // POST /batch/storage/v1 HTTP/1.1 + // Host: storage.googleapis.com + // Content-Type: multipart/mixed; boundary= + // Authorization: Bearer # added by 'add_auth' + // Content-Length: <...> + // + // Body structure: + // -- + // Content-Type: application/http + // Content-ID: + // + // DELETE /storage/v1/b//o/ HTTP/1.1 + // + 
// -- + // ... (repeat for each object) + // ---- + // + // Note: GCS batch API requires path-only URLs in subrequests + // Max 100 requests per batch, max 10MB payload + + // Generate unique boundary + auto boundary = fmt::format("batch_{}", uuid_t::create()); + + // Build the multipart body + iobuf body; + iobuf_ostreambuf obuf(body); + std::ostream out(&obuf); + + for (size_t i = 0; i < keys.size(); ++i) { + const auto& key = keys[i]; + + // Boundary line + fmt::print(out, "--{}\r\n", boundary); + + http::client::request_header part_header{}; + part_header.insert( + boost::beast::http::field::content_type, "application/http"); + part_header.insert( + boost::beast::http::field::content_transfer_encoding, "binary"); + part_header.insert( + boost::beast::http::field::content_id, fmt::to_string(i)); + + http::client::request_header subrequest_header{}; + subrequest_header.method(boost::beast::http::verb::delete_); + subrequest_header.target(make_target(name, key)); + subrequest_header.insert( + boost::beast::http::field::content_type, "application/json"); + subrequest_header.insert( + boost::beast::http::field::accept, "application/json"); + // Content-Length for DELETE is 0 + subrequest_header.insert( + boost::beast::http::field::content_length, fmt::to_string(0)); + util::url_encode_target(subrequest_header); + + // NOTE: Per docs.cloud.google.com/storage/docs/batch#http: + // if you provide an [Auth] header for a specific nested request, then + // that header applies only to the request that specified it. If you + // provide an [Auth] header for the outer request, then that header + // applies to all of the nested requests unless they override it with an + // [Auth] header of their own. 
+ + // part header + + for (const auto& f : part_header) { + fmt::print(out, "{}: {}\r\n", f.name_string(), f.value()); + } + fmt::print(out, "\r\n"); + + // subrequest header + + fmt::print( + out, + "{} {} HTTP/1.1\r\n", + subrequest_header.method_string(), + subrequest_header.target()); + + for (const auto& f : subrequest_header) { + fmt::print(out, "{}: {}\r\n", f.name_string(), f.value()); + } + fmt::print(out, "\r\n"); + } + + // Final boundary + fmt::print(out, "--{}--\r\n", boundary); + + if (!out.good()) { + throw std::runtime_error(fmt_with_ctx( + fmt::format, + "failed to create GCS batch delete request, state: {}", + out.rdstate())); + } + + // Create the main request header + http::client::request_header header{}; + header.method(boost::beast::http::verb::post); + // GCS batch endpoint uses a fixed path, not bucket-specific + header.target("/batch/storage/v1"); + header.insert(boost::beast::http::field::host, "storage.googleapis.com"); + header.insert( + boost::beast::http::field::content_type, + fmt::format("multipart/mixed; boundary={}", boundary)); + header.insert( + boost::beast::http::field::content_length, + fmt::format("{}", body.size_bytes())); + + auto ec = _apply_credentials->add_auth(header); + if (ec) { + return ec; + } + + // Convert iobuf to input_stream + auto stream = make_iobuf_input_stream(std::move(body)); + + return {std::move(header), std::move(stream)}; +} + std::string request_creator::make_host(const bucket_name& name) const { switch (_ap_style) { case s3_url_style::virtual_host: @@ -504,6 +630,126 @@ ss::future parse_rest_error_response( "")); } +namespace { +struct gcs_error_handler + : public rapidjson::BaseReaderHandler, gcs_error_handler> { + enum class state : uint8_t { init, in_error, in_message, found }; + state st = state::init; + std::optional message; + bool Key(const char* str, rapidjson::SizeType len, bool) { + std::string_view k{str, len}; + if (st == state::init && k == "error") { + st = state::in_error; + } else 
if (st == state::in_error && k == "message") { + st = state::in_message; + } + return true; + } + + bool String(const char* str, rapidjson::SizeType len, bool) { + if (st == state::in_message) { + message.emplace(str, len); + st = state::found; + } + return true; + } + + bool EndObject(rapidjson::SizeType) { + if (st == state::in_error) { + st = state::init; + } + return true; + } +}; + +ss::sstring parse_gcs_error_reason(iobuf body) { + iobuf_istreambuf ibuf(body); + std::istream stream(&ibuf); + json::IStreamWrapper wrapper(stream); + json::Reader reader; + gcs_error_handler handler; + auto res = reader.Parse(wrapper, handler); + if (res && handler.message.has_value()) { + return std::move(handler.message).value(); + } else { + return "Unknown"; + } +}; +} // namespace + +/// Parse GCS batch delete response using multipart parsing utilities +/// GCS batch responses follow the multipart/mixed format similar to ABS +static cloud_storage_clients::client::delete_objects_result +parse_gcs_batch_delete_response( + iobuf buf, + const ss::sstring& boundary, + const chunked_vector& keys) { + cloud_storage_clients::client::delete_objects_result result; + + // Parse multipart response - split by boundary + auto boundary_delim = ssx::sformat("--{}", boundary); + util::multipart_response_parser parts{std::move(buf), boundary_delim}; + + constexpr auto convert_content_id = + [](std::string_view raw) -> std::optional { + constexpr std::string_view pfx = "response-"; + std::optional result{}; + if (auto pos = raw.find(pfx); pos != raw.npos) { + raw = raw.substr(pos + pfx.size()); + size_t v{}; + auto res = std::from_chars(raw.data(), raw.data() + raw.size(), v); + if (res.ec == std::errc{}) { + result = v; + } + } + return result; + }; + + chunked_hash_set content_ids_seen; + std::optional part; + while ((part = parts.get_part()).has_value()) { + iobuf_parser part_parser{std::move(part).value()}; + + // Parse MIME headers + auto mime = util::mime_header::from(part_parser); + 
auto maybe_content_id = mime.content_id(convert_content_id); + if (!maybe_content_id.has_value() || maybe_content_id.value() >= keys.size()) { + throw std::runtime_error( + "GCS batch delete response part has missing or out-of-range Content-ID"); + } + content_ids_seen.insert(maybe_content_id.value()); + + // Parse the HTTP response for this subrequest + auto subrequest = util::multipart_subresponse::from(part_parser); + + // Check if this delete failed + // GCS uses standard HTTP status codes, with 2xx indicating success + // and 404 (Not Found) also treated as success for delete operations + + if (auto maybe_error = subrequest.error(parse_gcs_error_reason); + maybe_error.has_value()) { + // Extract error message from response if available + // GCS error responses may contain error details in the body + result.undeleted_keys.push_back({ + .key = keys[maybe_content_id.value()], + .reason = std::move(maybe_error).value(), + }); + } + } + + // Check for any keys that were not in the response + for (auto id : std::views::iota(0ul, keys.size())) { + if (!content_ids_seen.contains(id)) { + result.undeleted_keys.push_back({ + .key = keys[id], + .reason = "Object missing from GCS batch response", + }); + } + } + + return result; +} + /// Head response doesn't give us an XML encoded error object in /// the body. This method uses headers to generate an error object.
template
@@ -1228,7 +1474,101 @@ auto s3_client::delete_objects(
   ss::lowres_clock::duration timeout)
   -> ss::future> {
     const object_key dummy{""};
-    co_return co_await send_request(
-      do_delete_objects(bucket, keys, timeout), bucket, dummy);
+
+    // Determine which batch delete implementation to use based on backend
+    // GCS does not support S3-style batch deletes, so use GCS batch API
+    auto backend = config::shard_local_cfg().cloud_storage_backend();
+    auto inferred_backend = infer_backend_from_uri(_requestor._ap);
+
+    bool is_gcs = backend == model::cloud_storage_backend::google_s3_compat
+                  || inferred_backend
+                       == model::cloud_storage_backend::google_s3_compat;
+
+    if (is_gcs) {
+        co_return co_await send_request(
+          do_gcs_batch_delete_objects(bucket, keys, timeout), bucket, dummy);
+    } else {
+        co_return co_await send_request(
+          do_delete_objects(bucket, keys, timeout), bucket, dummy);
+    }
 }
+
+/// Extract the boundary parameter from a multipart Content-Type value,
+/// e.g. `multipart/mixed; boundary="abc"; charset=UTF-8` yields `abc`.
+/// Returns an empty string if there is no boundary parameter.
+static ss::sstring extract_multipart_boundary(std::string_view content_type) {
+    constexpr std::string_view param = "boundary=";
+    const auto pos = content_type.find(param);
+    if (pos == std::string_view::npos) {
+        return {};
+    }
+    auto value = content_type.substr(pos + param.size());
+    if (!value.empty() && value.front() == '"') {
+        // Quoted form: the value runs up to the closing quote
+        value.remove_prefix(1);
+        value = value.substr(0, value.find('"'));
+    } else {
+        // Token form: the value ends at the next parameter or whitespace
+        value = value.substr(0, value.find_first_of("; \t"));
+    }
+    return ss::sstring{value.data(), value.size()};
+}
+
+/// Delete \p keys from \p bucket with one request to the GCS batch API.
+///
+/// A response other than 200 OK is handed to parse_rest_error_response.
+/// A 200 OK response carries a multipart body with one part per subrequest,
+/// which is parsed into the per-key result.
+auto s3_client::do_gcs_batch_delete_objects(
+  const bucket_name& bucket,
+  const chunked_vector& keys,
+  ss::lowres_clock::duration timeout)
+  -> ss::future {
+    auto request = _requestor.make_gcs_batch_delete_request(bucket, keys);
+    if (!request) {
+        co_return ss::coroutine::exception(
+          std::make_exception_ptr(std::system_error(request.error())));
+    }
+    auto& [header, body] = request.value();
+    vlog(s3_log.trace, "send GCS batch delete request:\n{}", header);
+
+    auto response_stream = co_await _client.request(
+      std::move(header), body, timeout);
+
+    co_await response_stream->prefetch_headers();
+    vassert(response_stream->is_header_done(), "Header is not received");
+
+    const auto status = response_stream->get_headers().result();
+    // GCS batch API returns 200 OK for successful batch requests
+    // Individual subrequest failures are encoded in the multipart response
+    if (status != boost::beast::http::status::ok) {
+        const auto content_type = util::get_response_content_type(
+          response_stream->get_headers());
+        auto buf = co_await http::drain(std::move(response_stream));
+        co_return co_await parse_rest_error_response(
+          content_type, status, std::move(buf));
+    }
+
+    // The multipart boundary is carried in the Content-Type header; copy it
+    // out before the stream is moved into drain() below.
+    const auto& headers = response_stream->get_headers();
+    const auto content_type_it = headers.find(
+      boost::beast::http::field::content_type);
+    ss::sstring boundary;
+    if (content_type_it != headers.end()) {
+        const auto raw = content_type_it->value();
+        boundary = extract_multipart_boundary({raw.data(), raw.size()});
+    }
+    if (boundary.empty()) {
+        // Without a boundary the body cannot be split into parts
+        co_return ss::coroutine::exception(std::make_exception_ptr(
+          std::runtime_error(
+            "GCS batch delete response has no multipart boundary")));
+    }
+
+    auto response_buf = co_await http::drain(std::move(response_stream));
+    co_return parse_gcs_batch_delete_response(
+      std::move(response_buf), boundary, keys);
+}
+
 } // namespace cloud_storage_clients
diff --git a/src/v/cloud_storage_clients/s3_client.h b/src/v/cloud_storage_clients/s3_client.h
index a699a0e10cb5d..16319a771838e 100644
--- a/src/v/cloud_storage_clients/s3_client.h
+++ b/src/v/cloud_storage_clients/s3_client.h
@@ -85,6 +85,17 @@ class request_creator {
     make_delete_objects_request(
       const bucket_name& name, const chunked_vector& keys);

+    /// \brief Create a GCS batch delete request header and body
+    /// Uses the GCS batch API endpoint with multipart/mixed format
+    /// https://cloud.google.com/storage/docs/batch
+    ///
+    /// \param name of the bucket
+    /// \param keys to delete
+    /// \return the header and the body as an input_stream
+    result>>
+    make_gcs_batch_delete_request(
+      const bucket_name& name, const chunked_vector& keys);
+
     /// \brief Initialize http header for 'ListObjectsV2' request
     ///
     /// \param name of the bucket
@@ -244,6 +255,11 @@ class s3_client : public client {
       const chunked_vector& keys,
ss::lowres_clock::duration timeout); + ss::future do_gcs_batch_delete_objects( + const bucket_name& bucket, + const chunked_vector& keys, + ss::lowres_clock::duration timeout); + template ss::future> send_request( ss::future request_future, diff --git a/src/v/cloud_storage_clients/tests/BUILD b/src/v/cloud_storage_clients/tests/BUILD index 7693204faa6bc..5f9defaaa672c 100644 --- a/src/v/cloud_storage_clients/tests/BUILD +++ b/src/v/cloud_storage_clients/tests/BUILD @@ -95,6 +95,8 @@ redpanda_cc_btest( "util_test.cc", ], deps = [ + "//src/v/bytes:iobuf", + "//src/v/bytes:iobuf_parser", "//src/v/cloud_storage_clients", "//src/v/test_utils:seastar_boost", "@boost//:test", @@ -143,3 +145,42 @@ redpanda_cc_btest( "@seastar//:testing", ], ) + +redpanda_cc_gtest( + name = "abs_client_test", + timeout = "short", + srcs = [ + "abs_client_test.cc", + ], + deps = [ + "//src/v/base", + "//src/v/bytes:iobuf", + "//src/v/cloud_storage_clients", + "//src/v/http/tests:utils", + "//src/v/net", + "//src/v/net:types", + "//src/v/test_utils:gtest", + "//src/v/utils:unresolved_address", + "@googletest//:gtest", + "@seastar", + "@seastar//:testing", + ], +) + +redpanda_cc_gtest( + name = "gcs_client_test", + timeout = "short", + srcs = [ + "gcs_client_test.cc", + ], + deps = [ + "//src/v/base", + "//src/v/cloud_storage_clients", + "//src/v/http/tests:utils", + "//src/v/net", + "//src/v/test_utils:gtest", + "//src/v/utils:unresolved_address", + "@googletest//:gtest", + "@seastar", + ], +) diff --git a/src/v/cloud_storage_clients/tests/abs_client_test.cc b/src/v/cloud_storage_clients/tests/abs_client_test.cc new file mode 100644 index 0000000000000..8d3335aaa182a --- /dev/null +++ b/src/v/cloud_storage_clients/tests/abs_client_test.cc @@ -0,0 +1,423 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "base/seastarx.h" +#include "bytes/iobuf.h" +#include "cloud_storage_clients/abs_client.h" +#include "cloud_storage_clients/configuration.h" +#include "http/tests/utils.h" +#include "net/dns.h" +#include "utils/unresolved_address.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +using namespace std::chrono_literals; +using namespace cloud_storage_clients; + +static const uint16_t httpd_port_number = 4434; +static constexpr const char* httpd_host_name = "localhost"; + +namespace { + +/// Mock multipart/mixed response for batch delete success +/// Returns a response with all deletes successful (202 Accepted) +ss::sstring make_batch_delete_success_response( + const ss::sstring& boundary, size_t num_keys) { + ss::sstring response; + for (size_t i = 0; i < num_keys; ++i) { + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: {}\r\n\r\n" + "HTTP/1.1 202 Accepted\r\n" + "x-ms-request-id: test-request-id-{}\r\n" + "x-ms-version: 2023-01-03\r\n\r\n", + boundary, + i, + i); + } + response += fmt::format("--{}--\r\n", boundary); + return response; +} + +/// Mock multipart/mixed response for batch delete with errors +/// Returns a response with all deletes failed +ss::sstring make_batch_delete_error_response( + const ss::sstring& boundary, size_t num_keys, const ss::sstring& error_code) { + ss::sstring response; + for (size_t i = 0; i < num_keys; ++i) { + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: {}\r\n\r\n" + "HTTP/1.1 403 Forbidden\r\n" + "x-ms-error-code: {}\r\n" + "x-ms-request-id: test-request-id-{}\r\n" + "x-ms-version: 2023-01-03\r\n\r\n", + boundary, + i, + error_code, + i); + } + response += fmt::format("--{}--\r\n", boundary); + return response; +} + +/// Mock 
multipart/mixed response for batch delete with partial errors +/// First key succeeds, rest fail +ss::sstring make_batch_delete_partial_error_response( + const ss::sstring& boundary, size_t num_keys, const ss::sstring& error_code) { + ss::sstring response; + // First key succeeds + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: 0\r\n\r\n" + "HTTP/1.1 202 Accepted\r\n" + "x-ms-request-id: test-request-id-0\r\n" + "x-ms-version: 2023-01-03\r\n\r\n", + boundary); + + // Rest fail + for (size_t i = 1; i < num_keys; ++i) { + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: {}\r\n\r\n" + "HTTP/1.1 403 Forbidden\r\n" + "x-ms-error-code: {}\r\n" + "x-ms-request-id: test-request-id-{}\r\n" + "x-ms-version: 2023-01-03\r\n\r\n", + boundary, + i, + error_code, + i); + } + response += fmt::format("--{}--\r\n", boundary); + return response; +} + +/// Mock multipart/mixed response for batch delete with 404s (not found) +/// Returns 404 for all deletes (which should be treated as success) +ss::sstring make_batch_delete_not_found_response( + const ss::sstring& boundary, size_t num_keys) { + ss::sstring response; + for (size_t i = 0; i < num_keys; ++i) { + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: {}\r\n\r\n" + "HTTP/1.1 404 Not Found\r\n" + "x-ms-error-code: BlobNotFound\r\n" + "x-ms-request-id: test-request-id-{}\r\n" + "x-ms-version: 2023-01-03\r\n\r\n", + boundary, + i, + i); + } + response += fmt::format("--{}--\r\n", boundary); + return response; +} + +void set_routes(ss::httpd::routes& r) { + using namespace ss::httpd; + using reply = ss::http::reply; + using flexible_function_handler + = http::test_utils::flexible_function_handler; + + // Create a universal handler that dispatches based on Host header + // Path format: /?comp=batch + // The container name is embedded in the Host header + auto dispatch_handler = new 
flexible_function_handler( + []( + const_req req, reply& reply, ss::sstring& content_type) -> std::string { + auto host = std::string{req.get_header("Host")}; + + if ( + !req.has_query_param("comp") + || req.get_query_param("comp") != "batch") { + reply.set_status(reply::status_type::bad_request); + content_type = "text/plain"; + return "missing comp=batch query parameter"; + } + + // Count number of Content-ID entries in request body + size_t num_keys = 0; + size_t pos = 0; + while ((pos = req.content.find("Content-ID:", pos)) + != seastar::sstring::npos) { + ++num_keys; + pos += 11; + } + + auto response_boundary = ss::sstring{"batch_response_boundary"}; + std::string response_body; + + // Dispatch based on host + if (host.starts_with("test-success.")) { + response_body = make_batch_delete_success_response( + response_boundary, num_keys) + .c_str(); + reply.set_status(reply::status_type::accepted); + content_type = fmt::format( + "multipart/mixed; boundary={}", response_boundary); + } else if (host.starts_with("test-errors.")) { + response_body = make_batch_delete_error_response( + response_boundary, + num_keys, + "InvalidAuthenticationInfo") + .c_str(); + reply.set_status(reply::status_type::accepted); + content_type = fmt::format( + "multipart/mixed; boundary={}", response_boundary); + } else if (host.starts_with("test-partial.")) { + response_body = make_batch_delete_partial_error_response( + response_boundary, + num_keys, + "AuthenticationFailed") + .c_str(); + reply.set_status(reply::status_type::accepted); + content_type = fmt::format( + "multipart/mixed; boundary={}", response_boundary); + } else if (host.starts_with("test-notfound.")) { + response_body = make_batch_delete_not_found_response( + response_boundary, num_keys) + .c_str(); + reply.set_status(reply::status_type::accepted); + content_type = fmt::format( + "multipart/mixed; boundary={}", response_boundary); + } else if (host.starts_with("test-servererror.")) { + 
reply.set_status(reply::status_type::internal_server_error); + content_type = "application/xml"; + return R"xml( + + InternalError + The server encountered an internal error. +)xml"; + } else { + reply.set_status(reply::status_type::bad_request); + content_type = "text/plain"; + return "unknown test host"; + } + + return response_body; + }, + "txt"); + + r.add(operation_type::POST, url("/"), dispatch_handler); +} + +abs_configuration make_test_configuration(std::string_view account) { + net::unresolved_address server_addr(httpd_host_name, httpd_port_number); + abs_configuration conf; + conf.storage_account_name = cloud_roles::storage_account(account); + conf.uri = access_point_uri( + fmt::format("{}.{}", conf.storage_account_name(), httpd_host_name)); + // Use a valid base64-encoded test key (88 characters, typical Azure key + // length) This is a randomly generated test key + conf.shared_key = cloud_roles::private_key_str( + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/" + "K1SZFPTOtr/KBHBeksoGMGw=="); + conf.is_hns_enabled = false; + conf.server_addr = server_addr; + return conf; +} + +ss::lw_shared_ptr +make_test_credentials(const abs_configuration& cfg) { + return ss::make_lw_shared( + cloud_roles::make_credentials_applier( + cloud_roles::abs_credentials{ + cfg.storage_account_name, cfg.shared_key.value()})); +} + +class abs_client_fixture : public ::testing::Test { +public: + void set_up(std::string_view account = "test-success") { + server = ss::make_shared(); + server->start().get(); + server->set_routes(set_routes).get(); + + auto conf = make_test_configuration(account); + auto resolved = net::resolve_dns(conf.server_addr).get(); + server->listen(resolved).get(); + + auto transport_conf = build_transport_configuration(conf).get(); + client = ss::make_shared( + nullptr, + conf, + transport_conf, + conf.make_probe(), + make_test_credentials(conf)); + } + + void TearDown() override { + client->shutdown(); + server->stop().get(); + } + + 
ss::shared_ptr server; + ss::shared_ptr client; +}; + +} // anonymous namespace + +TEST_F(abs_client_fixture, test_batch_delete_success) { + set_up("test-success"); + // Test deleting 3 objects successfully + auto keys = chunked_vector{ + object_key{"key1"}, object_key{"key2"}, object_key{"key3"}}; + + auto result = client + ->delete_objects( + bucket_name{"test-success"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE(result.value().undeleted_keys.empty()); +} + +TEST_F(abs_client_fixture, test_batch_delete_single_key) { + set_up("test-success"); + // Test deleting a single object + auto keys = chunked_vector{object_key{"single-key"}}; + + auto result = client + ->delete_objects( + bucket_name{"test-success"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE(result.value().undeleted_keys.empty()); +} + +TEST_F(abs_client_fixture, test_batch_delete_many_keys) { + set_up("test-success"); + // Test deleting many objects (simulate batch behavior) + chunked_vector keys; + for (int i = 0; i < 10; ++i) { + keys.push_back(object_key{fmt::format("key-{}", i)}); + } + + auto result = client + ->delete_objects( + bucket_name{"test-success"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE(result.value().undeleted_keys.empty()); +} + +TEST_F(abs_client_fixture, test_batch_delete_all_errors) { + set_up("test-errors"); + auto keys = chunked_vector{ + object_key{"key1"}, object_key{"key2"}, object_key{"key3"}}; + + auto result = client + ->delete_objects( + bucket_name{"test-errors"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + // All keys should have failed + EXPECT_EQ(result.value().undeleted_keys.size(), 3); + + // Verify all keys are in undeleted_keys + for (size_t i = 0; i < keys.size(); ++i) { + bool found = false; + for (const auto& undeleted : 
result.value().undeleted_keys) { + if (undeleted.key == keys[i]) { + found = true; + EXPECT_FALSE(undeleted.reason.empty()); + break; + } + } + EXPECT_TRUE(found) << "Key " << keys[i]() + << " not found in undeleted_keys"; + } +} + +TEST_F(abs_client_fixture, test_batch_delete_partial_errors) { + set_up("test-partial"); + auto keys = chunked_vector{ + object_key{"key1"}, object_key{"key2"}, object_key{"key3"}}; + + auto result = client + ->delete_objects( + bucket_name{"test-partial"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + // First key succeeds, rest fail + EXPECT_EQ(result.value().undeleted_keys.size(), 2); + + // Verify the failed keys are key2 and key3 + for (const auto& undeleted : result.value().undeleted_keys) { + EXPECT_TRUE(undeleted.key == keys[1] || undeleted.key == keys[2]); + EXPECT_FALSE(undeleted.reason.empty()); + } +} + +TEST_F(abs_client_fixture, test_batch_delete_not_found) { + set_up("test-notfound"); + // Test deleting non-existent objects (404 should be treated as success) + auto keys = chunked_vector{ + object_key{"missing1"}, object_key{"missing2"}}; + + auto result = client + ->delete_objects( + bucket_name{"test-notfound"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + // 404 is treated as success for delete operations + EXPECT_TRUE(result.value().undeleted_keys.empty()); +} + +TEST_F(abs_client_fixture, test_batch_delete_server_error) { + set_up("test-servererror"); + auto keys = chunked_vector{object_key{"key1"}}; + + auto result = client + ->delete_objects( + bucket_name{"test-servererror"}, + keys, + http::default_connect_timeout) + .get(); + + // Server error should result in error_outcome::retry + ASSERT_FALSE(result.has_value()); + EXPECT_EQ(result.error(), error_outcome::retry); +} diff --git a/src/v/cloud_storage_clients/tests/gcs_client_test.cc b/src/v/cloud_storage_clients/tests/gcs_client_test.cc new file mode 100644 index 
0000000000000..fe41c81a1d671 --- /dev/null +++ b/src/v/cloud_storage_clients/tests/gcs_client_test.cc @@ -0,0 +1,438 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "base/seastarx.h" +#include "cloud_storage_clients/configuration.h" +#include "cloud_storage_clients/s3_client.h" +#include "http/tests/utils.h" +#include "net/dns.h" +#include "utils/unresolved_address.h" + +#include +#include +#include +#include +#include + +#include + +using namespace std::chrono_literals; +using namespace cloud_storage_clients; + +static const uint16_t httpd_port_number = 14435; +static constexpr const char* httpd_host_name = "127.0.0.1"; + +namespace { + +/// Mock multipart/mixed response for GCS batch delete success +/// Returns a response with all deletes successful (204 No Content) +ss::sstring make_gcs_batch_delete_success_response( + const ss::sstring& boundary, size_t num_keys) { + ss::sstring response; + for (size_t i = 0; i < num_keys; ++i) { + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: response-{}\r\n\r\n" + "HTTP/1.1 204 No Content\r\n" + "X-GUploader-UploadID: test-upload-id-{}\r\n\r\n", + boundary, + i, + i); + } + response += fmt::format("--{}--\r\n", boundary); + return response; +} + +/// Mock multipart/mixed response for GCS batch delete with errors +/// Returns a response with all deletes failed (403 Forbidden) +ss::sstring make_gcs_batch_delete_error_response( + const ss::sstring& boundary, size_t num_keys) { + ss::sstring response; + for (size_t i = 0; i < num_keys; ++i) { + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: response-{}\r\n\r\n" + "HTTP/1.1 403 
Forbidden\r\n" + "Content-Type: application/json; charset=UTF-8\r\n" + "Content-Length: 150\r\n\r\n" + R"json({{ + "error": {{ + "code": 403, + "message": "Access denied", + "errors": [{{ + "domain": "global", + "reason": "forbidden" + }}] + }} +}})json", + boundary, + i); + } + response += fmt::format("--{}--\r\n", boundary); + return response; +} + +/// Mock multipart/mixed response for GCS batch delete with partial errors +/// First key succeeds, rest fail +ss::sstring make_gcs_batch_delete_partial_error_response( + const ss::sstring& boundary, size_t num_keys) { + ss::sstring response; + // First key succeeds + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: response-0\r\n\r\n" + "HTTP/1.1 204 No Content\r\n" + "X-GUploader-UploadID: test-upload-id-0\r\n\r\n", + boundary); + + // Rest fail + for (size_t i = 1; i < num_keys; ++i) { + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: response-{}\r\n\r\n" + "HTTP/1.1 403 Forbidden\r\n" + "Content-Type: application/json; charset=UTF-8\r\n\r\n" + R"json({{ + "error": {{ + "code": 403, + "message": "Forbidden" + }} +}})json", + boundary, + i); + } + response += fmt::format("--{}--\r\n", boundary); + return response; +} + +/// Mock multipart/mixed response for GCS batch delete with 404s (not found) +/// Returns 404 for all deletes (which should be treated as success) +ss::sstring make_gcs_batch_delete_not_found_response( + const ss::sstring& boundary, size_t num_keys) { + ss::sstring response; + for (size_t i = 0; i < num_keys; ++i) { + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: response-{}\r\n\r\n" + "HTTP/1.1 404 Not Found\r\n" + "Content-Type: application/json; charset=UTF-8\r\n\r\n" + R"json({{ + "error": {{ + "code": 404, + "message": "Not Found" + }} +}})json", + boundary, + i); + } + response += fmt::format("--{}--\r\n", boundary); + return response; +} + +void 
set_routes(ss::httpd::routes& r) { + using namespace ss::httpd; + using reply = ss::http::reply; + using flexible_function_handler + = http::test_utils::flexible_function_handler; + + // GCS batch API endpoint: POST /batch/storage/v1 + // Dispatch based on Authorization header to determine test scenario + auto dispatch_handler = new flexible_function_handler( + []( + const_req req, reply& reply, ss::sstring& content_type) -> std::string { + // Check that this is a batch request + if (req._url != "/batch/storage/v1") { + reply.set_status(reply::status_type::bad_request); + content_type = "application/json"; + return R"json({"error": {"code": 400, "message": "Invalid endpoint"}})json"; + } + + // Count number of Content-ID entries in request body + size_t num_keys = 0; + size_t pos = 0; + while ((pos = req.content.find("Content-ID:", pos)) + != seastar::sstring::npos) { + ++num_keys; + pos += 11; + } + + auto response_boundary = ss::sstring{"batch_response_boundary"}; + std::string response_body; + + // Dispatch based on Authorization header (test scenarios) + auto auth = std::string{req.get_header("Authorization")}; + if (auth.find("test-success") != std::string::npos) { + response_body = make_gcs_batch_delete_success_response( + response_boundary, num_keys) + .c_str(); + reply.set_status(reply::status_type::ok); + content_type = fmt::format( + "multipart/mixed; boundary={}", response_boundary); + } else if (auth.find("test-errors") != std::string::npos) { + response_body = make_gcs_batch_delete_error_response( + response_boundary, num_keys) + .c_str(); + reply.set_status(reply::status_type::ok); + content_type = fmt::format( + "multipart/mixed; boundary={}", response_boundary); + } else if (auth.find("test-partial") != std::string::npos) { + response_body = make_gcs_batch_delete_partial_error_response( + response_boundary, num_keys) + .c_str(); + reply.set_status(reply::status_type::ok); + content_type = fmt::format( + "multipart/mixed; boundary={}", 
response_boundary); + } else if (auth.find("test-notfound") != std::string::npos) { + response_body = make_gcs_batch_delete_not_found_response( + response_boundary, num_keys) + .c_str(); + reply.set_status(reply::status_type::ok); + content_type = fmt::format( + "multipart/mixed; boundary={}", response_boundary); + } else if (auth.find("test-servererror") != std::string::npos) { + reply.set_status(reply::status_type::internal_server_error); + content_type = "application/json"; + return R"json({"error": {"code": 500, "message": "Internal Server Error"}})json"; + } else { + reply.set_status(reply::status_type::unauthorized); + content_type = "application/json"; + return R"json({"error": {"code": 401, "message": "Unauthorized"}})json"; + } + + return response_body; + }, + "txt"); + + r.add(operation_type::POST, url("/batch/storage/v1"), dispatch_handler); +} + +s3_configuration make_test_configuration(std::string_view scenario) { + net::unresolved_address server_addr(httpd_host_name, httpd_port_number); + s3_configuration conf; + // Use a test access key that encodes the scenario name for dispatch + conf.access_key = cloud_roles::public_key_str( + fmt::format("AKIAIOSFODNN7EXAMPLE-{}", scenario)); + conf.secret_key = cloud_roles::private_key_str( + "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"); + conf.region = cloud_roles::aws_region_name("auto"); + // Critical: Set URI to match GCS pattern so backend inference works + conf.uri = access_point_uri("storage.googleapis.com"); + conf.server_addr = server_addr; + conf.url_style = s3_url_style::path; + return conf; +} + +ss::lw_shared_ptr +make_test_credentials(const s3_configuration& cfg) { + return ss::make_lw_shared( + cloud_roles::make_credentials_applier( + cloud_roles::aws_credentials{ + cfg.access_key.value(), + cfg.secret_key.value(), + std::nullopt, + cfg.region})); +} + +class gcs_client_fixture : public ::testing::Test { +public: + void set_up(std::string_view scenario = "test-success") { + server = 
ss::make_shared(); + server->start().get(); + server->set_routes(set_routes).get(); + + auto conf = make_test_configuration(scenario); + auto resolved = net::resolve_dns(conf.server_addr).get(); + server->listen(resolved).get(); + + auto transport_conf = build_transport_configuration(conf).get(); + client = ss::make_shared( + nullptr, + conf, + transport_conf, + conf.make_probe(), + make_test_credentials(conf)); + } + + void TearDown() override { + if (client) { + client->shutdown(); + client->stop().get(); + } + if (server) { + server->stop().get(); + } + } + + ss::shared_ptr server; + ss::shared_ptr client; +}; + +} // anonymous namespace + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_success) { + set_up("test-success"); + // Test deleting 3 objects successfully + auto keys = chunked_vector{ + object_key{"key1"}, object_key{"key2"}, object_key{"key3"}}; + + auto result = client + ->delete_objects( + bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE(result.value().undeleted_keys.empty()); +} + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_single_key) { + set_up("test-success"); + // Test deleting a single object + auto keys = chunked_vector{object_key{"single-key"}}; + + auto result = client + ->delete_objects( + bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE(result.value().undeleted_keys.empty()); +} + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_many_keys) { + set_up("test-success"); + // Test deleting many objects (simulate batch behavior) + chunked_vector keys; + for (int i = 0; i < 10; ++i) { + keys.push_back(object_key{fmt::format("key-{}", i)}); + } + + auto result = client + ->delete_objects( + bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE(result.value().undeleted_keys.empty()); +} + 
+TEST_F(gcs_client_fixture, test_gcs_batch_delete_all_errors) { + set_up("test-errors"); + auto keys = chunked_vector{ + object_key{"key1"}, object_key{"key2"}, object_key{"key3"}}; + + auto result = client + ->delete_objects( + bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + // All keys should have failed + EXPECT_EQ(result.value().undeleted_keys.size(), 3); + + // Verify all keys are in undeleted_keys + for (const auto& k : keys) { + bool found = false; + for (const auto& undeleted : result.value().undeleted_keys) { + if (undeleted.key == k) { + found = true; + EXPECT_NE( + undeleted.reason.find("Access denied"), ss::sstring::npos); + break; + } + } + EXPECT_TRUE(found) << "Key " << k << " not found in undeleted_keys"; + } +} + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_partial_errors) { + set_up("test-partial"); + auto keys = chunked_vector{ + object_key{"key1"}, object_key{"key2"}, object_key{"key3"}}; + + auto result = client + ->delete_objects( + bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + // First key succeeds, rest fail + EXPECT_EQ(result.value().undeleted_keys.size(), 2); + + // Verify the failed keys are key2 and key3 + for (const auto& undeleted : result.value().undeleted_keys) { + EXPECT_TRUE(undeleted.key == keys[1] || undeleted.key == keys[2]); + EXPECT_NE(undeleted.reason.find("Forbidden"), ss::sstring::npos); + } +} + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_not_found) { + set_up("test-notfound"); + // Test deleting non-existent objects (404 should be treated as success) + auto keys = chunked_vector{ + object_key{"missing1"}, object_key{"missing2"}}; + + auto result = client + ->delete_objects( + bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + // 404 is treated as success for delete operations + 
EXPECT_TRUE(result.value().undeleted_keys.empty()); +} + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_server_error) { + set_up("test-servererror"); + auto keys = chunked_vector{object_key{"key1"}}; + + auto result = client + ->delete_objects( + bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + // Server error should result in error_outcome::retry + ASSERT_FALSE(result.has_value()); + EXPECT_EQ(result.error(), error_outcome::retry); +} + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_unauthorized) { + set_up("test-unauthorized"); + auto keys = chunked_vector{object_key{"key1"}}; + + auto result = client + ->delete_objects( + bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + // A 401 Unauthorized response should result in error_outcome::fail + ASSERT_FALSE(result.has_value()); + EXPECT_EQ(result.error(), error_outcome::fail); +} diff --git a/src/v/cloud_storage_clients/tests/util_test.cc b/src/v/cloud_storage_clients/tests/util_test.cc index a26fb9bdb5122..a5e3a70fd6cca 100644 --- a/src/v/cloud_storage_clients/tests/util_test.cc +++ b/src/v/cloud_storage_clients/tests/util_test.cc @@ -1,3 +1,5 @@ +#include "bytes/iobuf.h" +#include "bytes/iobuf_parser.h" #include "cloud_storage_clients/util.h" #include @@ -22,3 +24,519 @@ BOOST_AUTO_TEST_CASE(test_all_paths_to_file) { auto result4 = util::all_paths_to_file(object_key{"foo"}); BOOST_REQUIRE_EQUAL(result4, std::vector{object_key{"foo"}}); } + +// ============================================================================ +// mime_header tests +// ============================================================================ + +namespace { +constexpr auto convert_cid = [](std::string_view raw) { + return std::make_optional(std::stoi(std::string{raw})); +}; +} // namespace + +BOOST_AUTO_TEST_CASE(test_mime_header_parse_basic) { + using namespace cloud_storage_clients; + + const char* mime_data = "Content-Type: application/http\r\n" + "Content-ID: 42\r\n" +
"\r\n"; + + iobuf buf; + buf.append(mime_data, strlen(mime_data)); + iobuf_parser parser(std::move(buf)); + + auto header = util::mime_header::from(parser); + + // Check Content-Type + auto content_type = header.get(boost::beast::http::field::content_type); + BOOST_REQUIRE(content_type.has_value()); + BOOST_REQUIRE_EQUAL(content_type.value(), "application/http"); + + // Check Content-ID + auto content_id = header.content_id(convert_cid); + BOOST_REQUIRE(content_id.has_value()); + BOOST_REQUIRE_EQUAL(content_id.value(), 42); +} + +BOOST_AUTO_TEST_CASE(test_mime_header_parse_only_content_type) { + using namespace cloud_storage_clients; + + const char* mime_data = "Content-Type: text/plain\r\n" + "\r\n"; + + iobuf buf; + buf.append(mime_data, strlen(mime_data)); + iobuf_parser parser(std::move(buf)); + + auto header = util::mime_header::from(parser); + + auto content_type = header.get(boost::beast::http::field::content_type); + BOOST_REQUIRE(content_type.has_value()); + BOOST_REQUIRE_EQUAL(content_type.value(), "text/plain"); + + // Content-ID should be absent + auto content_id = header.content_id(convert_cid); + BOOST_REQUIRE(!content_id.has_value()); +} + +BOOST_AUTO_TEST_CASE(test_mime_header_parse_only_content_id) { + using namespace cloud_storage_clients; + + const char* mime_data = "Content-ID: 0\r\n" + "\r\n"; + + iobuf buf; + buf.append(mime_data, strlen(mime_data)); + iobuf_parser parser(std::move(buf)); + + auto header = util::mime_header::from(parser); + + auto content_id = header.content_id(convert_cid); + BOOST_REQUIRE(content_id.has_value()); + BOOST_REQUIRE_EQUAL(content_id.value(), 0); + + // Content-Type should be absent + auto content_type = header.get(boost::beast::http::field::content_type); + BOOST_REQUIRE(!content_type.has_value()); +} + +BOOST_AUTO_TEST_CASE(test_mime_header_parse_empty) { + using namespace cloud_storage_clients; + + const char* mime_data = "\r\n"; + + iobuf buf; + buf.append(mime_data, strlen(mime_data)); + iobuf_parser 
parser(std::move(buf)); + + auto header = util::mime_header::from(parser); + + // Both fields should be absent + auto content_type = header.get(boost::beast::http::field::content_type); + BOOST_REQUIRE(!content_type.has_value()); + + auto content_id = header.content_id(convert_cid); + BOOST_REQUIRE(!content_id.has_value()); +} + +BOOST_AUTO_TEST_CASE(test_mime_header_parse_extra_headers) { + using namespace cloud_storage_clients; + + const char* mime_data = "Content-Type: application/http\r\n" + "Content-ID: 5\r\n" + "X-Custom-Header: custom-value\r\n" + "\r\n"; + + iobuf buf; + buf.append(mime_data, strlen(mime_data)); + iobuf_parser parser(std::move(buf)); + + auto header = util::mime_header::from(parser); + + // Standard fields should still work + auto content_type = header.get(boost::beast::http::field::content_type); + BOOST_REQUIRE(content_type.has_value()); + BOOST_REQUIRE_EQUAL(content_type.value(), "application/http"); + + auto content_id = header.content_id(convert_cid); + BOOST_REQUIRE(content_id.has_value()); + BOOST_REQUIRE_EQUAL(content_id.value(), 5); +} + +// ============================================================================ +// multipart_response_parser tests +// ============================================================================ + +BOOST_AUTO_TEST_CASE(test_multipart_parser_single_part) { + using namespace cloud_storage_clients; + + const char* multipart_data = "--boundary\r\n" + "Content-Type: text/plain\r\n" + "\r\n" + "Hello World\r\n" + "--boundary--\r\n"; + + iobuf buf; + buf.append(multipart_data, strlen(multipart_data)); + + util::multipart_response_parser parser( + std::move(buf), ss::sstring("--boundary")); + + // Get first part + auto part1 = parser.get_part(); + BOOST_REQUIRE(part1.has_value()); + + // Verify part contains the expected data + iobuf_parser part_parser(std::move(part1.value())); + auto content = part_parser.read_string(part_parser.bytes_left()); + BOOST_REQUIRE( + content.find("Content-Type: 
text/plain") != ss::sstring::npos); + BOOST_REQUIRE(content.find("Hello World") != ss::sstring::npos); + + // Should be no more parts + auto part2 = parser.get_part(); + BOOST_REQUIRE(!part2.has_value()); +} + +BOOST_AUTO_TEST_CASE(test_multipart_parser_multiple_parts) { + using namespace cloud_storage_clients; + + const char* multipart_data = "--boundary\r\n" + "Content-ID: 0\r\n" + "\r\n" + "First part\r\n" + "--boundary\r\n" + "Content-ID: 1\r\n" + "\r\n" + "Second part\r\n" + "--boundary\r\n" + "Content-ID: 2\r\n" + "\r\n" + "Third part\r\n" + "--boundary--\r\n"; + + iobuf buf; + buf.append(multipart_data, strlen(multipart_data)); + + util::multipart_response_parser parser( + std::move(buf), ss::sstring("--boundary")); + + // Get all three parts + auto part1 = parser.get_part(); + BOOST_REQUIRE(part1.has_value()); + + auto part2 = parser.get_part(); + BOOST_REQUIRE(part2.has_value()); + + auto part3 = parser.get_part(); + BOOST_REQUIRE(part3.has_value()); + + // Verify content + iobuf_parser p1(std::move(part1.value())); + auto content1 = p1.read_string(p1.bytes_left()); + BOOST_REQUIRE(content1.find("First part") != ss::sstring::npos); + + iobuf_parser p2(std::move(part2.value())); + auto content2 = p2.read_string(p2.bytes_left()); + BOOST_REQUIRE(content2.find("Second part") != ss::sstring::npos); + + iobuf_parser p3(std::move(part3.value())); + auto content3 = p3.read_string(p3.bytes_left()); + BOOST_REQUIRE(content3.find("Third part") != ss::sstring::npos); + + // Should be no more parts + auto part4 = parser.get_part(); + BOOST_REQUIRE(!part4.has_value()); +} + +BOOST_AUTO_TEST_CASE(test_multipart_parser_empty_parts) { + using namespace cloud_storage_clients; + + const char* multipart_data = "--boundary\r\n" + "\r\n" + "--boundary\r\n" + "\r\n" + "--boundary--\r\n"; + + iobuf buf; + buf.append(multipart_data, strlen(multipart_data)); + + util::multipart_response_parser parser( + std::move(buf), ss::sstring("--boundary")); + + // Even with empty parts, 
parser should handle gracefully + auto part1 = parser.get_part(); + BOOST_REQUIRE(!part1.has_value()); + + auto part2 = parser.get_part(); + BOOST_REQUIRE(!part2.has_value()); + + BOOST_REQUIRE(!parser.get_part().has_value()); +} + +BOOST_AUTO_TEST_CASE(test_multipart_parser_no_end_boundary) { + using namespace cloud_storage_clients; + + const char* multipart_data = "--boundary\r\n" + "Content-ID: 0\r\n" + "\r\n" + "Data without end" + "--bound"; + + iobuf buf; + buf.append(multipart_data, strlen(multipart_data)); + + util::multipart_response_parser parser( + std::move(buf), ss::sstring("--boundary")); + + // Should handle missing/incomplete end boundary gracefully + auto part1 = parser.get_part(); + BOOST_REQUIRE(!part1.has_value()); +} + +BOOST_AUTO_TEST_CASE(test_multipart_parser_empty_buffer) { + using namespace cloud_storage_clients; + + iobuf buf; + util::multipart_response_parser parser( + std::move(buf), ss::sstring("--boundary")); + + // Empty buffer should return no parts + auto part = parser.get_part(); + BOOST_REQUIRE(!part.has_value()); +} + +// ============================================================================ +// multipart_subresponse tests +// ============================================================================ + +BOOST_AUTO_TEST_CASE(test_multipart_subresponse_parse_success) { + using namespace cloud_storage_clients; + + const char* http_response = "HTTP/1.1 202 Accepted\r\n" + "x-ms-request-id: abc-123\r\n" + "x-ms-version: 2023-01-03\r\n" + "Content-Length: 0\r\n" + "\r\n"; + + iobuf buf; + buf.append(http_response, strlen(http_response)); + iobuf_parser parser(std::move(buf)); + + auto subresponse = util::multipart_subresponse::from(parser); + + // Check status + BOOST_REQUIRE_EQUAL( + subresponse.result(), boost::beast::http::status::accepted); + BOOST_REQUIRE(subresponse.is_ok()); + + // Should have no error + auto error = subresponse.error("x-ms-error-code"); + BOOST_REQUIRE(!error.has_value()); +} + 
+BOOST_AUTO_TEST_CASE(test_multipart_subresponse_parse_ok) { + using namespace cloud_storage_clients; + + const char* http_response = "HTTP/1.1 200 OK\r\n" + "Content-Type: application/json\r\n" + "Content-Length: 0\r\n" + "\r\n"; + + iobuf buf; + buf.append(http_response, strlen(http_response)); + iobuf_parser parser(std::move(buf)); + + auto subresponse = util::multipart_subresponse::from(parser); + + BOOST_REQUIRE_EQUAL(subresponse.result(), boost::beast::http::status::ok); + BOOST_REQUIRE(subresponse.is_ok()); +} + +BOOST_AUTO_TEST_CASE(test_multipart_subresponse_parse_not_found) { + using namespace cloud_storage_clients; + + const char* http_response = "HTTP/1.1 404 Not Found\r\n" + "x-ms-error-code: BlobNotFound\r\n" + "Content-Length: 0\r\n" + "\r\n"; + + iobuf buf; + buf.append(http_response, strlen(http_response)); + iobuf_parser parser(std::move(buf)); + + auto subresponse = util::multipart_subresponse::from(parser); + + BOOST_REQUIRE_EQUAL( + subresponse.result(), boost::beast::http::status::not_found); + // 404 is considered "ok" for delete operations + BOOST_REQUIRE(subresponse.is_ok()); + + // Because 404 is treated as "ok", error() should return nullopt + auto error = subresponse.error("x-ms-error-code"); + BOOST_REQUIRE(!error.has_value()); +} + +BOOST_AUTO_TEST_CASE(test_multipart_subresponse_parse_error) { + using namespace cloud_storage_clients; + + const char* http_response = "HTTP/1.1 403 Forbidden\r\n" + "x-ms-error-code: AuthenticationFailed\r\n" + "Content-Length: 0\r\n" + "\r\n"; + + iobuf buf; + buf.append(http_response, strlen(http_response)); + iobuf_parser parser(std::move(buf)); + + auto subresponse = util::multipart_subresponse::from(parser); + + BOOST_REQUIRE_EQUAL( + subresponse.result(), boost::beast::http::status::forbidden); + BOOST_REQUIRE(!subresponse.is_ok()); + + // Should extract error message + auto error = subresponse.error("x-ms-error-code"); + BOOST_REQUIRE(error.has_value()); + BOOST_REQUIRE(error.value().find("403") !=
ss::sstring::npos); + BOOST_REQUIRE( + error.value().find("AuthenticationFailed") != ss::sstring::npos); +} + +BOOST_AUTO_TEST_CASE(test_multipart_subresponse_parse_error_no_code) { + using namespace cloud_storage_clients; + + const char* http_response = "HTTP/1.1 500 Internal Server Error\r\n" + "Content-Length: 0\r\n" + "\r\n"; + + iobuf buf; + buf.append(http_response, strlen(http_response)); + iobuf_parser parser(std::move(buf)); + + auto subresponse = util::multipart_subresponse::from(parser); + + BOOST_REQUIRE_EQUAL( + subresponse.result(), boost::beast::http::status::internal_server_error); + BOOST_REQUIRE(!subresponse.is_ok()); + + // Error should still be returned with "Unknown" reason + auto error = subresponse.error("x-ms-error-code"); + BOOST_REQUIRE(error.has_value()); + BOOST_REQUIRE(error.value().find("500") != ss::sstring::npos); + BOOST_REQUIRE(error.value().find("Unknown") != ss::sstring::npos); +} + +BOOST_AUTO_TEST_CASE(test_multipart_subresponse_parse_no_content) { + using namespace cloud_storage_clients; + + const char* http_response = "HTTP/1.1 204 No Content\r\n" + "\r\n"; + + iobuf buf; + buf.append(http_response, strlen(http_response)); + iobuf_parser parser(std::move(buf)); + + auto subresponse = util::multipart_subresponse::from(parser); + + BOOST_REQUIRE_EQUAL( + subresponse.result(), boost::beast::http::status::no_content); + BOOST_REQUIRE(subresponse.is_ok()); +} + +// ============================================================================ +// Integration tests - full multipart response parsing +// ============================================================================ + +BOOST_AUTO_TEST_CASE(test_full_multipart_parsing_success) { + using namespace cloud_storage_clients; + + // Simulate Azure Batch API response with multiple successful deletes + const char* batch_response = "--batch_boundary\r\n" + "Content-Type: application/http\r\n" + "Content-ID: 0\r\n" + "\r\n" + "HTTP/1.1 202 Accepted\r\n" + "x-ms-request-id: req-0\r\n" 
+ "\r\n" + "--batch_boundary\r\n" + "Content-Type: application/http\r\n" + "Content-ID: 1\r\n" + "\r\n" + "HTTP/1.1 202 Accepted\r\n" + "x-ms-request-id: req-1\r\n" + "\r\n" + "--batch_boundary--\r\n"; + + iobuf buf; + buf.append(batch_response, strlen(batch_response)); + + util::multipart_response_parser parser( + std::move(buf), ss::sstring("--batch_boundary")); + + int successful_parts = 0; + + std::optional part; + while ((part = parser.get_part()).has_value()) { + iobuf_parser part_parser(std::move(part).value()); + + // Parse MIME headers + auto mime = util::mime_header::from(part_parser); + auto content_id = mime.content_id(convert_cid); + BOOST_REQUIRE(content_id.has_value()); + + // Parse HTTP response + auto subresponse = util::multipart_subresponse::from(part_parser); + BOOST_REQUIRE(subresponse.is_ok()); + BOOST_REQUIRE_EQUAL( + subresponse.result(), boost::beast::http::status::accepted); + + successful_parts++; + } + + BOOST_REQUIRE_EQUAL(successful_parts, 2); +} + +BOOST_AUTO_TEST_CASE(test_full_multipart_parsing_with_errors) { + using namespace cloud_storage_clients; + + // Simulate response with mixed success and errors + const char* batch_response = "--batch_boundary\r\n" + "Content-Type: application/http\r\n" + "Content-ID: 0\r\n" + "\r\n" + "HTTP/1.1 202 Accepted\r\n" + "x-ms-request-id: req-0\r\n" + "\r\n" + "--batch_boundary\r\n" + "Content-Type: application/http\r\n" + "Content-ID: 1\r\n" + "\r\n" + "HTTP/1.1 403 Forbidden\r\n" + "x-ms-error-code: InvalidCredentials\r\n" + "\r\n" + "--batch_boundary\r\n" + "Content-Type: application/http\r\n" + "Content-ID: 2\r\n" + "\r\n" + "HTTP/1.1 404 Not Found\r\n" + "x-ms-error-code: BlobNotFound\r\n" + "\r\n" + "--batch_boundary--\r\n"; + + iobuf buf; + buf.append(batch_response, strlen(batch_response)); + + util::multipart_response_parser parser( + std::move(buf), ss::sstring("--batch_boundary")); + + std::vector is_ok_results; + std::vector> content_ids; + + std::optional part; + while ((part = 
parser.get_part()).has_value()) { + iobuf_parser part_parser(std::move(part).value()); + + auto mime = util::mime_header::from(part_parser); + content_ids.push_back(mime.content_id(convert_cid)); + + auto subresponse = util::multipart_subresponse::from(part_parser); + is_ok_results.push_back(subresponse.is_ok()); + } + + BOOST_REQUIRE_EQUAL(is_ok_results.size(), 3); + BOOST_REQUIRE_EQUAL(content_ids.size(), 3); + + // First one should be successful + BOOST_REQUIRE_EQUAL(content_ids[0].value(), 0); + BOOST_REQUIRE(is_ok_results[0]); + + // Second one should be error + BOOST_REQUIRE_EQUAL(content_ids[1].value(), 1); + BOOST_REQUIRE(!is_ok_results[1]); + + // Third one (404) should be ok for deletes + BOOST_REQUIRE_EQUAL(content_ids[2].value(), 2); + BOOST_REQUIRE(is_ok_results[2]); +} diff --git a/src/v/cloud_storage_clients/util.cc b/src/v/cloud_storage_clients/util.cc index 1bcebf1d028c7..6db0dce73e44e 100644 --- a/src/v/cloud_storage_clients/util.cc +++ b/src/v/cloud_storage_clients/util.cc @@ -12,8 +12,10 @@ #include "base/vlog.h" #include "bytes/streambuf.h" +#include "container/chunked_vector.h" #include "http/utils.h" #include "net/connection.h" +#include "strings/string_switch.h" #include "utils/retry_chain_node.h" #include @@ -247,4 +249,193 @@ get_response_content_type(const http::client::response_header& headers) { return response_content_type::unknown; } +mime_header mime_header::from(iobuf_parser& in) { + mime_header result; + chunked_vector nl_stack; + nl_stack.reserve(4); + chunked_vector raw_headers; + static constexpr size_t max_buf = 1024; + std::string buf; + while (in.bytes_left()) { + auto c = in.consume_type(); + if (c == '\r' && (nl_stack.empty() || nl_stack.back() == '\n')) { + nl_stack.push_back(c); + } else if (c == '\n' && !nl_stack.empty() && nl_stack.back() == '\r') { + nl_stack.push_back(c); + } else if (c == '\r' || c == '\n' || buf.size() >= max_buf) { + throw std::runtime_error("Failed to parse MIME header"); + } else { + 
nl_stack.clear(); + buf.push_back(c); + } + if (nl_stack.size() == 2) { + raw_headers.emplace_back(std::move(buf)); + buf = {}; + } else if (nl_stack.size() == 4) { + break; + } + } + for (const auto& hdr : raw_headers) { + try { + field f = string_switch{std::string_view{hdr}} + .starts_with("Content-Type:", field::content_type) + .starts_with("Content-ID:", field::content_id) + .starts_with("Content-Length:", field::content_length) + .starts_with( + "Content-Transfer-Encoding:", + field::content_transfer_encoding); + constexpr std::string_view sep = ": "; + if (auto sep_pos = hdr.find(sep); sep_pos != hdr.npos) { + // quietly ignore duplicate fields + std::ignore = result._fields.try_emplace( + f, hdr.substr(sep_pos + sep.size())); + } + } catch (const std::runtime_error&) { + // ignore anything we don't explicitly match for + continue; + } + } + return result; +} + +std::optional mime_header::get(field f) const { + if (auto it = _fields.find(f); it != _fields.end()) { + return std::make_optional(it->second); + } + return std::nullopt; +} + +multipart_response_parser::multipart_response_parser(iobuf b, ss::sstring delim) + : _buffer(std::move(b)) + , _parser(std::cref(_buffer)) + , _delim(std::move(delim)) {} + +std::optional multipart_response_parser::get_part() { + advance_to_first_boundary(); + if (!_found_first || _done) { + return std::nullopt; + } + size_t delim_idx = 0; + std::optional part_start{}; + size_t part_len = 0; + while (delim_idx < _delim.size() && _parser.bytes_left()) { + auto c = _parser.consume_type(); + if (c == _delim[delim_idx]) { + ++delim_idx; + continue; + } + // unlikely, but we may have skipped some bytes that appeared to be + // part of a delimiter. 
+ part_len += delim_idx + 1; + delim_idx = 0; + // eat up any leading CRLFs so the resulting part starts on text + if (!part_start.has_value()) [[unlikely]] { + if (c == '\r' || c == '\n') { + part_len = 0; + continue; + } + part_start = _parser.bytes_consumed() - 1; + part_len = 1; + } + } + // we ran out of bytes before reaching the end delimiter + if (delim_idx < _delim.size() || _parser.bytes_left() < 2) { + _done = true; + return std::nullopt; + } + // check for end delimiter + auto maybe_end = _parser.peek_bytes(2); + if (maybe_end[0] == '-' && maybe_end[1] == '-') { + _done = true; + } + if (part_len == 0 || !part_start.has_value()) { + return std::nullopt; + } + return std::make_optional( + _buffer.share(part_start.value(), part_len)); +} + +void multipart_response_parser::advance_to_first_boundary() { + size_t delim_idx = 0; + while (!_found_first && _parser.bytes_left()) { + auto c = _parser.consume_type(); + if (c == _delim[delim_idx]) { + ++delim_idx; + } + _found_first = delim_idx == _delim.size(); + } +} + +auto multipart_subresponse::result() const -> status { + vassert(!_ec, "Parser had errored: {}", _ec.message()); + vassert(_response.has_value(), "Response missing"); + vassert(_header_done, "Header not done"); + return _response.value().result(); +} + +bool multipart_subresponse::is_ok() const { + auto st = result(); + return st == status::ok || st == status::accepted + || st == status::no_content || st == status::not_found; +} + +std::optional +multipart_subresponse::error(std::string_view error_code_name) const { + if (is_ok()) { + return std::nullopt; + } + auto st = result(); + auto it = _response.value().find(error_code_name); + std::string_view reason = "Unknown"; + if (it != _response.value().end()) { + reason = it->value(); + } + return ssx::sformat( + "HTTP {} {} - {}", static_cast(st), st, reason); +} + +std::optional +multipart_subresponse::error(const std::function& parser) { + if (is_ok()) { + return std::nullopt; + } + auto st = 
result(); + auto reason = parser(_body.share()); + return ssx::sformat( + "HTTP {} {} - {}", static_cast(st), st, reason); +} + +multipart_subresponse multipart_subresponse::from(iobuf_parser& in) { + multipart_subresponse result{}; + auto buf = in.share(in.bytes_left()); + parser_t parser; + parser.eager(true); + parser.get().body().set_temporary_source(buf); + auto bufseq = iobuf_to_constbufseq(buf); + result._noctets = parser.put(bufseq, result._ec); + if (result._ec) { + throw std::runtime_error( + ssx::sformat( + "faled to parse multipart reponse part: {}, remaining bytes: " + "{}, n octets parsed: {}", + result._ec.message(), + buf.size_bytes(), + result._noctets)); + } + result._header_done = parser.is_header_done(); + result._body = parser.get().body().consume(); + result._response.emplace(parser.release()); + return result; +} + +std::vector +multipart_subresponse::iobuf_to_constbufseq(const iobuf& buf) { + std::vector seq; + for (const auto& fragm : buf) { + boost::asio::const_buffer cbuf{fragm.get(), fragm.size()}; + seq.push_back(cbuf); + } + return seq; +}; + } // namespace cloud_storage_clients::util diff --git a/src/v/cloud_storage_clients/util.h b/src/v/cloud_storage_clients/util.h index e7d6a5b79f8bf..15f9f5d787398 100644 --- a/src/v/cloud_storage_clients/util.h +++ b/src/v/cloud_storage_clients/util.h @@ -11,6 +11,7 @@ #pragma once #include "bytes/iobuf.h" +#include "bytes/iobuf_parser.h" #include "cloud_storage_clients/types.h" #include "http/client.h" @@ -51,4 +52,63 @@ void url_encode_target(http::client::request_header& header); response_content_type get_response_content_type(const http::client::response_header& headers); +struct mime_header { + static constexpr std::string_view content_type_field = "Content-Type"; + static constexpr std::string_view content_id_field = "Content-ID"; + +public: + using field = boost::beast::http::field; + using field_map_t = std::unordered_map; + + template + std::optional content_id( + const 
std::function(std::string_view)> f) const { + return get(field::content_id).and_then(f); + } + std::optional get(field f) const; + field_map_t::const_iterator find(field f) const { return _fields.find(f); } + field_map_t::const_iterator end() const { return _fields.end(); } + + // TODO: should return optional or something? + static mime_header from(iobuf_parser& in); + +private: + field_map_t _fields; +}; + +struct multipart_response_parser { + explicit multipart_response_parser(iobuf b, ss::sstring delim); + std::optional get_part(); + +private: + void advance_to_first_boundary(); + iobuf _buffer; + iobuf_const_parser _parser; + ss::sstring _delim; + bool _found_first{false}; + bool _done{false}; +}; + +struct multipart_subresponse { + using status = boost::beast::http::status; + status result() const; + bool is_ok() const; + std::optional error(std::string_view error_code_name) const; + // extract error message from the response body. non-const because it + // requires a share from the cached response body + std::optional error(const std::function&); + static multipart_subresponse from(iobuf_parser& in); + +private: + static std::vector + iobuf_to_constbufseq(const iobuf& buf); + using parser_t = boost::beast::http::response_parser; + using response_t = parser_t::value_type; + std::optional _response{}; + size_t _noctets{0}; + boost::beast::error_code _ec{}; + bool _header_done{false}; + iobuf _body; +}; + } // namespace cloud_storage_clients::util diff --git a/src/v/http/tests/utils.cc b/src/v/http/tests/utils.cc index ea86b05f5b4f5..e46f280dbc295 100644 --- a/src/v/http/tests/utils.cc +++ b/src/v/http/tests/utils.cc @@ -23,7 +23,9 @@ namespace http { namespace test_utils { flexible_function_handler::flexible_function_handler( - const flexible_handle_function& f_handle, ss::sstring content_type) + const flexible_handle_function& f_handle, + ss::sstring content_type, + std::set content_type_overrides) : _f_handle([this, f_handle]( std::unique_ptr req,
std::unique_ptr rep) { @@ -32,7 +34,8 @@ flexible_function_handler::flexible_function_handler( return ss::make_ready_future>( std::move(rep)); }) - , _content_type(std::move(content_type)) {} + , _content_type(std::move(content_type)) + , _content_type_overrides(std::move(content_type_overrides)) {} ss::future> flexible_function_handler::handle( [[maybe_unused]] const ss::sstring& path, @@ -40,7 +43,10 @@ ss::future> flexible_function_handler::handle( std::unique_ptr rep) { return _f_handle(std::move(req), std::move(rep)) .then([this](std::unique_ptr rep) { - if (_content_type == "xml") { + if (_content_type_overrides.contains( + rep->get_header("Content-Type"))) { + rep->done(); + } else if (_content_type == "xml") { // Because `application/xml` is not implemented as a mapping // in `http/mime_types.cc`, in order to construct a reply with // the `Content-Type` header set to `application/xml`, we diff --git a/src/v/http/tests/utils.h b/src/v/http/tests/utils.h index 623f2cc3e2b28..557eb532d2552 100644 --- a/src/v/http/tests/utils.h +++ b/src/v/http/tests/utils.h @@ -37,7 +37,8 @@ class flexible_function_handler : public ss::httpd::handler_base { public: flexible_function_handler( const flexible_handle_function& f_handle, - ss::sstring content_type = "txt"); + ss::sstring content_type = "txt", + std::set content_type_overrides = {}); ss::future> handle( [[maybe_unused]] const ss::sstring& path, @@ -47,6 +48,7 @@ class flexible_function_handler : public ss::httpd::handler_base { private: ss::httpd::future_handler_function _f_handle; ss::sstring _content_type; + std::set _content_type_overrides; }; } // namespace test_utils diff --git a/src/v/strings/string_switch.h b/src/v/strings/string_switch.h index 2b81c023e6bce..cd45cbecc1195 100644 --- a/src/v/strings/string_switch.h +++ b/src/v/strings/string_switch.h @@ -78,6 +78,20 @@ class string_switch { return *this; } + constexpr string_switch& starts_with(std::string_view S, T value) { + if (!result && 
view.starts_with(S)) { + result = std::move(value); + } + return *this; + } + + constexpr string_switch& ends_with(std::string_view S, T value) { + if (!result && view.ends_with(S)) { + result = std::move(value); + } + return *this; + } + constexpr string_switch& match_all(std::string_view S0, std::string_view S1, T value) { return match(S0, value).match(S1, value);