Skip to content

Commit

Permalink
Merge 'Make http::client::make_request() abortable' from Pavel Emelyanov
Browse files Browse the repository at this point in the history
This PR adds abort_source& argument to client::make_request() method. With it, calling abort_source::request_abort() will break the request making process wherever it is and resolve the make_request() with abort_requested_exception exception.

One of the use-cases is to let client impose a timeout on the make_request(). For that, a timer should be armed that would request abort on the abort source passed to the method. In #2304 there was an attempt to put a timeout on connect() only, but this approach is more generic and allows aborting request at any stage, not only connecting.

Another scenario is stopping long operations on user request. There can be several examples of what "long" operation can be. One of them is simple connect() to the server -- TCP times out after 1 minute by default, and breaking this earlier can be useful. Another example of long operation is S3 file uploading. It may happen in "parts" and part can be of any size, so simple writing of the request body into the socket may take time.

fixes: #2409
closes: #2304

Closes #2410

* github.com:scylladb/seastar:
  test: Add abortable http client test cases
  http/client: Add abortable make_request() API method
  http/client: Abort established connections
  http/client: Handle abort source in pool wait
  http/client: Add abort source to factory::make() method
  http/client: Pass abort_source here and there
  http/client: Idnentation fix after previous patch
  http/client: Merge some continuations explicitly
  • Loading branch information
avikivity committed Sep 11, 2024
2 parents 20cf440 + 928bd93 commit 2c1da84
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 40 deletions.
27 changes: 21 additions & 6 deletions include/seastar/http/client.hh
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ private:
future<reply_ptr> maybe_wait_for_continue(const request& req);
future<> write_body(const request& rq);
future<reply_ptr> recv_reply();

void shutdown() noexcept;
};

/**
Expand All @@ -150,7 +152,7 @@ public:
* The implementations of this method should return ready-to-use socket that will
* be used by \ref client as transport for its http connections
*/
virtual future<connected_socket> make() = 0;
virtual future<connected_socket> make(abort_source*) = 0;
virtual ~connection_factory() {}
};

Expand Down Expand Up @@ -185,19 +187,20 @@ private:

using connection_ptr = seastar::shared_ptr<connection>;

future<connection_ptr> get_connection();
future<connection_ptr> make_connection();
future<connection_ptr> get_connection(abort_source* as);
future<connection_ptr> make_connection(abort_source* as);
future<> put_connection(connection_ptr con);
future<> shrink_connections();

template <std::invocable<connection&> Fn>
auto with_connection(Fn&& fn);
auto with_connection(Fn&& fn, abort_source*);

template <typename Fn>
requires std::invocable<Fn, connection&>
auto with_new_connection(Fn&& fn);
auto with_new_connection(Fn&& fn, abort_source*);

future<> do_make_request(connection& con, request& req, reply_handler& handle, std::optional<reply::status_type> expected);
future<> do_make_request(request req, reply_handler handle, abort_source*, std::optional<reply::status_type> expected);
future<> do_make_request(connection& con, request& req, reply_handler& handle, abort_source*, std::optional<reply::status_type> expected);

public:
/**
Expand Down Expand Up @@ -282,6 +285,18 @@ public:
*/
future<> make_request(request req, reply_handler handle, std::optional<reply::status_type> expected = std::nullopt);

/**
* \brief Send the request and handle the response (abortable)
*
* Same as previous method, but aborts the request upon as.request_abort() call
*
* \param req -- request to be sent
* \param handle -- the response handler
* \param as -- abort source that aborts the request
* \param expected -- the optional expected reply status code, default is std::nullopt
*/
future<> make_request(request req, reply_handler handle, abort_source& as, std::optional<reply::status_type> expected = std::nullopt);

