Skip to content

Commit

Permalink
handle timeout connections (qicosmos#431)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Oct 31, 2023
1 parent 9f8175e commit 4c98676
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 55 deletions.
3 changes: 2 additions & 1 deletion include/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ struct multipart_t {
size_t size = 0;
};

class coro_http_client {
class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
public:
struct config {
std::optional<std::chrono::steady_clock::duration> conn_timeout_duration;
Expand Down Expand Up @@ -1578,6 +1578,7 @@ class coro_http_client {
async_simple::coro::Lazy<void> async_read_ws() {
resp_data data{};

auto self = this->shared_from_this();
read_buf_.consume(read_buf_.size());
size_t header_size = 2;
std::shared_ptr sock = socket_;
Expand Down
38 changes: 28 additions & 10 deletions include/cinatra/coro_http_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ class coro_http_connection
template <typename AsioBuffer>
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read(
AsioBuffer &&buffer, size_t size_to_read) noexcept {
set_last_time();
#ifdef CINATRA_ENABLE_SSL
if (use_ssl_) {
return coro_io::async_read(*ssl_stream_, buffer, size_to_read);
Expand All @@ -299,6 +300,7 @@ class coro_http_connection
template <typename AsioBuffer>
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_write(
AsioBuffer &&buffer) {
set_last_time();
#ifdef CINATRA_ENABLE_SSL
if (use_ssl_) {
return coro_io::async_write(*ssl_stream_, buffer);
Expand All @@ -314,6 +316,7 @@ class coro_http_connection
template <typename AsioBuffer>
async_simple::coro::Lazy<std::pair<std::error_code, size_t>> async_read_until(
AsioBuffer &buffer, asio::string_view delim) noexcept {
set_last_time();
#ifdef CINATRA_ENABLE_SSL
if (use_ssl_) {
return coro_io::async_read_until(*ssl_stream_, buffer, delim);
Expand All @@ -326,24 +329,37 @@ class coro_http_connection
#endif
}

void set_last_time() {
if (checkout_timeout_) {
last_rwtime_ = std::chrono::system_clock::now();
}
}

std::chrono::system_clock::time_point get_last_rwtime() {
return last_rwtime_;
}

auto &get_executor() { return *executor_; }

void close() {
void close(bool need_cb = true) {
if (has_closed_) {
return;
}

asio::dispatch(socket_.get_executor(), [this, self = shared_from_this()] {
std::error_code ec;
socket_.shutdown(asio::socket_base::shutdown_both, ec);
socket_.close(ec);
if (quit_cb_) {
quit_cb_(conn_id_);
}
has_closed_ = true;
});
asio::dispatch(socket_.get_executor(),
[this, need_cb, self = shared_from_this()] {
std::error_code ec;
socket_.shutdown(asio::socket_base::shutdown_both, ec);
socket_.close(ec);
if (need_cb && quit_cb_) {
quit_cb_(conn_id_);
}
has_closed_ = true;
});
}

void set_check_timeout(bool r) { checkout_timeout_ = r; }

private:
bool check_keep_alive() {
bool keep_alive = true;
Expand All @@ -369,6 +385,8 @@ class coro_http_connection
std::atomic<bool> has_closed_{false};
uint64_t conn_id_{0};
std::function<void(const uint64_t &conn_id)> quit_cb_ = nullptr;
bool checkout_timeout_ = false;
std::atomic<std::chrono::system_clock::time_point> last_rwtime_;

#ifdef CINATRA_ENABLE_SSL
std::unique_ptr<asio::ssl::context> ssl_ctx_ = nullptr;
Expand Down
68 changes: 66 additions & 2 deletions include/cinatra/coro_http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class coro_http_server {
coro_http_server(size_t thread_num, unsigned short port)
: pool_(thread_num),
port_(port),
acceptor_(pool_.get_executor()->get_asio_executor()) {}
acceptor_(pool_.get_executor()->get_asio_executor()),
check_timer_(pool_.get_executor()->get_asio_executor()) {}

~coro_http_server() {
CINATRA_LOG_INFO << "coro_http_server will quit";
Expand Down Expand Up @@ -80,13 +81,18 @@ class coro_http_server {
if (!thd_.joinable()) {
return;
}

stop_timer_ = true;
std::error_code ec;
check_timer_.cancel(ec);

close_acceptor();

// close current connections.
{
std::scoped_lock lock(conn_mtx_);
for (auto &conn : connections_) {
conn.second->close();
conn.second->close(false);
}
connections_.clear();
}
Expand Down Expand Up @@ -133,6 +139,22 @@ class coro_http_server {
}
}

void set_check_duration(auto duration) { check_duration_ = duration; }

void set_timeout_duration(
std::chrono::steady_clock::duration timeout_duration) {
if (timeout_duration > std::chrono::steady_clock::duration::zero()) {
need_check_ = true;
timeout_duration_ = timeout_duration;
start_check_timer();
}
}

size_t connection_count() {
std::scoped_lock lock(conn_mtx_);
return connections_.size();
}

private:
std::errc listen() {
CINATRA_LOG_INFO << "begin to listen";
Expand Down Expand Up @@ -189,6 +211,9 @@ class coro_http_server {
if (no_delay_) {
conn->tcp_socket().set_option(asio::ip::tcp::no_delay(true));
}
if (need_check_) {
conn->set_check_timeout(true);
}

#ifdef CINATRA_ENABLE_SSL
if (use_ssl_) {
Expand Down Expand Up @@ -227,6 +252,39 @@ class coro_http_server {
acceptor_close_waiter_.get_future().wait();
}

void start_check_timer() {
check_timer_.expires_after(check_duration_);
check_timer_.async_wait([this](auto ec) {
if (ec || stop_timer_) {
return;
}

check_timeout();
start_check_timer();
});
}

void check_timeout() {
auto cur_time = std::chrono::system_clock::now();

std::unordered_map<uint64_t, std::shared_ptr<coro_http_connection>> conns;

{
std::scoped_lock lock(conn_mtx_);
for (auto it = connections_.begin();
it != connections_.end();) // no "++"!
{
if (cur_time - it->second->get_last_rwtime() > timeout_duration_) {
it->second->close(false);
connections_.erase(it++);
}
else {
++it;
}
}
}
}

private:
coro_io::io_context_pool pool_;
uint16_t port_;
Expand All @@ -239,6 +297,12 @@ class coro_http_server {
std::unordered_map<uint64_t, std::shared_ptr<coro_http_connection>>
connections_;
std::mutex conn_mtx_;
std::chrono::steady_clock::duration check_duration_ =
std::chrono::seconds(15);
std::chrono::steady_clock::duration timeout_duration_{};
asio::steady_timer check_timer_;
bool need_check_ = false;
std::atomic<bool> stop_timer_ = false;
#ifdef CINATRA_ENABLE_SSL
std::string cert_file_;
std::string key_file_;
Expand Down
22 changes: 11 additions & 11 deletions tests/test_cinatra.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ TEST_CASE("test coro_http_client async_http_connect") {
r = async_simple::coro::syncAwait(client1.reconnect("http://www.baidu.com"));

CHECK(r.status >= 200);
r = async_simple::coro::syncAwait(client1.reconnect("http://www.purecpp.cn"));
r = async_simple::coro::syncAwait(client1.reconnect("http://cn.bing.com"));
CHECK(r.status == 200);
}

Expand Down Expand Up @@ -558,9 +558,9 @@ TEST_CASE("test coro_http_client quit") {

TEST_CASE("test coro_http_client get") {
coro_http_client client{};
auto r = client.get("http://www.purecpp.cn");
auto r = client.get("http://www.baidu.com");
CHECK(!r.net_err);
CHECK(r.status == 200);
CHECK(r.status < 400);
}

TEST_CASE("test coro_http_client add header and url queries") {
Expand Down Expand Up @@ -600,9 +600,9 @@ TEST_CASE("test coro_http_client not exist domain and bad uri") {
TEST_CASE("test coro_http_client async_get") {
coro_http_client client{};
auto r =
async_simple::coro::syncAwait(client.async_get("http://www.purecpp.cn"));
async_simple::coro::syncAwait(client.async_get("http://www.baidu.com"));
CHECK(!r.net_err);
CHECK(r.status == 200);
CHECK(r.status < 400);

auto r1 =
async_simple::coro::syncAwait(client.async_get("http://www.baidu.com"));
Expand Down Expand Up @@ -800,11 +800,11 @@ TEST_CASE("test coro http basic auth request") {

TEST_CASE("test coro http bearer token auth request") {
coro_http_client client{};
std::string uri = "http://www.purecpp.cn";
std::string uri = "http://www.baidu.com";
client.set_proxy_bearer_token_auth("password");
resp_data result = async_simple::coro::syncAwait(client.async_get(uri));
CHECK(!result.net_err);
CHECK(result.status == 200);
CHECK(result.status < 400);
}

TEST_CASE("test coro http redirect request") {
Expand Down Expand Up @@ -885,23 +885,23 @@ TEST_CASE("test coro_http_client using external io_context") {

coro_http_client client(io_context.get_executor());
auto r =
async_simple::coro::syncAwait(client.async_get("http://www.purecpp.cn"));
async_simple::coro::syncAwait(client.async_get("http://www.baidu.com"));
CHECK(!r.net_err);
CHECK(r.status == 200);
CHECK(r.status < 400);
work.reset();
io_context.run();
io_thd.join();
}

async_simple::coro::Lazy<resp_data> simulate_self_join() {
coro_http_client client{};
co_return co_await client.async_get("http://www.purecpp.cn");
co_return co_await client.async_get("http://www.baidu.com");
}

TEST_CASE("test coro_http_client dealing with self join") {
auto r = async_simple::coro::syncAwait(simulate_self_join());
CHECK(!r.net_err);
CHECK(r.status == 200);
CHECK(r.status < 400);
}

TEST_CASE("test coro_http_client no scheme still send request check") {
Expand Down
30 changes: 15 additions & 15 deletions tests/test_cinatra_websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ TEST_CASE("test wss client") {
});
f.wait();

coro_http_client client;
bool ok = client.init_ssl("../../include/cinatra", "server.crt");
auto client = std::make_shared<coro_http_client>();
bool ok = client->init_ssl("../../include/cinatra", "server.crt");
REQUIRE_MESSAGE(ok == true, "init ssl fail, please check ssl config");

std::promise<void> promise;
client.on_ws_msg([&promise](resp_data data) {
client->on_ws_msg([&promise](resp_data data) {
if (data.net_err) {
std::cout << data.net_err.message() << "\n";
promise.set_value();
Expand All @@ -51,14 +51,14 @@ TEST_CASE("test wss client") {
});

REQUIRE(async_simple::coro::syncAwait(
client.async_ws_connect("wss://localhost:9001")));
client->async_ws_connect("wss://localhost:9001")));

auto result = async_simple::coro::syncAwait(client.async_send_ws("hello"));
auto result = async_simple::coro::syncAwait(client->async_send_ws("hello"));
std::cout << result.net_err << "\n";

promise.get_future().wait();

client.async_close();
client->async_close();

server.stop();
server_thread.join();
Expand Down Expand Up @@ -133,12 +133,12 @@ TEST_CASE("test websocket") {

std::this_thread::sleep_for(std::chrono::milliseconds(100));

coro_http_client client;
client.set_ws_sec_key("s//GYHa/XO7Hd2F2eOGfyA==");
auto client = std::make_shared<coro_http_client>();
client->set_ws_sec_key("s//GYHa/XO7Hd2F2eOGfyA==");

async_simple::coro::syncAwait(test_websocket(client));
async_simple::coro::syncAwait(test_websocket(*client));

client.async_close();
client->async_close();

std::this_thread::sleep_for(std::chrono::milliseconds(300));

Expand Down Expand Up @@ -169,13 +169,13 @@ void test_websocket_content(size_t len) {
});
f.wait();

coro_http_client client;
auto client = std::make_shared<coro_http_client>();
REQUIRE(async_simple::coro::syncAwait(
client.async_ws_connect("ws://localhost:8090")));
client->async_ws_connect("ws://localhost:8090")));

std::string send_str(len, 'a');

client.on_ws_msg([&, send_str](resp_data data) {
client->on_ws_msg([&, send_str](resp_data data) {
if (data.net_err) {
std::cout << "ws_msg net error " << data.net_err.message() << "\n";
return;
Expand All @@ -186,14 +186,14 @@ void test_websocket_content(size_t len) {
CHECK(data.resp_body == send_str);
});

async_simple::coro::syncAwait(client.async_send_ws(send_str));
async_simple::coro::syncAwait(client->async_send_ws(send_str));

std::this_thread::sleep_for(std::chrono::milliseconds(300));

server.stop();
server_thread.join();

client.async_close();
client->async_close();
}

TEST_CASE("test websocket content lt 126") {
Expand Down
Loading

0 comments on commit 4c98676

Please sign in to comment.