Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coro_io] add support for alive detect in client_pool #727

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions include/ylt/coro_io/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,14 @@ class channel {
config)) {
std::shared_ptr<client_pool_t> client_pool;
if (client_pools_.size() > 1) {
client_pool = co_await std::visit(
[this](auto& worker) {
return worker(*this);
},
lb_worker);
int cnt = 0;
do {
client_pool = co_await std::visit(
[this](auto& worker) {
return worker(*this);
},
lb_worker);
} while (!client_pool->is_alive() && ++cnt <= size() * 2);
}
else {
client_pool = client_pools_[0];
Expand Down
93 changes: 86 additions & 7 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ class client_pool : public std::enable_shared_from_this<
return std::chrono::milliseconds{static_cast<long>(e(r) * ms.count())};
}

static async_simple::coro::Lazy<std::pair<bool, std::chrono::milliseconds>>
reconnect_impl(std::unique_ptr<client_t>& client,
std::shared_ptr<client_pool>& self) {
auto pre_time_point = std::chrono::steady_clock::now();
auto result = co_await client->connect(self->host_name_);
bool ok = client_t::is_ok(result);
auto post_time_point = std::chrono::steady_clock::now();
auto cost_time = std::chrono::duration_cast<std::chrono::milliseconds>(
post_time_point - pre_time_point);
ELOG_TRACE << "reconnect client{" << client.get()
<< "} cost time: " << cost_time / std::chrono::milliseconds{1}
<< "ms";
co_return std::pair{ok, cost_time};
}

static async_simple::coro::Lazy<void> reconnect(
std::unique_ptr<client_t>& client, std::weak_ptr<client_pool> watcher) {
using namespace std::chrono_literals;
Expand All @@ -123,13 +138,7 @@ class client_pool : public std::enable_shared_from_this<
<< client->get_host() << ":" << client->get_port()
<< "}, try count:" << i + 1 << "max retry limit:"
<< self->pool_config_.connect_retry_count;
auto pre_time_point = std::chrono::steady_clock::now();
bool ok = client_t::is_ok(co_await client->connect(self->host_name_));
auto post_time_point = std::chrono::steady_clock::now();
auto cost_time = post_time_point - pre_time_point;
ELOG_TRACE << "reconnect client{" << client.get()
<< "} cost time: " << cost_time / std::chrono::milliseconds{1}
<< "ms";
auto [ok, cost_time] = co_await reconnect_impl(client, self);
if (ok) {
ELOG_TRACE << "reconnect client{" << client.get() << "} success";
co_return;
Expand All @@ -148,9 +157,69 @@ class client_pool : public std::enable_shared_from_this<
ELOG_WARN << "reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
<< "} out of max limit, stop retry. connect failed";
alive_detect(client->get_config(), std::move(self)).start([](auto&&) {
});
client = nullptr;
}

static async_simple::coro::Lazy<void> alive_detect(
const typename client_t::config& client_config,
std::weak_ptr<client_pool> watcher) {
std::shared_ptr<client_pool> self = watcher.lock();
using namespace std::chrono_literals;
if (self && self->pool_config_.host_alive_detect_duration.count() != 0 &&
self->free_client_count() == 0) {
bool expected = true;
if (!self->is_alive_.compare_exchange_strong(
expected, false)) { // other alive detect coroutine is running.
co_return;
}
if (self->free_client_count() > 0) { // recheck for multi-thread
self->is_alive_ = true;
co_return;
}
auto executor = self->io_context_pool_.get_executor();
auto client = std::make_unique<client_t>(*executor);
if (!client->init_config(client_config))
AS_UNLIKELY {
ELOG_ERROR << "Init client config failed in host alive detect. That "
"is not expected.";
co_return;
}
while (true) {
auto [ok, cost_time] = co_await reconnect_impl(client, self);
if (ok) {
ELOG_TRACE << "reconnect client{" << client.get()
<< "} success. stop alive detect.";
self->collect_free_client(std::move(client));
self->is_alive_ =
true; /*if client close(), we still mark it as alive*/
co_return;
}
if (self->is_alive_) {
ELOG_TRACE << "client pool is aliving, stop connect client {"
<< client.get() << "} for alive detect";
co_return;
}
ELOG_TRACE << "reconnect client{" << client.get()
<< "} failed. continue alive detect.";
auto wait_time = rand_time(
(self->pool_config_.host_alive_detect_duration - cost_time) / 1ms *
1ms);
self = nullptr;
if (wait_time.count() > 0) {
co_await coro_io::sleep_for(wait_time, &client->get_executor());
}
self = watcher.lock();
if (self->is_alive_) {
ELOG_TRACE << "client pool is aliving, stop connect client {"
<< client.get() << "} for alive detect";
co_return;
}
}
}
}

async_simple::coro::Lazy<std::unique_ptr<client_t>> get_client(
const typename client_t::config& client_config) {
std::unique_ptr<client_t> client;
Expand Down Expand Up @@ -208,6 +277,7 @@ class client_pool : public std::enable_shared_from_this<
enqueue(short_connect_clients_, std::move(client),
pool_config_.short_connect_idle_timeout);
}
is_alive_ = true;
}
else {
ELOG_TRACE << "client{" << client.get()
Expand Down Expand Up @@ -245,6 +315,8 @@ class client_pool : public std::enable_shared_from_this<
std::chrono::milliseconds reconnect_wait_time{1000};
std::chrono::milliseconds idle_timeout{30000};
std::chrono::milliseconds short_connect_idle_timeout{1000};
std::chrono::milliseconds host_alive_detect_duration{
30000}; /* zero means wont detect */
typename client_t::config client_config;
};

Expand Down Expand Up @@ -312,6 +384,12 @@ class client_pool : public std::enable_shared_from_this<
std::size_t free_client_count() const noexcept {
return free_clients_.size() + short_connect_clients_.size();
}
/**
* @brief if host may not useable now.
*
* @return bool
*/
bool is_alive() const noexcept { return is_alive_; }

/**
* @brief approx connection of client pools
Expand Down Expand Up @@ -368,6 +446,7 @@ class client_pool : public std::enable_shared_from_this<
std::string host_name_;
pool_config pool_config_;
io_context_pool_t& io_context_pool_;
std::atomic<bool> is_alive_ = true;
};

template <typename client_t,
Expand Down
4 changes: 4 additions & 0 deletions include/ylt/standalone/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
: coro_http_client(executor->get_asio_executor()) {}

bool init_config(const config &conf) {
config_ = conf;
if (conf.conn_timeout_duration.has_value()) {
set_conn_timeout(*conf.conn_timeout_duration);
}
Expand Down Expand Up @@ -207,6 +208,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

coro_io::ExecutorWrapper<> &get_executor() { return executor_wrapper_; }

const config &get_config() { return config_; }

#ifdef CINATRA_ENABLE_SSL
bool init_ssl(int verify_mode, const std::string &base_path,
const std::string &cert_file, const std::string &sni_hostname) {
Expand Down Expand Up @@ -2432,6 +2435,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::string resp_chunk_str_;
std::span<char> out_buf_;
bool should_reset_ = false;
config config_;

#ifdef CINATRA_ENABLE_GZIP
bool enable_ws_deflate_ = false;
Expand Down
107 changes: 107 additions & 0 deletions src/coro_io/tests/test_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <asio/io_context.hpp>
#include <cassert>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <memory>
Expand All @@ -16,6 +17,7 @@
#include <ylt/coro_io/io_context_pool.hpp>

#include "async_simple/coro/Lazy.h"
#include "ylt/coro_io/client_pool.hpp"
#include "ylt/coro_rpc/impl/coro_rpc_client.hpp"
#include "ylt/coro_rpc/impl/default_config/coro_rpc_config.hpp"

Expand Down Expand Up @@ -184,4 +186,109 @@ TEST_CASE("test send_request config") {
}
server.stop();
}());
}

void hello() {}

TEST_CASE("test server down") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server1(1, 58801);
server1.register_handler<hello>();
auto res = server1.async_start();
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
coro_rpc::coro_rpc_server server2(1, 58802);
server2.register_handler<hello>();
auto res2 = server2.async_start();
REQUIRE_MESSAGE(!res2.hasResult(), "server start failed");
auto hosts =
std::vector<std::string_view>{"127.0.0.1:58801", "127.0.0.1:58802"};
auto config = coro_io::client_pool<coro_rpc::coro_rpc_client>::pool_config{
.connect_retry_count = 0,
.reconnect_wait_time = std::chrono::milliseconds{0},
.host_alive_detect_duration = std::chrono::milliseconds{500}};
auto channel =
coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts, {config});

for (int i = 0; i < 100; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
CHECK(host == hosts[i % 2]);
co_return;
});
CHECK(res.has_value());
}
server1.stop();
for (int i = 0; i < 100; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
co_await client.call<hello>();
if (i > 0)
CHECK(host == hosts[1]);
co_return;
});
if (i > 2)
CHECK(res.has_value());
}
server2.stop();
{
{
auto res = co_await channel.send_request(
[](coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
co_await client.call<hello>();
co_return;
});
res = co_await channel.send_request(
[](coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
co_await client.call<hello>();
co_return;
});
CHECK(!res.has_value());
}
}

coro_rpc::coro_rpc_server server3(1, 58801);
server3.register_handler<hello>();
auto res3 = server3.async_start();
REQUIRE_MESSAGE(!res3.hasResult(), "server start failed");
co_await coro_io::sleep_for(std::chrono::seconds{1});
{
for (int i = 0; i < 100; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
CHECK(host == hosts[0]);
co_return;
});
CHECK(res.has_value());
}
}
coro_rpc::coro_rpc_server server4(1, 58802);
server4.register_handler<hello>();
auto res4 = server4.async_start();
REQUIRE_MESSAGE(!res4.hasResult(), "server start failed");
co_await coro_io::sleep_for(std::chrono::seconds{1});
{
int counter = 0;
for (int i = 0; i < 100; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts, &counter](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
if (host == hosts[1]) {
++counter;
}
co_return;
});
CHECK(res.has_value());
}
CHECK(counter == 50);
}
}());
}
Loading