/**
* \brief Updates the maximum number of connections a client may have
*
Expand Down
86 changes: 53 additions & 33 deletions src/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ input_stream<char> connection::in(reply& rep) {
return input_stream<char>(data_source(std::make_unique<httpd::internal::content_length_source_impl>(_read_buf, rep.content_length)));
}

void connection::shutdown() noexcept {
_persistent = false;
_fd.shutdown_input();
}

future<> connection::close() {
return when_all(_read_buf.close(), _write_buf.close()).discard_result().then([this] {
auto la = _fd.local_address();
Expand All @@ -205,7 +210,7 @@ class basic_connection_factory : public connection_factory {
: _addr(std::move(addr))
{
}
virtual future<connected_socket> make() override {
virtual future<connected_socket> make(abort_source* as) override {
return seastar::connect(_addr, {}, transport::TCP);
}
};
Expand All @@ -226,7 +231,7 @@ class tls_connection_factory : public connection_factory {
, _host(std::move(host))
{
}
virtual future<connected_socket> make() override {
virtual future<connected_socket> make(abort_source* as) override {
return tls::connect(_creds, _addr, tls::tls_options{.server_name = _host});
}
};
Expand All @@ -243,7 +248,7 @@ client::client(std::unique_ptr<connection_factory> f, unsigned max_connections,
{
}

future<client::connection_ptr> client::get_connection() {
future<client::connection_ptr> client::get_connection(abort_source* as) {
if (!_pool.empty()) {
connection_ptr con = _pool.front().shared_from_this();
_pool.pop_front();
Expand All @@ -252,17 +257,21 @@ future<client::connection_ptr> client::get_connection() {
}

if (_nr_connections >= _max_connections) {
return _wait_con.wait().then([this] {
return get_connection();
auto sub = as ? as->subscribe([this] () noexcept { _wait_con.broadcast(); }) : std::nullopt;
return _wait_con.wait().then([this, as, sub = std::move(sub)] {
if (as != nullptr && as->abort_requested()) {
return make_exception_future<client::connection_ptr>(as->abort_requested_exception_ptr());
}
return get_connection(as);
});
}

return make_connection();
return make_connection(as);
}

future<client::connection_ptr> client::make_connection() {
future<client::connection_ptr> client::make_connection(abort_source* as) {
_total_new_connections++;
return _new_connections->make().then([cr = internal::client_ref(this)] (connected_socket cs) mutable {
return _new_connections->make(as).then([cr = internal::client_ref(this)] (connected_socket cs) mutable {
http_log.trace("created new http connection {}", cs.local_address());
auto con = seastar::make_shared<connection>(std::move(cs), std::move(cr));
return make_ready_future<connection_ptr>(std::move(con));
Expand Down Expand Up @@ -311,8 +320,8 @@ future<> client::set_maximum_connections(unsigned nr) {
}

template <std::invocable<connection&> Fn>
auto client::with_connection(Fn&& fn) {
return get_connection().then([this, fn = std::move(fn)] (connection_ptr con) mutable {
auto client::with_connection(Fn&& fn, abort_source* as) {
return get_connection(as).then([this, fn = std::move(fn)] (connection_ptr con) mutable {
return fn(*con).finally([this, con = std::move(con)] () mutable {
return put_connection(std::move(con));
});
Expand All @@ -321,41 +330,52 @@ auto client::with_connection(Fn&& fn) {

template <typename Fn>
requires std::invocable<Fn, connection&>
auto client::with_new_connection(Fn&& fn) {
return make_connection().then([this, fn = std::move(fn)] (connection_ptr con) mutable {
auto client::with_new_connection(Fn&& fn, abort_source* as) {
return make_connection(as).then([this, fn = std::move(fn)] (connection_ptr con) mutable {
return fn(*con).finally([this, con = std::move(con)] () mutable {
return put_connection(std::move(con));
});
});
}

future<> client::make_request(request req, reply_handler handle, std::optional<reply::status_type> expected) {
return do_with(std::move(req), std::move(handle), [this, expected] (request& req, reply_handler& handle) mutable {
auto f = with_connection([this, &req, &handle, expected] (connection& con) {
return do_make_request(con, req, handle, expected);
});
return do_make_request(std::move(req), std::move(handle), nullptr, std::move(expected));
}

if (_retry) {
f = f.handle_exception_type([this, &req, &handle, expected] (const std::system_error& ex) {
auto code = ex.code().value();
if ((code != EPIPE) && (code != ECONNABORTED)) {
return make_exception_future<>(ex);
}
future<> client::make_request(request req, reply_handler handle, abort_source& as, std::optional<reply::status_type> expected) {
return do_make_request(std::move(req), std::move(handle), &as, std::move(expected));
}

// The 'con' connection may not yet be freed, so the total connection
// count still account for it and with_new_connection() may temporarily
// break the limit. That's OK, the 'con' will be closed really soon
return with_new_connection([this, &req, &handle, expected] (connection& con) {
return do_make_request(con, req, handle, expected);
});
});
}
future<> client::do_make_request(request req, reply_handler handle, abort_source* as, std::optional<reply::status_type> expected) {
return do_with(std::move(req), std::move(handle), [this, as, expected] (request& req, reply_handler& handle) mutable {
return with_connection([this, &req, &handle, as, expected] (connection& con) {
return do_make_request(con, req, handle, as, expected);
}, as).handle_exception_type([this, &req, &handle, as, expected] (const std::system_error& ex) {
if (as && as->abort_requested()) {
return make_exception_future<>(as->abort_requested_exception_ptr());
}

if (!_retry) {
return make_exception_future<>(ex);
}

return f;
auto code = ex.code().value();
if ((code != EPIPE) && (code != ECONNABORTED)) {
return make_exception_future<>(ex);
}

// The 'con' connection may not yet be freed, so the total connection
// count still account for it and with_new_connection() may temporarily
// break the limit. That's OK, the 'con' will be closed really soon
return with_new_connection([this, &req, &handle, as, expected] (connection& con) {
return do_make_request(con, req, handle, as, expected);
}, as);
});
});
}

future<> client::do_make_request(connection& con, request& req, reply_handler& handle, std::optional<reply::status_type> expected) {
future<> client::do_make_request(connection& con, request& req, reply_handler& handle, abort_source* as, std::optional<reply::status_type> expected) {
auto sub = as ? as->subscribe([&con] () noexcept { con.shutdown(); }) : std::nullopt;
return con.do_make_request(req).then([&con, &handle, expected] (connection::reply_ptr reply) mutable {
auto& rep = *reply;
if (expected.has_value() && rep._status != expected.value()) {
Expand All @@ -375,7 +395,7 @@ future<> client::do_make_request(connection& con, request& req, reply_handler& h
}).handle_exception([&con] (auto ex) mutable {
con._persistent = false;
return make_exception_future<>(std::move(ex));
});
}).finally([sub = std::move(sub)] {});
}

future<> client::close() {
Expand Down
147 changes: 146 additions & 1 deletion tests/unit/httpd_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <seastar/http/url.hh>
#include <seastar/util/later.hh>
#include <seastar/util/short_streams.hh>
#include <seastar/util/closeable.hh>

using namespace seastar;
using namespace httpd;
Expand All @@ -45,7 +46,7 @@ class loopback_http_factory : public http::experimental::connection_factory {
loopback_socket_impl lsi;
public:
explicit loopback_http_factory(loopback_connection_factory& f) : lsi(f) {}
virtual future<connected_socket> make() override {
virtual future<connected_socket> make(abort_source* as) override {
return lsi.connect(socket_address(ipv4_addr()), socket_address(ipv4_addr()));
}
};
Expand Down Expand Up @@ -953,6 +954,150 @@ SEASTAR_TEST_CASE(test_client_response_parse_error) {
});
}

SEASTAR_TEST_CASE(test_client_abort_new_conn) {
class delayed_factory : public http::experimental::connection_factory {
public:
virtual future<connected_socket> make(abort_source* as) override {
assert(as != nullptr);
return sleep_abortable(std::chrono::seconds(1), *as).then([] {
return make_exception_future<connected_socket>(std::runtime_error("Shouldn't happen"));
});
}
};

return seastar::async([] {
auto cln = http::experimental::client(std::make_unique<delayed_factory>());
abort_source as;
auto f = cln.make_request(http::request::make("GET", "test", "/test"), [] (const auto& rep, auto&& in) {
return make_exception_future<>(std::runtime_error("Shouldn't happen"));
}, as, http::reply::status_type::ok);

as.request_abort();
BOOST_REQUIRE_THROW(f.get(), abort_requested_exception);
cln.close().get();
});
}

SEASTAR_TEST_CASE(test_client_abort_cached_conn) {
return seastar::async([] {
loopback_connection_factory lcf(1);
auto ss = lcf.get_server_socket();
promise<> server_paused;
promise<> server_resume;
future<> server = ss.accept().then([&] (accept_result ar) {
return seastar::async([&server_paused, &server_resume, sk = std::move(ar.connection)] () mutable {
input_stream<char> in = sk.input();
read_simple_http_request(in);
server_paused.set_value();
server_resume.get_future().get();
output_stream<char> out = sk.output();
out.close().get();
});
});

future<> client = seastar::async([&] {
auto cln = http::experimental::client(std::make_unique<loopback_http_factory>(lcf), 1 /* max connections */);
// this request gets handled by server and ...
auto f1 = cln.make_request(http::request::make("GET", "test", "/test"), [] (const auto& rep, auto&& in) {
return make_exception_future<>(std::runtime_error("Shouldn't happen"));
}, http::reply::status_type::ok);
server_paused.get_future().get();
// ... this should hang waiting for cached connection
abort_source as;
auto f2 = cln.make_request(http::request::make("GET", "test", "/test"), [] (const auto& rep, auto&& in) {
return make_exception_future<>(std::runtime_error("Shouldn't happen"));
}, as, http::reply::status_type::ok);

