Skip to content

Commit

Permalink
add support for alive detact in client_pool (#727)
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle authored Jul 26, 2024
1 parent 8fe7242 commit f7576e3
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 12 deletions.
13 changes: 8 additions & 5 deletions include/ylt/coro_io/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,14 @@ class channel {
config)) {
std::shared_ptr<client_pool_t> client_pool;
if (client_pools_.size() > 1) {
client_pool = co_await std::visit(
[this](auto& worker) {
return worker(*this);
},
lb_worker);
int cnt = 0;
do {
client_pool = co_await std::visit(
[this](auto& worker) {
return worker(*this);
},
lb_worker);
} while (!client_pool->is_alive() && ++cnt <= size() * 2);
}
else {
client_pool = client_pools_[0];
Expand Down
93 changes: 86 additions & 7 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ class client_pool : public std::enable_shared_from_this<
return std::chrono::milliseconds{static_cast<long>(e(r) * ms.count())};
}

static async_simple::coro::Lazy<std::pair<bool, std::chrono::milliseconds>>
reconnect_impl(std::unique_ptr<client_t>& client,
std::shared_ptr<client_pool>& self) {
auto pre_time_point = std::chrono::steady_clock::now();
auto result = co_await client->connect(self->host_name_);
bool ok = client_t::is_ok(result);
auto post_time_point = std::chrono::steady_clock::now();
auto cost_time = std::chrono::duration_cast<std::chrono::milliseconds>(
post_time_point - pre_time_point);
ELOG_TRACE << "reconnect client{" << client.get()
<< "} cost time: " << cost_time / std::chrono::milliseconds{1}
<< "ms";
co_return std::pair{ok, cost_time};
}

static async_simple::coro::Lazy<void> reconnect(
std::unique_ptr<client_t>& client, std::weak_ptr<client_pool> watcher) {
using namespace std::chrono_literals;
Expand All @@ -123,13 +138,7 @@ class client_pool : public std::enable_shared_from_this<
<< client->get_host() << ":" << client->get_port()
<< "}, try count:" << i + 1 << "max retry limit:"
<< self->pool_config_.connect_retry_count;
auto pre_time_point = std::chrono::steady_clock::now();
bool ok = client_t::is_ok(co_await client->connect(self->host_name_));
auto post_time_point = std::chrono::steady_clock::now();
auto cost_time = post_time_point - pre_time_point;
ELOG_TRACE << "reconnect client{" << client.get()
<< "} cost time: " << cost_time / std::chrono::milliseconds{1}
<< "ms";
auto [ok, cost_time] = co_await reconnect_impl(client, self);
if (ok) {
ELOG_TRACE << "reconnect client{" << client.get() << "} success";
co_return;
Expand All @@ -148,9 +157,69 @@ class client_pool : public std::enable_shared_from_this<
ELOG_WARN << "reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
<< "} out of max limit, stop retry. connect failed";
alive_detect(client->get_config(), std::move(self)).start([](auto&&) {
});
client = nullptr;
}

static async_simple::coro::Lazy<void> alive_detect(
const typename client_t::config& client_config,
std::weak_ptr<client_pool> watcher) {
std::shared_ptr<client_pool> self = watcher.lock();
using namespace std::chrono_literals;
if (self && self->pool_config_.host_alive_detect_duration.count() != 0 &&
self->free_client_count() == 0) {
bool expected = true;
if (!self->is_alive_.compare_exchange_strong(
expected, false)) { // other alive detect coroutine is running.
co_return;
}
if (self->free_client_count() > 0) { // recheck for multi-thread
self->is_alive_ = true;
co_return;
}
auto executor = self->io_context_pool_.get_executor();
auto client = std::make_unique<client_t>(*executor);
if (!client->init_config(client_config))
AS_UNLIKELY {
ELOG_ERROR << "Init client config failed in host alive detect. That "
"is not expected.";
co_return;
}
while (true) {
auto [ok, cost_time] = co_await reconnect_impl(client, self);
if (ok) {
ELOG_TRACE << "reconnect client{" << client.get()
<< "} success. stop alive detect.";
self->collect_free_client(std::move(client));
self->is_alive_ =
true; /*if client close(), we still mark it as alive*/
co_return;
}
if (self->is_alive_) {
ELOG_TRACE << "client pool is aliving, stop connect client {"
<< client.get() << "} for alive detect";
co_return;
}
ELOG_TRACE << "reconnect client{" << client.get()
<< "} failed. continue alive detect.";
auto wait_time = rand_time(
(self->pool_config_.host_alive_detect_duration - cost_time) / 1ms *
1ms);
self = nullptr;
if (wait_time.count() > 0) {
co_await coro_io::sleep_for(wait_time, &client->get_executor());
}
self = watcher.lock();
if (self->is_alive_) {
ELOG_TRACE << "client pool is aliving, stop connect client {"
<< client.get() << "} for alive detect";
co_return;
}
}
}
}

async_simple::coro::Lazy<std::unique_ptr<client_t>> get_client(
const typename client_t::config& client_config) {
std::unique_ptr<client_t> client;
Expand Down Expand Up @@ -208,6 +277,7 @@ class client_pool : public std::enable_shared_from_this<
enqueue(short_connect_clients_, std::move(client),
pool_config_.short_connect_idle_timeout);
}
is_alive_ = true;
}
else {
ELOG_TRACE << "client{" << client.get()
Expand Down Expand Up @@ -245,6 +315,8 @@ class client_pool : public std::enable_shared_from_this<
std::chrono::milliseconds reconnect_wait_time{1000};
std::chrono::milliseconds idle_timeout{30000};
std::chrono::milliseconds short_connect_idle_timeout{1000};
std::chrono::milliseconds host_alive_detect_duration{
30000}; /* zero means wont detect */
typename client_t::config client_config;
};

Expand Down Expand Up @@ -312,6 +384,12 @@ class client_pool : public std::enable_shared_from_this<
std::size_t free_client_count() const noexcept {
return free_clients_.size() + short_connect_clients_.size();
}
/**
* @brief if host may not useable now.
*
* @return bool
*/
bool is_alive() const noexcept { return is_alive_; }

/**
* @brief approx connection of client pools
Expand Down Expand Up @@ -368,6 +446,7 @@ class client_pool : public std::enable_shared_from_this<
std::string host_name_;
pool_config pool_config_;
io_context_pool_t& io_context_pool_;
std::atomic<bool> is_alive_ = true;
};

template <typename client_t,
Expand Down
4 changes: 4 additions & 0 deletions include/ylt/standalone/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
: coro_http_client(executor->get_asio_executor()) {}

bool init_config(const config &conf) {
config_ = conf;
if (conf.conn_timeout_duration.has_value()) {
set_conn_timeout(*conf.conn_timeout_duration);
}
Expand Down Expand Up @@ -207,6 +208,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

coro_io::ExecutorWrapper<> &get_executor() { return executor_wrapper_; }

const config &get_config() { return config_; }

#ifdef CINATRA_ENABLE_SSL
bool init_ssl(int verify_mode, const std::string &base_path,
const std::string &cert_file, const std::string &sni_hostname) {
Expand Down Expand Up @@ -2432,6 +2435,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::string resp_chunk_str_;
std::span<char> out_buf_;
bool should_reset_ = false;
config config_;

#ifdef CINATRA_ENABLE_GZIP
bool enable_ws_deflate_ = false;
Expand Down
107 changes: 107 additions & 0 deletions src/coro_io/tests/test_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <asio/io_context.hpp>
#include <cassert>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <memory>
Expand All @@ -16,6 +17,7 @@
#include <ylt/coro_io/io_context_pool.hpp>

#include "async_simple/coro/Lazy.h"
#include "ylt/coro_io/client_pool.hpp"
#include "ylt/coro_rpc/impl/coro_rpc_client.hpp"
#include "ylt/coro_rpc/impl/default_config/coro_rpc_config.hpp"

Expand Down Expand Up @@ -184,4 +186,109 @@ TEST_CASE("test send_request config") {
}
server.stop();
}());
}

void hello() {}

TEST_CASE("test server down") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server1(1, 58801);
server1.register_handler<hello>();
auto res = server1.async_start();
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
coro_rpc::coro_rpc_server server2(1, 58802);
server2.register_handler<hello>();
auto res2 = server2.async_start();
REQUIRE_MESSAGE(!res2.hasResult(), "server start failed");
auto hosts =
std::vector<std::string_view>{"127.0.0.1:58801", "127.0.0.1:58802"};
auto config = coro_io::client_pool<coro_rpc::coro_rpc_client>::pool_config{
.connect_retry_count = 0,
.reconnect_wait_time = std::chrono::milliseconds{0},
.host_alive_detect_duration = std::chrono::milliseconds{500}};
auto channel =
coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts, {config});

for (int i = 0; i < 100; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
CHECK(host == hosts[i % 2]);
co_return;
});
CHECK(res.has_value());
}
server1.stop();
for (int i = 0; i < 100; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
co_await client.call<hello>();
if (i > 0)
CHECK(host == hosts[1]);
co_return;
});
if (i > 2)
CHECK(res.has_value());
}
server2.stop();
{
{
auto res = co_await channel.send_request(
[](coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
co_await client.call<hello>();
co_return;
});
res = co_await channel.send_request(
[](coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
co_await client.call<hello>();
co_return;
});
CHECK(!res.has_value());
}
}

coro_rpc::coro_rpc_server server3(1, 58801);
server3.register_handler<hello>();
auto res3 = server3.async_start();
REQUIRE_MESSAGE(!res3.hasResult(), "server start failed");
co_await coro_io::sleep_for(std::chrono::seconds{1});
{
for (int i = 0; i < 100; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
CHECK(host == hosts[0]);
co_return;
});
CHECK(res.has_value());
}
}
coro_rpc::coro_rpc_server server4(1, 58802);
server4.register_handler<hello>();
auto res4 = server4.async_start();
REQUIRE_MESSAGE(!res4.hasResult(), "server start failed");
co_await coro_io::sleep_for(std::chrono::seconds{1});
{
int counter = 0;
for (int i = 0; i < 100; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts, &counter](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
if (host == hosts[1]) {
++counter;
}
co_return;
});
CHECK(res.has_value());
}
CHECK(counter == 50);
}
}());
}

0 comments on commit f7576e3

Please sign in to comment.