diff --git a/src/v/cloud_io/remote.cc b/src/v/cloud_io/remote.cc index 781a4344cf3f0..bb1840f4a26e0 100644 --- a/src/v/cloud_io/remote.cc +++ b/src/v/cloud_io/remote.cc @@ -170,7 +170,7 @@ int remote::delete_objects_max_keys() const { // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch return 256; case model::cloud_storage_backend::google_s3_compat: - [[fallthrough]]; + return 100; case model::cloud_storage_backend::unknown: return 1; } diff --git a/src/v/cloud_io/tests/s3_imposter.cc b/src/v/cloud_io/tests/s3_imposter.cc index d3036d08636b6..2c8ad53f0390f 100644 --- a/src/v/cloud_io/tests/s3_imposter.cc +++ b/src/v/cloud_io/tests/s3_imposter.cc @@ -369,6 +369,68 @@ struct s3_imposter_fixture::content_handler { } return R"xml()xml"; + } else if ( + request._method == "POST" && request._url.contains("batch")) { + vlog(fixt_log.trace, "Received batch request to {}", request._url); + if ( + expect_iter != expectations.end() + && expect_iter->second.body.has_value()) { + return expect_iter->second.body.value(); + } + auto keys_to_delete = keys_from_batch_delete_request(ri); + vlog( + fixt_log.trace, + "Parsed batched DELETE request with {} keys", + keys_to_delete.size()); + + constexpr std::string_view boundary = "response_boundary"; + + ss::sstring response; + for (size_t i = 0; i < keys_to_delete.size(); ++i) { + const auto& [path, key] = keys_to_delete[i]; + // ss::sstring to_delete = fmt::format("/{}", key().string()); + auto expect_iter = expectations.find(path); + bool obj_present = expect_iter != expectations.end() + && expect_iter->second.body.has_value(); + + if (!obj_present) { + // Missing objects are assumed to be not an error (e.g. + // caused by a delete retry). + vlog( + fixt_log.debug, + "Requested DELETE request of {}, not found", + path); + } else { + vlog(fixt_log.trace, "Batched DELETE request of {}", path); + expect_iter->second.body = std::nullopt; + } + + std::string_view code = [&key, obj_present]() { + if (key == "failme") { + return "500 Internal Server Error"; + } + return obj_present ? "204 No Content" : "404 Not Found"; + }(); + + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: response-{}\r\n\r\n" + "HTTP/1.1 {}\r\n" + "X-GUploader-UploadID: test-upload-id-{}\r\n\r\n", + boundary, + i, + code, + i); + } + + response += fmt::format("--{}--\r\n", boundary); + + repl.set_status(reply::status_type::ok); + repl.set_content_type( + fmt::format("multipart/mixed; boundary={}", boundary)); + + return response; } else { vunreachable("Unhandled request method {}", request._method); } @@ -434,18 +496,25 @@ s3_imposter_fixture::get_targets() const { void s3_imposter_fixture::set_expectations_and_listen( std::vector expectations, - std::optional> headers_to_store) { + std::optional> headers_to_store, + std::set content_type_overrides) { const ss::sstring url_prefix = "/" + url_base(); for (auto& expectation : expectations) { expectation.url.insert( expectation.url.begin(), url_prefix.begin(), url_prefix.end()); } _server - ->set_routes( - [this, &expectations, headers_to_store = std::move(headers_to_store)]( - ss::httpd::routes& r) mutable { - set_routes(r, expectations, std::move(headers_to_store)); - }) + ->set_routes([this, + &expectations, + headers_to_store = std::move(headers_to_store), + ct_overrides = std::move(content_type_overrides)]( + ss::httpd::routes& r) mutable { + set_routes( + r, + expectations, + std::move(headers_to_store), + std::move(ct_overrides)); + }) .get(); _server->listen(_server_addr).get(); } @@ -491,7 +560,8 @@ ss::sstring s3_imposter_fixture::url_base() const { void s3_imposter_fixture::set_routes( ss::httpd::routes& r, const std::vector& expectations, - std::optional> headers_to_store) { + std::optional> headers_to_store, + std::set content_type_overrides) { using namespace ss::httpd; using reply = ss::http::reply; _content_handler = ss::make_shared( @@ -500,7 +570,8 @@ void s3_imposter_fixture::set_routes( [this](const_req req, reply& repl, [[maybe_unused]] ss::sstring& type) { return _content_handler->handle(req, repl); }, - "xml"); + "xml", + std::move(content_type_overrides)); r.add_default_handler(_handler.get()); } @@ -569,3 +640,42 @@ keys_from_delete_objects_request(const http_test_utils::request_info& req) { return keys; } + +std::vector> +keys_from_batch_delete_request(const http_test_utils::request_info& req) { + std::vector> keys; + auto buffer_stream = std::istringstream{std::string{req.content}}; + + // crudely iterate over request lines, stripping out object keys + constexpr std::string_view method{"DELETE "}; + + std::string line; + while (std::getline(buffer_stream, line)) { + auto pos = line.find(method); + if (pos != 0) { + continue; + } + + pos += method.size(); + + auto ver_pos = line.find('\r'); + if (ver_pos == line.npos) { + continue; + } + + auto path = std::string_view{line}.substr(pos, ver_pos - pos); + auto last_slash_pos = path.find_last_of('/'); + if (last_slash_pos == path.npos) { + continue; + } + + auto key_pos = last_slash_pos + 1; + if (key_pos >= path.size()) { + continue; + } + + auto key = path.substr(key_pos); + keys.emplace_back(path, cloud_storage_clients::object_key{key}); + } + return keys; +} diff --git a/src/v/cloud_io/tests/s3_imposter.h b/src/v/cloud_io/tests/s3_imposter.h index a21a8afb002b7..2c30330f93d0b 100644 --- a/src/v/cloud_io/tests/s3_imposter.h +++ b/src/v/cloud_io/tests/s3_imposter.h @@ -83,7 +83,8 @@ class s3_imposter_fixture { void set_expectations_and_listen( std::vector expectations, std::optional> headers_to_store - = std::nullopt); + = std::nullopt, + std::set content_type_overrides = {}); /// Update expectations for the REST API. void add_expectations(std::vector expectations); @@ -128,7 +129,8 @@ class s3_imposter_fixture { ss::httpd::routes& r, const std::vector& expectations, std::optional> headers_to_store - = std::nullopt); + = std::nullopt, + std::set content_type_overrides = {}); ss::socket_address _server_addr; ss::shared_ptr _server; @@ -163,3 +165,6 @@ cloud_storage_clients::http_byte_range parse_byte_header(std::string_view s); std::vector keys_from_delete_objects_request(const http_test_utils::request_info&); + +std::vector> +keys_from_batch_delete_request(const http_test_utils::request_info&); diff --git a/src/v/cloud_storage/tests/remote_test.cc b/src/v/cloud_storage/tests/remote_test.cc index 653b794ec8142..41b22d4c4ddf0 100644 --- a/src/v/cloud_storage/tests/remote_test.cc +++ b/src/v/cloud_storage/tests/remote_test.cc @@ -917,7 +917,12 @@ TEST_P(all_types_remote_fixture, test_delete_objects_failure_handling) { } TEST_P(all_types_gcs_remote_fixture, test_delete_objects_on_unknown_backend) { - set_expectations_and_listen({}); + set_expectations_and_listen( + {} /* expectations */, + std::nullopt /* headers_to_store */, + {"multipart/mixed; boundary=response_boundary"} + /* content_type_overrides */ + ); retry_chain_node fib(never_abort, 60s, 20ms); @@ -945,24 +950,28 @@ TEST_P(all_types_gcs_remote_fixture, test_delete_objects_on_unknown_backend) { = remote.local().delete_objects(bucket_name, to_delete, fib).get(); ASSERT_EQ(cloud_storage::upload_result::success, result); - ASSERT_EQ(get_requests().size(), 4); - auto first_delete = get_requests()[2]; + ASSERT_EQ(get_requests().size(), 3); + auto batch_delete = get_requests()[2]; - std::unordered_set expected_urls{ - "/" + url_base() + "p", "/" + url_base() + "q"}; - ASSERT_EQ(first_delete.method, "DELETE"); - ASSERT_TRUE(expected_urls.contains(first_delete.url)); + // TODO(oren): if the url format was the issue here, we should convert them + // somehow in the s3 imposter backend so we can still simulate the lookup + // behavior. otoh, maybe that's just tetsing the fixture + auto json_api_url = [this](std::string_view key) { + return ssx::sformat("/storage/v1/b/{}/o/{}", bucket_name, key); + }; - expected_urls.erase(first_delete.url); - auto second_delete = get_requests()[3]; - ASSERT_EQ(second_delete.method, "DELETE"); - ASSERT_TRUE(expected_urls.contains(second_delete.url)); + std::vector expected_urls{ + json_api_url("p"), json_api_url("q")}; + ASSERT_EQ(batch_delete.method, "POST"); + ASSERT_TRUE(batch_delete.content.contains(expected_urls[0])); + ASSERT_TRUE(batch_delete.content.contains(expected_urls[1])); } TEST_P( all_types_gcs_remote_fixture, test_delete_objects_on_unknown_backend_result_reduction) { - set_expectations_and_listen({}); + set_expectations_and_listen( + {}, std::nullopt, {"multipart/mixed; boundary=response_boundary"}); retry_chain_node fib(never_abort, 5s, 20ms); @@ -981,17 +990,17 @@ TEST_P( // will time out cloud_storage_clients::object_key{"failme"}}; + // key 'failme' will produce a 500 error on the corresponding subrequest in + // s3_imposter. by design, this should fail the entire 'delete_objects' + // operations, though key 'p' may have been deleted in the process auto result = remote.local().delete_objects(bucket_name, to_delete, fib).get(); - if (conf.url_style == cloud_storage_clients::s3_url_style::virtual_host) { - // Due to virtual-host style addressing, this will timeout as DNS tries - // to resolve the request with the provided bucket name. - ASSERT_EQ(cloud_storage::upload_result::timedout, result); - } else { - // But, if we have path style addressing, the object won't be found, a - // warning will be issued, and the request will return success instead. - ASSERT_EQ(cloud_storage::upload_result::success, result); - } + ASSERT_EQ(cloud_storage::upload_result::failed, result); + + // drop the poison object key + to_delete.pop_back(); + result = remote.local().delete_objects(bucket_name, to_delete, fib).get(); + ASSERT_EQ(cloud_storage::upload_result::success, result); } TEST_P(all_types_remote_fixture, test_filter_by_source) { // NOLINT @@ -1038,8 +1047,8 @@ TEST_P(all_types_remote_fixture, test_filter_by_source) { // NOLINT ASSERT_TRUE( subscription.get().type == api_activity_type::manifest_download); - // Remove the rtc node from the filter and re-subscribe. This time we should - // receive the notification. + // Remove the rtc node from the filter and re-subscribe. This time we + // should receive the notification. flt.remove_source_to_ignore(&root_rtc); subscription = remote.local().subscribe(flt); res = remote.local() @@ -1099,8 +1108,8 @@ TEST_P(all_types_remote_fixture, test_filter_lifetime_1) { // NOLINT bucket_name, json_manifest_format_path, actual, child_rtc) .get(); flt.reset(); - // Notification should be received despite the fact that the filter object - // is destroyed. + // Notification should be received despite the fact that the filter + // object is destroyed. ASSERT_TRUE(res == download_result::success); ASSERT_TRUE(subscription.available()); ASSERT_TRUE( diff --git a/src/v/cloud_storage_clients/BUILD b/src/v/cloud_storage_clients/BUILD index 7beae6c549f42..2e2c04e989873 100644 --- a/src/v/cloud_storage_clients/BUILD +++ b/src/v/cloud_storage_clients/BUILD @@ -72,6 +72,7 @@ redpanda_cc_library( "@boost//:beast", "@boost//:lexical_cast", "@boost//:property_tree", + "@rapidjson", "@seastar", ], ) diff --git a/src/v/cloud_storage_clients/s3_client.cc b/src/v/cloud_storage_clients/s3_client.cc index 855eb97bc2851..d47e756c704d3 100644 --- a/src/v/cloud_storage_clients/s3_client.cc +++ b/src/v/cloud_storage_clients/s3_client.cc @@ -25,9 +25,12 @@ #include "config/configuration.h" #include "config/node_config.h" #include "config/types.h" +#include "container/chunked_hash_map.h" #include "hashing/secure.h" #include "http/client.h" #include "http/utils.h" +#include "json/istreamwrapper.h" +#include "json/reader.h" #include "utils/base64.h" #include @@ -47,6 +50,7 @@ #include #include +#include #include #include #include @@ -384,6 +388,131 @@ request_creator::make_delete_objects_request( return {std::move(header), make_iobuf_input_stream(std::move(body))}; } +result>> +request_creator::make_gcs_batch_delete_request( + const plain_bucket_name& name, const chunked_vector& keys) { + // Google Cloud Storage Batch API + // https://cloud.google.com/storage/docs/batch + // + // POST /batch/storage/v1 HTTP/1.1 + // Host: storage.googleapis.com + // Content-Type: multipart/mixed; boundary= + // Authorization: Bearer # added by 'add_auth' + // Content-Length: <...> + // + // Body structure: + // -- + // Content-Type: application/http + // Content-ID: + // + // DELETE /storage/v1/b//o/ HTTP/1.1 + // + // -- + // ... (repeat for each object) + // ---- + // + // Note: GCS batch API requires path-only URLs in subrequests + // Max 100 requests per batch, max 10MB payload + + // Generate unique boundary + auto boundary = fmt::format("batch_{}", uuid_t::create()); + + // Build the multipart body + iobuf body; + iobuf_ostreambuf obuf(body); + std::ostream out(&obuf); + + auto encoded_bucket = http::uri_encode(name(), http::uri_encode_slash::yes); + + for (size_t i = 0; i < keys.size(); ++i) { + const auto& key = keys[i]; + + auto encoded_key = http::uri_encode( + key().string(), http::uri_encode_slash::yes); + + // Boundary line + fmt::print(out, "--{}\r\n", boundary); + + http::client::request_header part_header{}; + part_header.insert( + boost::beast::http::field::content_type, "application/http"); + part_header.insert( + boost::beast::http::field::content_transfer_encoding, "binary"); + part_header.insert( + boost::beast::http::field::content_id, fmt::to_string(i)); + + http::client::request_header subrequest_header{}; + subrequest_header.method(boost::beast::http::verb::delete_); + subrequest_header.target( + fmt::format("/storage/v1/b/{}/o/{}", encoded_bucket, encoded_key)); + + // NOTE: Per docs.cloud.google.com/storage/docs/batch#http: + // if you provide an [Auth] header for a specific nested request, then + // that header applies only to the request that specified it. If you + // provide an [Auth] header for the outer request, then that header + // applies to all of the nested requests unless they override it with an + // [Auth] header of their own. + + // part header + + for (const auto& f : part_header) { + fmt::print(out, "{}: {}\r\n", f.name_string(), f.value()); + } + fmt::print(out, "\r\n"); + + // subrequest header + + fmt::print( + out, + "{} {}\r\n", + subrequest_header.method_string(), + subrequest_header.target()); + + for (const auto& f : subrequest_header) { + fmt::print(out, "{}: {}\r\n", f.name_string(), f.value()); + } + fmt::print(out, "\r\n"); + } + + // Final boundary + fmt::print(out, "--{}--\r\n", boundary); + + if (!out.good()) { + throw std::runtime_error(fmt_with_ctx( + fmt::format, + "failed to create GCS batch delete request, state: {}", + out.rdstate())); + } + + vlog( + s3_log.trace, + "RAW BATCH DELETE REQUEST:\n{}", + body.linearize_to_string()); + + // Create the main request header + http::client::request_header header{}; + header.method(boost::beast::http::verb::post); + // GCS batch endpoint uses a fixed path, not bucket-specific + header.target("/batch/storage/v1"); + header.insert(boost::beast::http::field::host, "storage.googleapis.com"); + header.insert( + boost::beast::http::field::content_type, + fmt::format("multipart/mixed; boundary={}", boundary)); + header.insert( + boost::beast::http::field::content_length, + fmt::format("{}", body.size_bytes())); + + auto ec = _apply_credentials->add_auth(header); + if (ec) { + return ec; + } + + // Convert iobuf to input_stream + auto stream = make_iobuf_input_stream(std::move(body)); + + return {std::move(header), std::move(stream)}; +} + std::string request_creator::make_host(const plain_bucket_name& name) const { switch (_ap_style) { case s3_url_style::virtual_host: @@ -506,6 +635,125 @@ ss::future parse_rest_error_response( "")); } +namespace { +struct gcs_error_handler + : public rapidjson::BaseReaderHandler, gcs_error_handler> { + enum class state : uint8_t { init, in_error, in_message, found }; + state st = state::init; + std::optional message; + bool Key(const char* str, rapidjson::SizeType len, bool) { + std::string_view k{str, len}; + if (st == state::init && k == "error") { + st = state::in_error; + } else if (st == state::in_error && k == "message") { + st = state::in_message; + } + return true; + } + + bool String(const char* str, rapidjson::SizeType len, bool) { + if (st == state::in_message) { + message.emplace(str, len); + st = state::found; + } + return true; + } + + bool EndObject(rapidjson::SizeType) { + if (st == state::in_error) { + st = state::init; + } + return true; + } +}; + +ss::sstring parse_gcs_error_reason(iobuf body) { + iobuf_istreambuf ibuf(body); + std::istream stream(&ibuf); + json::IStreamWrapper wrapper(stream); + json::Reader reader; + gcs_error_handler handler; + auto res = reader.Parse(wrapper, handler); + if (res && handler.message.has_value()) { + return std::move(handler.message).value(); + } else { + return "Unknown"; + } +}; +} // namespace + +/// Parse GCS batch delete response using multipart parsing utilities +/// GCS batch responses follow the multipart/mixed format similar to ABS +static cloud_storage_clients::client::delete_objects_result +parse_gcs_batch_delete_response( + iobuf buf, + std::string_view boundary, + const chunked_vector& keys) { + cloud_storage_clients::client::delete_objects_result result; + + // Parse multipart response - split by boundary + auto boundary_delim = ssx::sformat("--{}", boundary); + util::multipart_response_parser parts{std::move(buf), boundary_delim}; + + constexpr auto convert_content_id = + [](std::string_view raw) -> std::optional { + vlog(s3_log.trace, "BATCH PART RAW CONTENT ID: {}", raw); + constexpr std::string_view pfx = "response-"; + std::optional result{}; + if (auto pos = raw.find(pfx); pos != raw.npos) { + raw = raw.substr(pos + pfx.size()); + size_t v{}; + auto res = std::from_chars(raw.data(), raw.data() + raw.size(), v); + if (res.ec == std::errc{}) { + result = v; + } + } + return result; + }; + + chunked_hash_set content_ids_seen; + std::optional part; + while ((part = parts.get_part()).has_value()) { + iobuf_parser part_parser{std::move(part).value()}; + auto mime = util::mime_header::from(part_parser); + auto maybe_content_id = mime.content_id(convert_content_id); + if (!maybe_content_id.has_value()) { + vlog( + s3_log.debug, + "MIME header missing 'Content-ID' from batch response, skipping " + "part"); + continue; + } + vlog( + s3_log.trace, "BATCH PART CONTENT_ID: {}", maybe_content_id.value()); + content_ids_seen.insert(maybe_content_id.value()); + // having stripped off the leading MIME headers, we should have a + // complete HTTP response at the front of the parser + auto subrequest = util::multipart_subresponse::from(part_parser); + if (auto maybe_error = subrequest.error(parse_gcs_error_reason); + maybe_error.has_value()) { + // Extract error message from response if available + // GCS error responses may contain error details in the body + result.undeleted_keys.push_back({ + .key = keys[maybe_content_id.value()], + .reason = std::move(maybe_error).value(), + }); + } + } + + // Check for any keys that were not in the response + for (auto id : std::views::iota(0ul, keys.size())) { + if (!content_ids_seen.contains(id)) { + result.undeleted_keys.push_back({ + .key = keys[id], + .reason = "Object missing from batch response", + }); + } + } + + return result; +} + /// Head response doesn't give us an XML encoded error object in /// the body. This method uses headers to generate an error object. template @@ -622,6 +870,51 @@ ss::future> s3_client::send_request( co_return outcome; } +// Build transport configuration for GCS batch API endpoint. +// The batch endpoint must be accessed at storage.googleapis.com directly, +// not through bucket-specific virtual host URLs. +std::optional +s3_client::make_gcs_batch_transport_conf( + const net::base_transport::configuration& base_conf, + const access_point_uri& uri, + const ss::abort_source* as, + ss::lowres_clock::duration max_idle) { + // Only create batch config for GCS backends + // Determine which batch delete implementation to use based on backend + // GCS does not support S3-style batch deletes, so use GCS batch API + auto backend = config::shard_local_cfg().cloud_storage_backend(); + auto inferred_backend = infer_backend_from_uri(uri); + + bool is_gcs = backend == model::cloud_storage_backend::google_s3_compat + || inferred_backend + == model::cloud_storage_backend::google_s3_compat; + if (!is_gcs) { + return std::nullopt; + } + + // TODO: better + bool is_test = base_conf.server_addr.host() == "localhost" + || base_conf.server_addr.host() == "127.0.0.1"; + + return s3_client::gcs_batch_client_conf{ + .transport_conf = net::base_transport::configuration{ + .server_addr = net::unresolved_address{ + is_test + ? base_conf.server_addr.host() + : "storage.googleapis.com", + base_conf.server_addr.port(), + base_conf.server_addr.family()}, + .credentials = base_conf.credentials, + .tls_sni_hostname = (base_conf.tls_sni_hostname.has_value() + ? std::make_optional("storage.googleapis.com") + : std::nullopt), + .wait_for_tls_server_eof = base_conf.wait_for_tls_server_eof, + }, + .max_idle_time = max_idle, + .as = as, + }; +} + s3_client::s3_client( ss::weak_ptr pool_ptr, const s3_configuration& conf, @@ -631,7 +924,9 @@ s3_client::s3_client( : client(std::move(pool_ptr)) , _requestor(conf, std::move(apply_credentials)) , _client(transport_conf, nullptr, probe) - , _probe(std::move(probe)) {} + , _probe(std::move(probe)) + , _gcs_batch_transport_conf(make_gcs_batch_transport_conf( + transport_conf, conf.uri, nullptr, conf.max_idle_time)) {} s3_client::s3_client( ss::weak_ptr pool_ptr, @@ -643,7 +938,9 @@ s3_client::s3_client( : client(std::move(pool_ptr)) , _requestor(conf, std::move(apply_credentials)) , _client(transport_conf, &as, probe, conf.max_idle_time) - , _probe(std::move(probe)) {} + , _probe(std::move(probe)) + , _gcs_batch_transport_conf(make_gcs_batch_transport_conf( + transport_conf, conf.uri, &as, conf.max_idle_time)) {} ss::future> s3_client::self_configure() { @@ -744,9 +1041,35 @@ s3_client::self_configure_test(const plain_bucket_name& bucket) { co_return list_objects_result; } -ss::future<> s3_client::stop() { return _client.stop(); } +ss::future<> s3_client::stop() { + co_await _client.stop(); + if (_gcs_batch_client.has_value()) { + co_await _gcs_batch_client->stop(); + } +} + +void s3_client::shutdown() { + _client.shutdown_now(); + if (_gcs_batch_client.has_value()) { + _gcs_batch_client->shutdown_now(); + } +} -void s3_client::shutdown() { _client.shutdown_now(); } +http::client& s3_client::get_gcs_batch_client() { + vassert( + _gcs_batch_transport_conf.has_value(), + "GCS batch client requested but transport config not available. " + "This should only be called for GCS backends."); + // _client(transport_conf, &as, probe, conf.max_idle_time); + if (!_gcs_batch_client.has_value()) { + _gcs_batch_client.emplace( + _gcs_batch_transport_conf.value().transport_conf, + _gcs_batch_transport_conf.value().as, + _probe, + _gcs_batch_transport_conf.value().max_idle_time); + } + return _gcs_batch_client.value(); +} ss::future> s3_client::get_object( @@ -1235,7 +1558,88 @@ auto s3_client::delete_objects( ss::lowres_clock::duration timeout) -> ss::future> { const object_key dummy{""}; - co_return co_await send_request( - do_delete_objects(bucket, keys, timeout), bucket, dummy); + + // Determine which batch delete implementation to use based on backend + // GCS does not support S3-style batch deletes, so use GCS batch API + auto backend = config::shard_local_cfg().cloud_storage_backend(); + auto inferred_backend = infer_backend_from_uri(_requestor._ap); + + bool is_gcs = backend == model::cloud_storage_backend::google_s3_compat + || inferred_backend + == model::cloud_storage_backend::google_s3_compat; + + if (is_gcs) { + co_return co_await send_request( + do_gcs_batch_delete_objects(bucket, keys, timeout), bucket, dummy); + } else { + co_return co_await send_request( + do_delete_objects(bucket, keys, timeout), bucket, dummy); + } +} + +auto s3_client::do_gcs_batch_delete_objects( + const plain_bucket_name& bucket, + const chunked_vector& keys, + ss::lowres_clock::duration timeout) + -> ss::future { + auto request = _requestor.make_gcs_batch_delete_request(bucket, keys); + if (!request) { + co_return ss::coroutine::exception( + std::make_exception_ptr(std::system_error(request.error()))); + } + auto& [header, body] = request.value(); + + vlog(s3_log.trace, "send GCS batch delete request:\n{}", header); + + std::exception_ptr ex; + std::optional result; + try { + auto response_stream = co_await _client.request( + std::move(header), body, timeout); + + co_await response_stream->prefetch_headers(); + vassert(response_stream->is_header_done(), "Header is not received"); + + const auto status = response_stream->get_headers().result(); + // GCS batch API returns 200 OK for successful batch requests + // Individual subrequest failures are encoded in the multipart response + if (status != boost::beast::http::status::ok) { + const auto content_type = util::get_response_content_type( + response_stream->get_headers()); + auto buf = co_await http::drain(std::move(response_stream)); + co_await body.close(); + co_return co_await parse_rest_error_response( + content_type, status, std::move(buf)); + } + + // Extract boundary from Content-Type header + const auto& headers = response_stream->get_headers(); + auto boundary = util::find_multipart_boundary(headers); + auto response_buf = co_await http::drain(std::move(response_stream)); + auto cl_it = headers.find(boost::beast::http::field::content_length); + vlog( + s3_log.trace, + "RAW BATCH DELETE RESPONSE content-length: {}:\n{}", + cl_it == headers.end() ? "Unknown" : cl_it->value(), + response_buf.linearize_to_string()); + if (!boundary.has_value()) { + throw std::runtime_error(boundary.error()); + } + vlog( + s3_log.trace, "BATCH DELETE RESPONSE BOUNDARY: {}", boundary.value()); + result = parse_gcs_batch_delete_response( + std::move(response_buf), boundary.value(), keys); + } catch (...) { + ex = std::current_exception(); + } + + co_await body.close(); + + if (ex) { + std::rethrow_exception(ex); + } + vassert(result.has_value(), "RESULT MISSING VALUE"); + co_return std::move(result).value(); } + } // namespace cloud_storage_clients diff --git a/src/v/cloud_storage_clients/s3_client.h b/src/v/cloud_storage_clients/s3_client.h index e1eb86e715f34..8edc77b7af23d 100644 --- a/src/v/cloud_storage_clients/s3_client.h +++ b/src/v/cloud_storage_clients/s3_client.h @@ -85,6 +85,17 @@ class request_creator { make_delete_objects_request( const plain_bucket_name& name, const chunked_vector& keys); + /// \brief Create a GCS batch delete request header and body + /// Uses the GCS batch API endpoint with multipart/mixed format + /// https://cloud.google.com/storage/docs/batch + /// + /// \param name of the bucket + /// \param keys to delete + /// \return the header and an the body as an input_stream + result>> + make_gcs_batch_delete_request( + const plain_bucket_name& name, const chunked_vector& keys); + /// \brief Initialize http header for 'ListObjectsV2' request /// /// \param name of the bucket @@ -244,6 +255,11 @@ class s3_client : public client { const chunked_vector& keys, ss::lowres_clock::duration timeout); + ss::future do_gcs_batch_delete_objects( + const plain_bucket_name& bucket, + const chunked_vector& keys, + ss::lowres_clock::duration timeout); + template ss::future> send_request( ss::future request_future, @@ -256,10 +272,38 @@ class s3_client : public client { // otherwise. ss::future self_configure_test(const plain_bucket_name& bucket); + // Returns a reference to the GCS batch client, lazily initializing it if + // needed. The batch client connects directly to storage.googleapis.com + // rather than the bucket-specific virtual host endpoint. + http::client& get_gcs_batch_client(); + private: request_creator _requestor; http::client _client; ss::shared_ptr _probe; + + // Dedicated HTTP client for GCS batch API operations. + // The GCS batch endpoint (storage.googleapis.com/batch/storage/v1) must be + // accessed directly, not through bucket-specific virtual host URLs. + // Lazily initialized on first use. + std::optional _gcs_batch_client; + + struct gcs_batch_client_conf { + net::base_transport::configuration transport_conf{}; + ss::lowres_clock::duration max_idle_time{}; + const ss::abort_source* as{nullptr}; + }; + + static std::optional + make_gcs_batch_transport_conf( + const net::base_transport::configuration& base_conf, + const access_point_uri& uri, + const ss::abort_source* as, + ss::lowres_clock::duration max_idle); + + // Transport configuration for the GCS batch client, stored during + // construction for lazy initialization. + std::optional _gcs_batch_transport_conf; }; std::variant diff --git a/src/v/cloud_storage_clients/tests/BUILD b/src/v/cloud_storage_clients/tests/BUILD index 7483ce1c8c73b..2c1dbb73bd0b1 100644 --- a/src/v/cloud_storage_clients/tests/BUILD +++ b/src/v/cloud_storage_clients/tests/BUILD @@ -177,6 +177,24 @@ redpanda_cc_gtest( "//src/v/utils:unresolved_address", "@googletest//:gtest", "@seastar", + ], +) + +redpanda_cc_gtest( + name = "gcs_client_test", + timeout = "short", + srcs = [ + "gcs_client_test.cc", + ], + deps = [ + "//src/v/base", + "//src/v/cloud_storage_clients", + "//src/v/http/tests:utils", + "//src/v/net", + "//src/v/test_utils:gtest", + "//src/v/utils:unresolved_address", + "@googletest//:gtest", + "@seastar", "@seastar//:testing", ], ) diff --git a/src/v/cloud_storage_clients/tests/gcs_client_test.cc b/src/v/cloud_storage_clients/tests/gcs_client_test.cc new file mode 100644 index 0000000000000..da02270e29cbe --- /dev/null +++ b/src/v/cloud_storage_clients/tests/gcs_client_test.cc @@ -0,0 +1,478 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "base/seastarx.h" +#include "cloud_storage_clients/configuration.h" +#include "cloud_storage_clients/s3_client.h" +#include "gmock/gmock.h" +#include "http/tests/utils.h" +#include "net/dns.h" +#include "utils/unresolved_address.h" + +#include +#include +#include +#include +#include + +#include +#include + +using namespace std::chrono_literals; +using namespace cloud_storage_clients; + +static const uint16_t httpd_port_number = 14435; +static constexpr const char* httpd_host_name = "127.0.0.1"; + +namespace { + +/// Mock multipart/mixed response for GCS batch delete success +/// Returns a response with all deletes successful (204 No Content) +ss::sstring +make_batch_delete_success_response(std::string_view boundary, size_t num_keys) { + ss::sstring response; + for (size_t i = 0; i < num_keys; ++i) { + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: response-{}\r\n\r\n" + "HTTP/1.1 204 No Content\r\n" + "X-GUploader-UploadID: test-upload-id-{}\r\n\r\n\r\n", + boundary, + i, + i); + } + response += fmt::format("--{}--\r\n", boundary); + return response; +} + +/// Mock multipart/mixed response for GCS batch delete with errors +/// Returns a response with all deletes failed (403 Forbidden) +ss::sstring +make_batch_delete_error_response(std::string_view boundary, size_t num_keys) { + ss::sstring response; + for (size_t i = 0; i < num_keys; ++i) { + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: response-{}\r\n\r\n" + "HTTP/1.1 403 Forbidden\r\n" + "Content-Type: application/json; charset=UTF-8\r\n" + "Content-Length: 150\r\n\r\n" + R"json({{ + "error": {{ + "code": 403, + "message": "Access denied", + "errors": [{{ + "domain": "global", + "reason": "forbidden" + }}] + }} +}})json" + "\r\n\r\n", + boundary, + i); + } + response += fmt::format("--{}--\r\n", boundary); + return response; +} + +/// Mock multipart/mixed response for GCS batch delete with partial errors +/// First key succeeds, rest fail +ss::sstring make_batch_delete_partial_error_response( + std::string_view boundary, size_t num_keys) { + ss::sstring response; + // First key succeeds + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: response-0\r\n\r\n" + "HTTP/1.1 204 No Content\r\n" + "X-GUploader-UploadID: test-upload-id-0\r\n\r\n", + boundary); + + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: response-DEAD BEEF\r\n\r\n" + "HTTP/1.1 204 No Content\r\n" + "X-GUploader-UploadID: test-upload-id-1\r\n\r\n", + boundary); + + // Rest fail + for (size_t i = 2; i < num_keys; ++i) { + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: response-{}\r\n\r\n" + "HTTP/1.1 403 Forbidden\r\n" + "Content-Type: application/json; charset=UTF-8\r\n\r\n" + R"json({{ + "error": {{ + "code": 403, + "message": "Forbidden" + }} +}})json" + "\r\n\r\n", + boundary, + i); + } + response += fmt::format("--{}--\r\n", boundary); + return response; +} + +/// Mock multipart/mixed response for GCS batch delete with 404s (not found) +/// Returns 404 for all deletes (which should be treated as success) +ss::sstring make_batch_delete_not_found_response( + std::string_view boundary, size_t num_keys) { + ss::sstring response; + for (size_t i = 0; i < num_keys; ++i) { + response += fmt::format( + "--{}\r\n" + "Content-Type: application/http\r\n" + "Content-ID: response-{}\r\n\r\n" + "HTTP/1.1 404 Not Found\r\n" + "Content-Type: application/json; charset=UTF-8\r\n\r\n" + R"json({{ + "error": {{ + "code": 404, + "message": "Not Found" + }} +}})json" + "\r\n\r\n", + boundary, + i); + } + response += fmt::format("--{}--\r\n", boundary); + return response; +} + +void set_routes(ss::httpd::routes& r) { + using namespace ss::httpd; + using reply = ss::http::reply; + using flexible_function_handler + = http::test_utils::flexible_function_handler; + + // GCS batch API endpoint: POST /batch/storage/v1 + // Dispatch based on Authorization header to determine test scenario + auto dispatch_handler = new flexible_function_handler( + []( + const_req req, reply& reply, ss::sstring& content_type) -> std::string { + // Check that this is a batch request + if (req._url != "/batch/storage/v1") { + reply.set_status(reply::status_type::bad_request); + content_type = "application/json"; + return R"json({"error": {"code": 400, "message": "Invalid endpoint"}})json"; + } + + // Count number of Content-ID entries in request body + size_t num_keys = 0; + size_t pos = 0; + while ((pos = req.content.find("Content-ID:", pos)) + != seastar::sstring::npos) { + ++num_keys; + pos += 11; + } + + constexpr std::string_view response_boundary{ + "batch_response_boundary"}; + std::string response_body; + + // Dispatch based on Authorization header (test scenarios) + auto auth = std::string{req.get_header("Authorization")}; + if (auth.contains("test-success")) { + response_body = make_batch_delete_success_response( + response_boundary, num_keys) + .c_str(); + reply.set_status(reply::status_type::ok); + content_type = fmt::format( + "multipart/mixed; boundary={}", response_boundary); + } else if (auth.contains("test-errors")) { + response_body = make_batch_delete_error_response( + response_boundary, num_keys) + .c_str(); + reply.set_status(reply::status_type::ok); + content_type = fmt::format( + "multipart/mixed; boundary={}", response_boundary); + } else if (auth.contains("test-partial")) { + response_body = make_batch_delete_partial_error_response( + response_boundary, num_keys) + .c_str(); + reply.set_status(reply::status_type::ok); + content_type = fmt::format( + "multipart/mixed; boundary={}", response_boundary); + } else if (auth.contains("test-notfound")) { + response_body = make_batch_delete_not_found_response( + response_boundary, num_keys) + .c_str(); + reply.set_status(reply::status_type::ok); + content_type = fmt::format( + "multipart/mixed; boundary={}", response_boundary); + } else if (auth.contains("test-servererror")) { + reply.set_status(reply::status_type::internal_server_error); + content_type = "application/json"; + return R"json({"error": {"code": 500, "message": "Internal Server Error"}})json"; + } else { + reply.set_status(reply::status_type::unauthorized); + content_type = "application/json"; + return R"json({"error": {"code": 401, "message": "Unauthorized"}})json"; + } + + return response_body; + }, + "txt"); + + r.add(operation_type::POST, url("/batch/storage/v1"), dispatch_handler); +} + +s3_configuration make_test_configuration(std::string_view scenario) { + net::unresolved_address server_addr(httpd_host_name, httpd_port_number); + s3_configuration conf; + // Use a test access key that encodes the scenario name for dispatch + conf.access_key = cloud_roles::public_key_str( + fmt::format("AKIAIOSFODNN7EXAMPLE-{}", scenario)); + conf.secret_key = cloud_roles::private_key_str( + "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"); + conf.region = cloud_roles::aws_region_name("auto"); + // Critical: Set URI to match GCS pattern so backend inference works + conf.uri = access_point_uri("storage.googleapis.com"); + conf.server_addr = server_addr; + conf.url_style = s3_url_style::path; + return conf; +} + +ss::lw_shared_ptr +make_test_credentials(const s3_configuration& cfg) { + return ss::make_lw_shared( + cloud_roles::make_credentials_applier( + cloud_roles::aws_credentials{ + cfg.access_key.value(), + cfg.secret_key.value(), + std::nullopt, + cfg.region})); +} + +class gcs_client_fixture : public ::testing::Test { +public: + void set_up(std::string_view scenario = "test-success") { + server = ss::make_shared(); + server->start().get(); + server->set_routes(set_routes).get(); + + auto conf = make_test_configuration(scenario); + auto resolved = net::resolve_dns(conf.server_addr).get(); + server->listen(resolved).get(); + + auto transport_conf = build_transport_configuration(conf).get(); + client = ss::make_shared( + nullptr, + conf, + transport_conf, + conf.make_probe(), + make_test_credentials(conf)); + } + + void TearDown() override { + if (client) { + client->shutdown(); + client->stop().get(); + } + if (server) { + server->stop().get(); + } + } + + ss::shared_ptr server; + ss::shared_ptr client; +}; + +} // anonymous namespace + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_success) { + set_up("test-success"); + // Test deleting 3 objects successfully + auto keys = chunked_vector{ + object_key{"key1"}, object_key{"key2"}, object_key{"key3"}}; + + auto result = client + ->delete_objects( + plain_bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE(result.value().undeleted_keys.empty()); +} + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_single_key) { + set_up("test-success"); + // Test deleting a single object + auto keys = chunked_vector{object_key{"single-key"}}; + + auto result = client + ->delete_objects( + plain_bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE(result.value().undeleted_keys.empty()); +} + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_many_keys) { + set_up("test-success"); + // Test deleting many objects (simulate batch behavior) + chunked_vector keys; + for (int i = 0; i < 10; ++i) { + keys.push_back(object_key{fmt::format("key-{}", i)}); + } + + auto result = client + ->delete_objects( + plain_bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()) << result.error(); + EXPECT_TRUE(result.value().undeleted_keys.empty()); +} + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_all_errors) { + set_up("test-errors"); + auto keys = chunked_vector{ + object_key{"key1"}, object_key{"key2"}, object_key{"key3"}}; + + auto result = client + ->delete_objects( + plain_bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()) << result.error(); + // All keys should have failed + EXPECT_EQ(result.value().undeleted_keys.size(), 3); + + auto undeleted + = result.value().undeleted_keys + | std::views::transform( + [](const auto& uk) { return std::make_pair(uk.key, uk.reason); }) + | std::ranges::to>(); + + // Verify all keys are in undeleted_keys + for (const auto& k : keys) { + ASSERT_TRUE(undeleted.contains(k)) + << fmt::format("Key {} not found in undeleted_keys", k); + EXPECT_THAT(undeleted[k], testing::HasSubstr("Access denied")) + << undeleted[k]; + } +} + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_partial_errors) { + set_up("test-partial"); + auto keys = chunked_vector{ + object_key{"key0"}, + object_key{"key1"}, + object_key{"key2"}, + object_key{"key4"}, + }; + + auto result = client + ->delete_objects( + plain_bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()) + << make_error_code(result.error()).message(); + // First key succeeds, rest fail + EXPECT_EQ(result.value().undeleted_keys.size(), keys.size() - 1); + + auto undeleted + = result.value().undeleted_keys + | std::views::transform( + [](const auto& uk) { return std::make_pair(uk.key, uk.reason); }) + | std::ranges::to>(); + + // Verify the failed keys are key1, key2, and key3 + EXPECT_FALSE(undeleted.contains(keys[0])) << fmt::format( + "Key {} unexpected undeleted: {}", keys[0], undeleted[keys[0]]); + for (const auto& k : std::array{keys[1], keys[2], keys[3]}) { + ASSERT_TRUE(undeleted.contains(k)) + << fmt::format("Key {} not found in undeleted_keys", k); + if (k == keys[1]) { + // Missing content ID, so we have no way to map the sub-response + // back to the input key. Since corresponding content ID didn't + // appear in the response, count this key as undeleted. + EXPECT_THAT( + undeleted[k], + testing::HasSubstr("Object missing from batch response")) + << undeleted[k]; + } else { + EXPECT_THAT(undeleted[k], testing::HasSubstr("Forbidden")) + << undeleted[k]; + } + } +} + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_not_found) { + set_up("test-notfound"); + // Test deleting non-existent objects (404 should be treated as success) + auto keys = chunked_vector{ + object_key{"missing1"}, object_key{"missing2"}}; + + auto result = client + ->delete_objects( + plain_bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + ASSERT_TRUE(result.has_value()) + << make_error_code(result.error()).message(); + // 404 is treated as success for delete operations + EXPECT_TRUE(result.value().undeleted_keys.empty()); +} + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_server_error) { + set_up("test-servererror"); + auto keys = chunked_vector{object_key{"key1"}}; + + auto result = client + ->delete_objects( + plain_bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + // Server error should result in error_outcome::retry + ASSERT_FALSE(result.has_value()); + EXPECT_EQ(result.error(), error_outcome::retry); +} + +TEST_F(gcs_client_fixture, test_gcs_batch_delete_unauthorized) { + set_up("test-unauthorized"); + auto keys = chunked_vector{object_key{"key1"}}; + + auto result = client + ->delete_objects( + plain_bucket_name{"test-bucket"}, + keys, + http::default_connect_timeout) + .get(); + + // Server error should result in error_outcome::retry + ASSERT_FALSE(result.has_value()); + EXPECT_EQ(result.error(), error_outcome::fail); +} diff --git a/src/v/cloud_storage_clients/util.cc b/src/v/cloud_storage_clients/util.cc index 31bc7539ffaed..0ecd4534c88ce 100644 --- a/src/v/cloud_storage_clients/util.cc +++ b/src/v/cloud_storage_clients/util.cc @@ -407,7 +407,7 @@ std::optional multipart_response_parser::get_part() { // boundary string appeared in the sub-response body (illegal per multipart // grammar laid out in RFC 2046) if ( - delim_idx != _delim.size() || line_feed.count() != 2 + delim_idx != _delim.size() || line_feed.count() < 2 || _parser.bytes_left() < 2) { _done = true; return std::nullopt; diff --git a/src/v/http/tests/utils.cc b/src/v/http/tests/utils.cc index ea86b05f5b4f5..e46f280dbc295 100644 --- a/src/v/http/tests/utils.cc +++ b/src/v/http/tests/utils.cc @@ -23,7 +23,9 @@ namespace http { namespace test_utils { flexible_function_handler::flexible_function_handler( - const flexible_handle_function& f_handle, ss::sstring content_type) + const flexible_handle_function& f_handle, + ss::sstring content_type, + std::set content_type_overrides) : _f_handle([this, f_handle]( std::unique_ptr req, std::unique_ptr rep) { @@ -32,7 +34,8 @@ flexible_function_handler::flexible_function_handler( return ss::make_ready_future>( std::move(rep)); }) - , _content_type(std::move(content_type)) {} + , _content_type(std::move(content_type)) + , _content_type_overrides(std::move(content_type_overrides)) {} ss::future> flexible_function_handler::handle( [[maybe_unused]] const ss::sstring& path, @@ -40,7 +43,10 @@ ss::future> flexible_function_handler::handle( std::unique_ptr rep) { return _f_handle(std::move(req), std::move(rep)) .then([this](std::unique_ptr rep) { - if (_content_type == "xml") { + if (_content_type_overrides.contains( + rep->get_header("Content-Type"))) { + rep->done(); + } else if (_content_type == "xml") { // Because `application/xml` is not implemented as a mapping // in `http/mime_types.cc`, in order to construct a reply with // the `Content-Type` header set to `application/xml`, we diff --git a/src/v/http/tests/utils.h b/src/v/http/tests/utils.h index 623f2cc3e2b28..557eb532d2552 100644 --- a/src/v/http/tests/utils.h +++ b/src/v/http/tests/utils.h @@ -37,7 +37,8 @@ class flexible_function_handler : public ss::httpd::handler_base { public: flexible_function_handler( const flexible_handle_function& f_handle, - ss::sstring content_type = "txt"); + ss::sstring content_type = "txt", + std::set content_type_overrides = {}); ss::future> handle( [[maybe_unused]] const ss::sstring& path, @@ -47,6 +48,7 @@ class flexible_function_handler : public ss::httpd::handler_base { private: ss::httpd::future_handler_function _f_handle; ss::sstring _content_type; + std::set _content_type_overrides; }; } // namespace test_utils diff --git a/tests/rptest/tests/cloud_topics/l0_gc_test.py b/tests/rptest/tests/cloud_topics/l0_gc_test.py index 98f10c6cbde8b..7e712b8a02ce7 100644 --- a/tests/rptest/tests/cloud_topics/l0_gc_test.py +++ b/tests/rptest/tests/cloud_topics/l0_gc_test.py @@ -85,7 +85,12 @@ def get_num_objects_deleted(): ) self.logger.info(samples) if samples is not None and samples.samples: - return int(sum(s.value for s in samples.samples)) + n = int(sum(s.value for s in samples.samples)) + print(n) + for node in self.redpanda.nodes: + self.redpanda._update_usage_stats(node) + print(self.redpanda.usage_stats) + return n return 0 wait_until(