as.request_abort();
BOOST_REQUIRE_THROW(f2.get(), abort_requested_exception);
server_resume.set_value();
cln.close().get();
try {
f1.get();
} catch (...) {
}
});

when_all(std::move(client), std::move(server)).discard_result().get();
});
}

SEASTAR_TEST_CASE(test_client_abort_send_request) {
return seastar::async([] {
loopback_connection_factory lcf(1);
auto ss = lcf.get_server_socket();
future<> server = ss.accept().then([&] (accept_result ar) {
return seastar::async([sk = std::move(ar.connection)] () mutable {
input_stream<char> in = sk.input();
read_simple_http_request(in);
output_stream<char> out = sk.output();
out.close().get();
});
});

future<> client = seastar::async([&] {
auto cln = http::experimental::client(std::make_unique<loopback_http_factory>(lcf), 1 /* max connections */);
abort_source as;
auto req = http::request::make("GET", "test", "/test");
promise<> client_paused;
promise<> client_resume;
req.write_body("txt", [&] (output_stream<char>&& out) {
return seastar::async([&client_paused, &client_resume, out = std::move(out)] () mutable {
auto cl = deferred_close(out);
client_paused.set_value();
client_resume.get_future().get();
out.write("foo").get();
out.flush().get();
});
});
auto f = cln.make_request(std::move(req), [] (const auto& rep, auto&& in) {
return make_exception_future<>(std::runtime_error("Shouldn't happen"));
}, as, http::reply::status_type::ok);
client_paused.get_future().get();
as.request_abort();
client_resume.set_value();
BOOST_REQUIRE_THROW(f.get(), abort_requested_exception);
cln.close().get();
});

when_all(std::move(client), std::move(server)).discard_result().get();
});
}

