From db05e473eb9d562061ad865a24ee831969a3fbcd Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 27 Aug 2024 19:21:39 +0300 Subject: [PATCH 1/8] http/client: Merge some continuations explicitly After calling connection::do_make_request() client may attch a continuation to it that handles EPIPE/ECONNABORTED and retries the request. That "may" depends o whether retries were requested in client. This patch makes the continuation unconditional, next patched will need it even for non-retriable requests. Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 53664a68017..7edbce7c999 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -331,12 +331,13 @@ auto client::with_new_connection(Fn&& fn) { future<> client::make_request(request req, reply_handler handle, std::optional expected) { return do_with(std::move(req), std::move(handle), [this, expected] (request& req, reply_handler& handle) mutable { - auto f = with_connection([this, &req, &handle, expected] (connection& con) { + return with_connection([this, &req, &handle, expected] (connection& con) { return do_make_request(con, req, handle, expected); - }); + }).handle_exception_type([this, &req, &handle, expected] (const std::system_error& ex) { + if (!_retry) { + return make_exception_future<>(ex); + } - if (_retry) { - f = f.handle_exception_type([this, &req, &handle, expected] (const std::system_error& ex) { auto code = ex.code().value(); if ((code != EPIPE) && (code != ECONNABORTED)) { return make_exception_future<>(ex); @@ -349,9 +350,6 @@ future<> client::make_request(request req, reply_handler handle, std::optional Date: Tue, 27 Aug 2024 19:21:49 +0300 Subject: [PATCH 2/8] http/client: Idnentation fix after previous patch Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/http/client.cc b/src/http/client.cc index 7edbce7c999..fc6ebf8c3f2 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -334,22 +334,22 @@ future<> client::make_request(request req, reply_handler handle, std::optional(ex); - } + if (!_retry) { + return make_exception_future<>(ex); + } - auto code = ex.code().value(); - if ((code != EPIPE) && (code != ECONNABORTED)) { - return make_exception_future<>(ex); - } + auto code = ex.code().value(); + if ((code != EPIPE) && (code != ECONNABORTED)) { + return make_exception_future<>(ex); + } - // The 'con' connection may not yet be freed, so the total connection - // count still account for it and with_new_connection() may temporarily - // break the limit. That's OK, the 'con' will be closed really soon - return with_new_connection([this, &req, &handle, expected] (connection& con) { - return do_make_request(con, req, handle, expected); - }); + // The 'con' connection may not yet be freed, so the total connection + // count still account for it and with_new_connection() may temporarily + // break the limit. That's OK, the 'con' will be closed really soon + return with_new_connection([this, &req, &handle, expected] (connection& con) { + return do_make_request(con, req, handle, expected); }); + }); }); } From 031f87e8732b48e03745c94a3e4a37044b82dbe3 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 27 Aug 2024 19:26:33 +0300 Subject: [PATCH 3/8] http/client: Pass abort_source here and there This just propagates abort_source* argument to some calls in client. The argument is unused and is, in fact, nullptr all the time. Next patches will change that. This change is just to reduce further churn. Signed-off-by: Pavel Emelyanov --- include/seastar/http/client.hh | 11 +++++----- src/http/client.cc | 38 +++++++++++++++++++--------------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index 6601df4287e..db63a69684a 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -185,19 +185,20 @@ private: using connection_ptr = seastar::shared_ptr; - future get_connection(); - future make_connection(); + future get_connection(abort_source* as); + future make_connection(abort_source* as); future<> put_connection(connection_ptr con); future<> shrink_connections(); template Fn> - auto with_connection(Fn&& fn); + auto with_connection(Fn&& fn, abort_source*); template requires std::invocable - auto with_new_connection(Fn&& fn); + auto with_new_connection(Fn&& fn, abort_source*); - future<> do_make_request(connection& con, request& req, reply_handler& handle, std::optional expected); + future<> do_make_request(request req, reply_handler handle, abort_source*, std::optional expected); + future<> do_make_request(connection& con, request& req, reply_handler& handle, abort_source*, std::optional expected); public: /** diff --git a/src/http/client.cc b/src/http/client.cc index fc6ebf8c3f2..6b79e46cd7e 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -243,7 +243,7 @@ client::client(std::unique_ptr f, unsigned max_connections, { } -future client::get_connection() { +future client::get_connection(abort_source* as) { if (!_pool.empty()) { connection_ptr con = _pool.front().shared_from_this(); _pool.pop_front(); @@ -252,15 +252,15 @@ future client::get_connection() { } if (_nr_connections >= _max_connections) { - return _wait_con.wait().then([this] { - return get_connection(); + return _wait_con.wait().then([this, as] { + return get_connection(as); }); } - return make_connection(); + return make_connection(as); } -future client::make_connection() { +future client::make_connection(abort_source* as) { _total_new_connections++; return _new_connections->make().then([cr = internal::client_ref(this)] (connected_socket cs) mutable { http_log.trace("created new http connection {}", cs.local_address()); @@ -311,8 +311,8 @@ future<> client::set_maximum_connections(unsigned nr) { } template Fn> -auto client::with_connection(Fn&& fn) { - return get_connection().then([this, fn = std::move(fn)] (connection_ptr con) mutable { +auto client::with_connection(Fn&& fn, abort_source* as) { + return get_connection(as).then([this, fn = std::move(fn)] (connection_ptr con) mutable { return fn(*con).finally([this, con = std::move(con)] () mutable { return put_connection(std::move(con)); }); @@ -321,8 +321,8 @@ auto client::with_connection(Fn&& fn) { template requires std::invocable -auto client::with_new_connection(Fn&& fn) { - return make_connection().then([this, fn = std::move(fn)] (connection_ptr con) mutable { +auto client::with_new_connection(Fn&& fn, abort_source* as) { + return make_connection(as).then([this, fn = std::move(fn)] (connection_ptr con) mutable { return fn(*con).finally([this, con = std::move(con)] () mutable { return put_connection(std::move(con)); }); @@ -330,10 +330,14 @@ auto client::with_new_connection(Fn&& fn) { } future<> client::make_request(request req, reply_handler handle, std::optional expected) { - return do_with(std::move(req), std::move(handle), [this, expected] (request& req, reply_handler& handle) mutable { - return with_connection([this, &req, &handle, expected] (connection& con) { - return do_make_request(con, req, handle, expected); - }).handle_exception_type([this, &req, &handle, expected] (const std::system_error& ex) { + return do_make_request(std::move(req), std::move(handle), nullptr, std::move(expected)); +} + +future<> client::do_make_request(request req, reply_handler handle, abort_source* as, std::optional expected) { + return do_with(std::move(req), std::move(handle), [this, as, expected] (request& req, reply_handler& handle) mutable { + return with_connection([this, &req, &handle, as, expected] (connection& con) { + return do_make_request(con, req, handle, as, expected); + }, as).handle_exception_type([this, &req, &handle, as, expected] (const std::system_error& ex) { if (!_retry) { return make_exception_future<>(ex); } @@ -346,14 +350,14 @@ future<> client::make_request(request req, reply_handler handle, std::optional client::do_make_request(connection& con, request& req, reply_handler& handle, std::optional expected) { +future<> client::do_make_request(connection& con, request& req, reply_handler& handle, abort_source* as, std::optional expected) { return con.do_make_request(req).then([&con, &handle, expected] (connection::reply_ptr reply) mutable { auto& rep = *reply; if (expected.has_value() && rep._status != expected.value()) { From 35badf56f1d145de520c6e1aa3f4526492026b88 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 27 Aug 2024 19:29:13 +0300 Subject: [PATCH 4/8] http/client: Add abort source to factory::make() method One of the places where client can sleep and may need to be aborted is making new connection via factory method. Add the abort source argument there. This breaks the API, but it's still experimental, so fine. Also, existing sample factories ignore the abort source. This will be done as followup. Signed-off-by: Pavel Emelyanov --- include/seastar/http/client.hh | 2 +- src/http/client.cc | 6 +++--- tests/unit/httpd_test.cc | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index db63a69684a..9151e7a9a5d 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -150,7 +150,7 @@ public: * The implementations of this method should return ready-to-use socket that will * be used by \ref client as transport for its http connections */ - virtual future make() = 0; + virtual future make(abort_source*) = 0; virtual ~connection_factory() {} }; diff --git a/src/http/client.cc b/src/http/client.cc index 6b79e46cd7e..0ecc89becea 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -205,7 +205,7 @@ class basic_connection_factory : public connection_factory { : _addr(std::move(addr)) { } - virtual future make() override { + virtual future make(abort_source* as) override { return seastar::connect(_addr, {}, transport::TCP); } }; @@ -226,7 +226,7 @@ class tls_connection_factory : public connection_factory { , _host(std::move(host)) { } - virtual future make() override { + virtual future make(abort_source* as) override { return tls::connect(_creds, _addr, tls::tls_options{.server_name = _host}); } }; @@ -262,7 +262,7 @@ future client::get_connection(abort_source* as) { future client::make_connection(abort_source* as) { _total_new_connections++; - return _new_connections->make().then([cr = internal::client_ref(this)] (connected_socket cs) mutable { + return _new_connections->make(as).then([cr = internal::client_ref(this)] (connected_socket cs) mutable { http_log.trace("created new http connection {}", cs.local_address()); auto con = seastar::make_shared(std::move(cs), std::move(cr)); return make_ready_future(std::move(con)); diff --git a/tests/unit/httpd_test.cc b/tests/unit/httpd_test.cc index 96f58278685..5c1dddfa58a 100644 --- a/tests/unit/httpd_test.cc +++ b/tests/unit/httpd_test.cc @@ -45,7 +45,7 @@ class loopback_http_factory : public http::experimental::connection_factory { loopback_socket_impl lsi; public: explicit loopback_http_factory(loopback_connection_factory& f) : lsi(f) {} - virtual future make() override { + virtual future make(abort_source* as) override { return lsi.connect(socket_address(ipv4_addr()), socket_address(ipv4_addr())); } }; From 507445f7ed17f58d4c30027ebc38931bca8bf568 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 27 Aug 2024 19:36:17 +0300 Subject: [PATCH 5/8] http/client: Handle abort source in pool wait Another place where client may sleep is when waiting for free connection from pool. Wait happens on conditional variable, but it cannot be aborted (see #2013). So instead, subscribe for the abort source and wake up all the waiters. Then check the local abort source for being aborted in the continuation. Those waiters that are not aborted will re-enter and will sleep again. Signed-off-by: Pavel Emelyanov --- src/http/client.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/http/client.cc b/src/http/client.cc index 0ecc89becea..ab7d252fa1b 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -252,7 +252,11 @@ future client::get_connection(abort_source* as) { } if (_nr_connections >= _max_connections) { - return _wait_con.wait().then([this, as] { + auto sub = as ? as->subscribe([this] () noexcept { _wait_con.broadcast(); }) : std::nullopt; + return _wait_con.wait().then([this, as, sub = std::move(sub)] { + if (as != nullptr && as->abort_requested()) { + return make_exception_future(as->abort_requested_exception_ptr()); + } return get_connection(as); }); } From 7c5eacdd899902cd7ea9e4643b84511e0a104ce6 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 27 Aug 2024 19:35:18 +0300 Subject: [PATCH 6/8] http/client: Abort established connections When a socket is doing request-response cycle, aborting it can only be done by shutting down and eliminating from the pool. For that -- subscribe on the abort source with the callback that marks the connection as no longer persistent and shuts it down. Signed-off-by: Pavel Emelyanov --- include/seastar/http/client.hh | 2 ++ src/http/client.cc | 12 +++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index 9151e7a9a5d..6ec994e97ad 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -132,6 +132,8 @@ private: future maybe_wait_for_continue(const request& req); future<> write_body(const request& rq); future recv_reply(); + + void shutdown() noexcept; }; /** diff --git a/src/http/client.cc b/src/http/client.cc index ab7d252fa1b..1baea37eb58 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -189,6 +189,11 @@ input_stream connection::in(reply& rep) { return input_stream(data_source(std::make_unique(_read_buf, rep.content_length))); } +void connection::shutdown() noexcept { + _persistent = false; + _fd.shutdown_input(); +} + future<> connection::close() { return when_all(_read_buf.close(), _write_buf.close()).discard_result().then([this] { auto la = _fd.local_address(); @@ -342,6 +347,10 @@ future<> client::do_make_request(request req, reply_handler handle, abort_source return with_connection([this, &req, &handle, as, expected] (connection& con) { return do_make_request(con, req, handle, as, expected); }, as).handle_exception_type([this, &req, &handle, as, expected] (const std::system_error& ex) { + if (as && as->abort_requested()) { + return make_exception_future<>(as->abort_requested_exception_ptr()); + } + if (!_retry) { return make_exception_future<>(ex); } @@ -362,6 +371,7 @@ future<> client::do_make_request(request req, reply_handler handle, abort_source } future<> client::do_make_request(connection& con, request& req, reply_handler& handle, abort_source* as, std::optional expected) { + auto sub = as ? as->subscribe([&con] () noexcept { con.shutdown(); }) : std::nullopt; return con.do_make_request(req).then([&con, &handle, expected] (connection::reply_ptr reply) mutable { auto& rep = *reply; if (expected.has_value() && rep._status != expected.value()) { @@ -381,7 +391,7 @@ future<> client::do_make_request(connection& con, request& req, reply_handler& h }).handle_exception([&con] (auto ex) mutable { con._persistent = false; return make_exception_future<>(std::move(ex)); - }); + }).finally([sub = std::move(sub)] {}); } future<> client::close() { From f9aabee0b76c0273eadcb14dda8c473e86bfdf81 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 27 Aug 2024 19:36:36 +0300 Subject: [PATCH 7/8] http/client: Add abortable make_request() API method Everything is ready, now add the make_request() that accepts abort source reference and passes it along the stack. Signed-off-by: Pavel Emelyanov --- include/seastar/http/client.hh | 12 ++++++++++++ src/http/client.cc | 4 ++++ 2 files changed, 16 insertions(+) diff --git a/include/seastar/http/client.hh b/include/seastar/http/client.hh index 6ec994e97ad..09e3b8d2edd 100644 --- a/include/seastar/http/client.hh +++ b/include/seastar/http/client.hh @@ -285,6 +285,18 @@ public: */ future<> make_request(request req, reply_handler handle, std::optional expected = std::nullopt); + /** + * \brief Send the request and handle the response (abortable) + * + * Same as previous method, but aborts the request upon as.request_abort() call + * + * \param req -- request to be sent + * \param handle -- the response handler + * \param as -- abort source that aborts the request + * \param expected -- the optional expected reply status code, default is std::nullopt + */ + future<> make_request(request req, reply_handler handle, abort_source& as, std::optional expected = std::nullopt); + /** * \brief Updates the maximum number of connections a client may have * diff --git a/src/http/client.cc b/src/http/client.cc index 1baea37eb58..4d618f7f8cb 100644 --- a/src/http/client.cc +++ b/src/http/client.cc @@ -342,6 +342,10 @@ future<> client::make_request(request req, reply_handler handle, std::optional client::make_request(request req, reply_handler handle, abort_source& as, std::optional expected) { + return do_make_request(std::move(req), std::move(handle), &as, std::move(expected)); +} + future<> client::do_make_request(request req, reply_handler handle, abort_source* as, std::optional expected) { return do_with(std::move(req), std::move(handle), [this, as, expected] (request& req, reply_handler& handle) mutable { return with_connection([this, &req, &handle, as, expected] (connection& con) { From 928bd933418b350a95e5b01fc9b46bf841e71da5 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 27 Aug 2024 19:36:43 +0300 Subject: [PATCH 8/8] test: Add abortable http client test cases Test all four scenarios: - abort establishing a connection - abort waiting for connection from pool - abort conneciton that sends request - abort connection that waits for response Signed-off-by: Pavel Emelyanov --- tests/unit/httpd_test.cc | 145 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/tests/unit/httpd_test.cc b/tests/unit/httpd_test.cc index 5c1dddfa58a..6472e2e850b 100644 --- a/tests/unit/httpd_test.cc +++ b/tests/unit/httpd_test.cc @@ -28,6 +28,7 @@ #include #include #include +#include using namespace seastar; using namespace httpd; @@ -953,6 +954,150 @@ SEASTAR_TEST_CASE(test_client_response_parse_error) { }); } +SEASTAR_TEST_CASE(test_client_abort_new_conn) { + class delayed_factory : public http::experimental::connection_factory { + public: + virtual future make(abort_source* as) override { + assert(as != nullptr); + return sleep_abortable(std::chrono::seconds(1), *as).then([] { + return make_exception_future(std::runtime_error("Shouldn't happen")); + }); + } + }; + + return seastar::async([] { + auto cln = http::experimental::client(std::make_unique()); + abort_source as; + auto f = cln.make_request(http::request::make("GET", "test", "/test"), [] (const auto& rep, auto&& in) { + return make_exception_future<>(std::runtime_error("Shouldn't happen")); + }, as, http::reply::status_type::ok); + + as.request_abort(); + BOOST_REQUIRE_THROW(f.get(), abort_requested_exception); + cln.close().get(); + }); +} + +SEASTAR_TEST_CASE(test_client_abort_cached_conn) { + return seastar::async([] { + loopback_connection_factory lcf(1); + auto ss = lcf.get_server_socket(); + promise<> server_paused; + promise<> server_resume; + future<> server = ss.accept().then([&] (accept_result ar) { + return seastar::async([&server_paused, &server_resume, sk = std::move(ar.connection)] () mutable { + input_stream in = sk.input(); + read_simple_http_request(in); + server_paused.set_value(); + server_resume.get_future().get(); + output_stream out = sk.output(); + out.close().get(); + }); + }); + + future<> client = seastar::async([&] { + auto cln = http::experimental::client(std::make_unique(lcf), 1 /* max connections */); + // this request gets handled by server and ... + auto f1 = cln.make_request(http::request::make("GET", "test", "/test"), [] (const auto& rep, auto&& in) { + return make_exception_future<>(std::runtime_error("Shouldn't happen")); + }, http::reply::status_type::ok); + server_paused.get_future().get(); + // ... this should hang waiting for cached connection + abort_source as; + auto f2 = cln.make_request(http::request::make("GET", "test", "/test"), [] (const auto& rep, auto&& in) { + return make_exception_future<>(std::runtime_error("Shouldn't happen")); + }, as, http::reply::status_type::ok); + + as.request_abort(); + BOOST_REQUIRE_THROW(f2.get(), abort_requested_exception); + server_resume.set_value(); + cln.close().get(); + try { + f1.get(); + } catch (...) { + } + }); + + when_all(std::move(client), std::move(server)).discard_result().get(); + }); +} + +SEASTAR_TEST_CASE(test_client_abort_send_request) { + return seastar::async([] { + loopback_connection_factory lcf(1); + auto ss = lcf.get_server_socket(); + future<> server = ss.accept().then([&] (accept_result ar) { + return seastar::async([sk = std::move(ar.connection)] () mutable { + input_stream in = sk.input(); + read_simple_http_request(in); + output_stream out = sk.output(); + out.close().get(); + }); + }); + + future<> client = seastar::async([&] { + auto cln = http::experimental::client(std::make_unique(lcf), 1 /* max connections */); + abort_source as; + auto req = http::request::make("GET", "test", "/test"); + promise<> client_paused; + promise<> client_resume; + req.write_body("txt", [&] (output_stream&& out) { + return seastar::async([&client_paused, &client_resume, out = std::move(out)] () mutable { + auto cl = deferred_close(out); + client_paused.set_value(); + client_resume.get_future().get(); + out.write("foo").get(); + out.flush().get(); + }); + }); + auto f = cln.make_request(std::move(req), [] (const auto& rep, auto&& in) { + return make_exception_future<>(std::runtime_error("Shouldn't happen")); + }, as, http::reply::status_type::ok); + client_paused.get_future().get(); + as.request_abort(); + client_resume.set_value(); + BOOST_REQUIRE_THROW(f.get(), abort_requested_exception); + cln.close().get(); + }); + + when_all(std::move(client), std::move(server)).discard_result().get(); + }); +} + +SEASTAR_TEST_CASE(test_client_abort_recv_response) { + return seastar::async([] { + loopback_connection_factory lcf(1); + auto ss = lcf.get_server_socket(); + promise<> server_paused; + promise<> server_resume; + future<> server = ss.accept().then([&] (accept_result ar) { + return seastar::async([&server_paused, &server_resume, sk = std::move(ar.connection)] () mutable { + input_stream in = sk.input(); + read_simple_http_request(in); + server_paused.set_value(); + server_resume.get_future().get(); + output_stream out = sk.output(); + out.close().get(); + }); + }); + + future<> client = seastar::async([&] { + auto cln = http::experimental::client(std::make_unique(lcf), 1 /* max connections */); + abort_source as; + auto f = cln.make_request(http::request::make("GET", "test", "/test"), [] (const auto& rep, auto&& in) { + return make_exception_future<>(std::runtime_error("Shouldn't happen")); + }, as, http::reply::status_type::ok); + server_paused.get_future().get(); + as.request_abort(); + BOOST_REQUIRE_THROW(f.get(), abort_requested_exception); + server_resume.set_value(); + cln.close().get(); + }); + + when_all(std::move(client), std::move(server)).discard_result().get(); + }); +} + SEASTAR_TEST_CASE(test_client_retry_request) { return seastar::async([] { loopback_connection_factory lcf(1);