From e6342743f8a07686f63f64396e3065634d83991b Mon Sep 17 00:00:00 2001 From: qicosmos Date: Mon, 15 Apr 2024 17:47:47 +0800 Subject: [PATCH] fix timeout (#658) --- .../standalone/cinatra/coro_http_client.hpp | 158 +++++++++--------- 1 file changed, 82 insertions(+), 76 deletions(-) diff --git a/include/ylt/standalone/cinatra/coro_http_client.hpp b/include/ylt/standalone/cinatra/coro_http_client.hpp index dd1c8c266..567a2c9d7 100644 --- a/include/ylt/standalone/cinatra/coro_http_client.hpp +++ b/include/ylt/standalone/cinatra/coro_http_client.hpp @@ -286,12 +286,13 @@ class coro_http_client : public std::enable_shared_from_this { if (!ok) { co_return resp_data{std::make_error_code(std::errc::protocol_error), 404}; } - - auto future = start_timer(conn_timeout_duration_, "connect timer"); - - data = co_await connect(u); - if (auto ec = co_await wait_future(std::move(future)); ec) { - co_return resp_data{ec, 404}; + { + auto time_out_guard = + timer_guard(this, conn_timeout_duration_, "connect timer"); + data = co_await connect(u); + } + if (socket_->is_timeout_) { + co_return resp_data{std::make_error_code(std::errc::timed_out), 404}; } if (!data.net_err) { data.status = 200; @@ -623,38 +624,27 @@ class coro_http_client : public std::enable_shared_from_this { void set_max_single_part_size(size_t size) { max_single_part_size_ = size; } - async_simple::Future start_timer( - std::chrono::steady_clock::duration duration, std::string msg) { - is_timeout_ = false; - - async_simple::Promise promise; - auto fut = promise.getFuture(); + struct timer_guard { + timer_guard(coro_http_client *self, + std::chrono::steady_clock::duration duration, std::string msg) + : self(self) { + self->socket_->is_timeout_ = false; - if (enable_timeout_) { - timeout(timer_, std::move(promise), duration, std::move(msg)) - .via(&executor_wrapper_) - .detach(); - } - else { - promise.setValue(async_simple::Unit{}); - } - return fut; - } - - async_simple::coro::Lazy wait_future( - async_simple::Future &&future) { - if (!enable_timeout_) { - co_return std::error_code{}; + if (self->enable_timeout_) { + self->timeout(self->timer_, duration, std::move(msg)) + .start([](auto &&) { + }); + } + return; } - std::error_code err_code; - timer_.cancel(err_code); - co_await std::move(future); - if (is_timeout_) { - co_return std::make_error_code(std::errc::timed_out); + ~timer_guard() { + if (self->enable_timeout_ && self->socket_->is_timeout_ == false) { + std::error_code ignore_ec; + self->timer_.cancel(ignore_ec); + } } - - co_return std::error_code{}; - } + coro_http_client *self; + }; async_simple::coro::Lazy async_upload_multipart(std::string uri) { std::shared_ptr guard(nullptr, [this](auto) { @@ -684,18 +674,21 @@ class coro_http_client : public std::enable_shared_from_this { size_t size = 0; if (socket_->has_closed_) { - auto future = start_timer(conn_timeout_duration_, "connect timer"); - - data = co_await connect(u); - if (ec = co_await wait_future(std::move(future)); ec) { - co_return resp_data{ec, 404}; + { + auto time_out_guard = + timer_guard(this, conn_timeout_duration_, "connect timer"); + data = co_await connect(u); + } + if (socket_->is_timeout_) { + co_return resp_data{std::make_error_code(std::errc::timed_out), 404}; } if (data.net_err) { co_return data; } } - auto future = start_timer(req_timeout_duration_, "upload timer"); + auto time_out_guard = + timer_guard(this, conn_timeout_duration_, "request timer"); std::tie(ec, size) = co_await async_write(asio::buffer(header_str)); #ifdef INJECT_FOR_HTTP_CLIENT_TEST if (inject_write_failed == ClientInjectAction::write_failed) { @@ -714,7 +707,7 @@ class coro_http_client : public std::enable_shared_from_this { data = co_await send_single_part(key, part); if (data.net_err) { - if (data.net_err == asio::error::operation_aborted) { + if (socket_->is_timeout_) { data.net_err = std::make_error_code(std::errc::timed_out); } co_return data; @@ -725,16 +718,18 @@ class coro_http_client : public std::enable_shared_from_this { last_part.append("--").append(BOUNDARY).append("--").append(CRCF); if (std::tie(ec, size) = co_await async_write(asio::buffer(last_part)); ec) { + if (socket_->is_timeout_) { + ec = std::make_error_code(std::errc::timed_out); + } co_return resp_data{ec, 404}; } bool is_keep_alive = true; data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx), http_method::POST); - if (auto errc = co_await wait_future(std::move(future)); errc) { - ec = errc; + if (socket_->is_timeout_) { + ec = std::make_error_code(std::errc::timed_out); } - handle_result(data, ec, is_keep_alive); co_return data; } @@ -880,20 +875,25 @@ class coro_http_client : public std::enable_shared_from_this { size_t size = 0; if (socket_->has_closed_) { - auto future = start_timer(conn_timeout_duration_, "connect timer"); - - data = co_await connect(u); - if (ec = co_await wait_future(std::move(future)); ec) { - co_return resp_data{ec, 404}; + { + auto guard = timer_guard(this, conn_timeout_duration_, "connect timer"); + data = co_await connect(u); + } + if (socket_->is_timeout_) { + co_return resp_data{std::make_error_code(std::errc::timed_out), 404}; } if (data.net_err) { co_return data; } } - auto future = start_timer(req_timeout_duration_, "upload timer"); + auto time_guard = + timer_guard(this, conn_timeout_duration_, "request timer"); std::tie(ec, size) = co_await async_write(asio::buffer(header_str)); if (ec) { + if (socket_->is_timeout_) { + ec = std::make_error_code(std::errc::timed_out); + } co_return resp_data{ec, 404}; } @@ -945,19 +945,19 @@ class coro_http_client : public std::enable_shared_from_this { } } } - - if (ec && ec == asio::error::operation_aborted) { - ec = std::make_error_code(std::errc::timed_out); + if (ec) { + if (socket_->is_timeout_) { + ec = std::make_error_code(std::errc::timed_out); + } co_return resp_data{ec, 404}; } bool is_keep_alive = true; data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx), http_method::POST); - if (auto errc = co_await wait_future(std::move(future)); errc) { - ec = errc; + if (ec && socket_->is_timeout_) { + ec = std::make_error_code(std::errc::timed_out); } - handle_result(data, ec, is_keep_alive); co_return data; } @@ -1020,15 +1020,20 @@ class coro_http_client : public std::enable_shared_from_this { u.path = uri; } if (socket_->has_closed_) { - auto conn_future = start_timer(conn_timeout_duration_, "connect timer"); host_ = proxy_host_.empty() ? u.get_host() : proxy_host_; port_ = proxy_port_.empty() ? u.get_port() : proxy_port_; + auto guard = timer_guard(this, conn_timeout_duration_, "connect timer"); if (ec = co_await coro_io::async_connect(&executor_wrapper_, socket_->impl_, host_, port_); ec) { break; } + if (socket_->is_timeout_) { + data.net_err = std::make_error_code(std::errc::timed_out); + co_return data; + } + if (enable_tcp_no_delay_) { socket_->impl_.set_option(asio::ip::tcp::no_delay(true), ec); if (ec) { @@ -1059,9 +1064,6 @@ class coro_http_client : public std::enable_shared_from_this { } } socket_->has_closed_ = false; - if (ec = co_await wait_future(std::move(conn_future)); ec) { - break; - } } std::vector vec; @@ -1080,7 +1082,7 @@ class coro_http_client : public std::enable_shared_from_this { #ifdef CORO_HTTP_PRINT_REQ_HEAD CINATRA_LOG_DEBUG << req_head_str; #endif - auto future = start_timer(req_timeout_duration_, "request timer"); + auto guard = timer_guard(this, req_timeout_duration_, "request timer"); if (has_body) { std::tie(ec, size) = co_await async_write(vec); } @@ -1090,14 +1092,12 @@ class coro_http_client : public std::enable_shared_from_this { if (ec) { break; } - data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx), method); - if (auto errc = co_await wait_future(std::move(future)); errc) { - ec = errc; - } } while (0); - + if (ec && socket_->is_timeout_) { + ec = std::make_error_code(std::errc::timed_out); + } handle_result(data, ec, is_keep_alive); co_return data; } @@ -1179,6 +1179,7 @@ class coro_http_client : public std::enable_shared_from_this { struct socket_t { asio::ip::tcp::socket impl_; std::atomic has_closed_ = true; + bool is_timeout_ = false; asio::streambuf head_buf_; asio::streambuf chunked_buf_; #ifdef CINATRA_ENABLE_SSL @@ -1665,6 +1666,11 @@ class coro_http_client : public std::enable_shared_from_this { co_return resp_data{ec, 404}; } + if (socket_->is_timeout_) { + auto ec = std::make_error_code(std::errc::timed_out); + co_return resp_data{ec, 404}; + } + if (enable_tcp_no_delay_) { std::error_code ec; socket_->impl_.set_option(asio::ip::tcp::no_delay(true), ec); @@ -1960,17 +1966,19 @@ class coro_http_client : public std::enable_shared_from_this { } async_simple::coro::Lazy timeout( - auto &timer, auto promise, std::chrono::steady_clock::duration duration, + auto &timer, std::chrono::steady_clock::duration duration, std::string msg) { + auto watcher = std::weak_ptr(socket_); timer.expires_after(duration); - is_timeout_ = co_await timer.async_await(); - if (!is_timeout_) { - promise.setValue(async_simple::Unit()); + auto is_timeout = co_await timer.async_await(); + if (!is_timeout) { co_return false; } - CINATRA_LOG_WARNING << msg << " timeout"; - close_socket(*socket_); - promise.setValue(async_simple::Unit()); + if (auto socket = watcher.lock(); socket) { + socket_->is_timeout_ = true; + CINATRA_LOG_WARNING << msg << " timeout"; + close_socket(*socket_); + } co_return true; } @@ -2025,8 +2033,6 @@ class coro_http_client : public std::enable_shared_from_this { #endif std::string redirect_uri_; bool enable_follow_redirect_ = false; - - bool is_timeout_ = false; bool enable_timeout_ = false; std::chrono::steady_clock::duration conn_timeout_duration_ = std::chrono::seconds(8);