SEASTAR_TEST_CASE(test_client_abort_recv_response) {
return seastar::async([] {
loopback_connection_factory lcf(1);
auto ss = lcf.get_server_socket();
promise<> server_paused;
promise<> server_resume;
future<> server = ss.accept().then([&] (accept_result ar) {
return seastar::async([&server_paused, &server_resume, sk = std::move(ar.connection)] () mutable {
input_stream<char> in = sk.input();
read_simple_http_request(in);
server_paused.set_value();
server_resume.get_future().get();
output_stream<char> out = sk.output();
out.close().get();
});
});

future<> client = seastar::async([&] {
auto cln = http::experimental::client(std::make_unique<loopback_http_factory>(lcf), 1 /* max connections */);
abort_source as;
auto f = cln.make_request(http::request::make("GET", "test", "/test"), [] (const auto& rep, auto&& in) {
return make_exception_future<>(std::runtime_error("Shouldn't happen"));
}, as, http::reply::status_type::ok);
server_paused.get_future().get();
as.request_abort();
BOOST_REQUIRE_THROW(f.get(), abort_requested_exception);
server_resume.set_value();
cln.close().get();
});

when_all(std::move(client), std::move(server)).discard_result().get();
});
}

SEASTAR_TEST_CASE(test_client_retry_request) {
return seastar::async([] {
loopback_connection_factory lcf(1);
Expand Down

0 comments on commit 2c1da84

Please sign in to comment.