[coro_io] fix client pool slow connect bug #657

Merged
merged 4 commits on Apr 10, 2024
Changes from 2 commits
271 changes: 116 additions & 155 deletions include/ylt/coro_io/client_pool.hpp
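The heart of the fix is visible in the diff below: a slow connect no longer holds the caller behind a spin wait. Each pending get_client() call now waits on a promise guarded by an atomic flag, and whichever event comes first (the in-flight connect finishing, a free client being returned to the pool, or the connection deadline firing) fulfils that promise exactly once. The standalone sketch below illustrates this "first responder wins" pattern with simplified, hypothetical names (Handler, slow_connect, return_free_client) and plain std::promise/std::thread in place of async_simple; it is an illustration of the idea, not the library code.

// Illustrative sketch only: hypothetical names, std::thread/std::promise
// standing in for async_simple coroutines.
#include <atomic>
#include <chrono>
#include <future>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

struct Handler {
  std::atomic<bool> responded{false};  // set to true by the first responder
  std::promise<std::string> promise;   // delivers a "client" to the waiter
};

// Simulates the in-flight connect, which may be slow.
void slow_connect(std::shared_ptr<Handler> h) {
  std::this_thread::sleep_for(std::chrono::milliseconds(200));
  if (!h->responded.exchange(true))    // only the first responder wins
    h->promise.set_value("freshly connected client");
  // otherwise a recycled client already satisfied the waiter; this one
  // would go back into the pool instead of being dropped
}

// Simulates another caller returning a client to the pool.
void return_free_client(std::shared_ptr<Handler> h) {
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  if (!h->responded.exchange(true))
    h->promise.set_value("recycled client from the pool");
}

int main() {
  auto h = std::make_shared<Handler>();
  std::thread connect(slow_connect, h);
  std::thread recycle(return_free_client, h);
  // The waiter gets whichever client becomes available first.
  std::cout << h->promise.get_future().get() << '\n';
  connect.join();
  recycle.join();
  return 0;
}

In the diff itself this role is played by promise_handler, with connect_client started as a detached coroutine and coro_io timers supplying the deadline.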
@@ -42,6 +42,7 @@
#include <utility>
#include <ylt/util/expected.hpp>

#include "async_simple/Common.h"
#include "async_simple/coro/Collect.h"
#include "coro_io.hpp"
#include "detail/client_queue.hpp"
@@ -105,46 +106,14 @@ class client_pool : public std::enable_shared_from_this<
co_return;
}

struct client_connect_helper {
std::unique_ptr<client_t> client;
std::weak_ptr<client_pool> pool_watcher;
std::weak_ptr<bool> spinlock_watcher;
client_connect_helper(std::unique_ptr<client_t>&& client,
std::weak_ptr<client_pool>&& pool_watcher,
std::weak_ptr<bool>&& spinlock_watcher)
: client(std::move(client)),
pool_watcher(std::move(pool_watcher)),
spinlock_watcher(std::move(spinlock_watcher)) {}
client_connect_helper(client_connect_helper&& o)
: client(std::move(o.client)),
pool_watcher(std::move(o.pool_watcher)),
spinlock_watcher(std::move(o.spinlock_watcher)) {}
client_connect_helper& operator=(client_connect_helper&& o) {
client = std::move(o.client);
pool_watcher = std::move(o.pool_watcher);
spinlock_watcher = std::move(o.spinlock_watcher);
return *this;
}
~client_connect_helper() {
if (client) {
if (auto pool = pool_watcher.lock(); pool) {
int cnt = 0;
while (spinlock_watcher.lock()) {
std::this_thread::yield();
++cnt;
if (cnt % 10000 == 0) {
ELOG_WARN << "spinlock of client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
<< "}cost too much time, spin count: " << cnt;
}
}
pool->collect_free_client(std::move(client));
}
}
}
};
static auto rand_time(std::chrono::milliseconds ms) {
static thread_local std::default_random_engine r;
std::uniform_real_distribution e(0.7f, 1.3f);
return std::chrono::milliseconds{static_cast<long>(e(r) * ms.count())};
}
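  // Illustrative sketch (standalone, simplified): the reconnect() loop below
  // waits roughly reconnect_wait_time * (attempt + 1) minus the time already
  // spent, with +/-30% jitter supplied by rand_time(). The constants used
  // here are assumptions for the example only, not values from the library.
  //
  // #include <chrono>
  // #include <iostream>
  // #include <random>
  //
  // static std::chrono::milliseconds jittered(std::chrono::milliseconds ms) {
  //   static thread_local std::default_random_engine rng{std::random_device{}()};
  //   std::uniform_real_distribution<float> jitter(0.7f, 1.3f);  // +/-30%
  //   return std::chrono::milliseconds{
  //       static_cast<long>(jitter(rng) * ms.count())};
  // }
  //
  // int main() {
  //   using namespace std::chrono_literals;
  //   auto base = 1000ms;  // assumed pool_config_.reconnect_wait_time
  //   auto cost = 150ms;   // time the failed connect attempt already took
  //   for (int i = 0; i < 3; ++i) {
  //     auto wait = jittered(base * (i + 1) - cost);  // grows with each retry
  //     std::cout << "retry " << i << ": wait ~" << wait.count() << " ms\n";
  //   }
  //   return 0;
  // }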

async_simple::coro::Lazy<void> reconnect(std::unique_ptr<client_t>& client) {
using namespace std::chrono_literals;
for (unsigned int i = 0; i < pool_config_.connect_retry_count; ++i) {
ELOG_DEBUG << "try to reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
@@ -164,147 +133,126 @@
ELOG_DEBUG << "reconnect client{" << client.get()
<< "} failed. If client close:{" << client->has_closed()
<< "}";
auto wait_time = pool_config_.reconnect_wait_time - cost_time;
auto wait_time = rand_time(
(pool_config_.reconnect_wait_time * (i + 1) - cost_time) / 1ms * 1ms);
if (wait_time.count() > 0)
co_await coro_io::sleep_for(wait_time);
co_await coro_io::sleep_for(wait_time, &client->get_executor());
}
ELOG_WARN << "reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
<< "} out of max limit, stop retry. connect failed";
client = nullptr;
}

async_simple::coro::Lazy<client_connect_helper> connect_client(
client_connect_helper helper) {
ELOG_DEBUG << "try to connect client{" << helper.client.get()
struct promise_handler {
std::atomic<bool> flag_ = false;
async_simple::Promise<std::unique_ptr<client_t>> promise_;
};

async_simple::coro::Lazy<void> connect_client(
std::unique_ptr<client_t> client, std::weak_ptr<client_pool> watcher,
std::shared_ptr<promise_handler> handler) {
ELOG_DEBUG << "try to connect client{" << client.get()
<< "} to host:" << host_name_;
auto result = co_await helper.client->connect(host_name_);
auto result = co_await client->connect(host_name_);
std::shared_ptr<client_pool> self = watcher.lock();
if (!client_t::is_ok(result)) {
ELOG_DEBUG << "connect client{" << helper.client.get() << "} to failed. ";
co_await reconnect(helper.client);
ELOG_DEBUG << "connect client{" << client.get() << "} to failed. ";
if (self) {
co_await reconnect(client);
}
}
if (helper.client) {
ELOG_DEBUG << "connect client{" << helper.client.get() << "} successful!";
if (client) {
ELOG_DEBUG << "connect client{" << client.get() << "} successful!";
}
auto has_response = handler->flag_.exchange(true);
if (!has_response) {
handler->promise_.setValue(std::move(client));
}
else {
auto conn_lim = std::min<unsigned>(10u, pool_config_.max_connection);
if (self && free_clients_.size() < conn_lim && client) {
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
}
}

co_return std::move(helper);
}

auto rand_time() {
static thread_local std::default_random_engine r;
std::uniform_int_distribution<int> e(-25, 25);
return std::chrono::milliseconds{100 + e(r)};
}

async_simple::coro::Lazy<std::unique_ptr<client_t>> get_client(
const typename client_t::config& client_config) {
std::unique_ptr<client_t> client;

free_clients_.try_dequeue(client);
if (!client) {
short_connect_clients_.try_dequeue(client);
}
assert(client == nullptr || !client->has_closed());
if (client == nullptr) {
client = std::make_unique<client_t>(*io_context_pool_.get_executor());
if (!client->init_config(client_config)) {
ELOG_ERROR << "init client config{" << client.get() << "} failed.";
co_return nullptr;
}
auto spinlock = std::make_shared<bool>(false);
auto client_ptr = client.get();
auto result = co_await async_simple::coro::collectAny(
connect_client(client_connect_helper{
std::move(client), this->shared_from_this(), spinlock}),
coro_io::sleep_for(rand_time()));
if (result.index() == 0) { // connect finish in 100ms
co_return std::move(std::get<0>(result).value().client);
}
else if (result.index() == 1) { // connect time cost more than 100ms
ELOG_DEBUG << "slow connection of client{" << client_ptr
<< "}, try to get free client from pool.";
std::unique_ptr<client_t> cli;
if (short_connect_clients_.try_dequeue(cli) ||
free_clients_.try_dequeue(cli)) {
spinlock = nullptr;
ELOG_DEBUG << "get free client{" << cli.get()
<< "} from pool. skip wait client{" << client_ptr
<< "} connect";
co_return std::move(cli);
std::unique_ptr<client_t> cli;
auto executor = io_context_pool_.get_executor();
client = std::make_unique<client_t>(*executor);
if (!client->init_config(client_config))
AS_UNLIKELY {
ELOG_ERROR << "init client config failed.";
co_return nullptr;
}
else {
auto promise = std::make_unique<
async_simple::Promise<std::unique_ptr<client_t>>>();
auto* promise_address = promise.get();
promise_queue.enqueue(promise_address);
spinlock = nullptr;
if (short_connect_clients_.try_dequeue(cli) ||
free_clients_.try_dequeue(cli)) {
collect_free_client(std::move(cli));
}
ELOG_DEBUG << "wait for free client waiter promise{"
<< promise_address << "} response because slow client{"
<< client_ptr << "}";

auto res = co_await collectAny(
[](auto promise)
-> async_simple::coro::Lazy<std::unique_ptr<client_t>> {
co_return co_await promise->getFuture();
}(std::move(promise)),
coro_io::sleep_for(this->pool_config_.max_connection_time));
if (res.index() == 0) {
auto& res0 = std::get<0>(res);
if (!res0.hasError()) {
auto& cli = res0.value();
ELOG_DEBUG << "get free client{" << cli.get() << "} from promise{"
<< promise_address << "}. skip wait client{"
<< client_ptr << "} connect";
co_return std::move(cli);
}
else {
ELOG_ERROR << "Unexcepted branch";
co_return nullptr;
}
}
else {
ELOG_ERROR << "Unexcepted branch. Out of max limitation of connect "
"time, connect "
"failed. skip wait client{"
<< client_ptr << "} connect. "
<< "skip wait promise {" << promise_address
<< "} response";
co_return nullptr;
auto client_ptr = client.get();
auto handler = std::make_shared<promise_handler>();
connect_client(std::move(client), this->weak_from_this(), handler)
.start([](auto&&) {
});
auto timer = std::make_shared<coro_io::period_timer>(
executor->get_asio_executor());
timer->expires_after(std::chrono::milliseconds{20});
timer->async_await().start([watcher = this->weak_from_this(), handler,
client_ptr, timer](auto&& res) {
if (/*REMOVE?*/ res.value() && !handler->flag_) {
if (auto self = watcher.lock(); self) {
++self->promise_cnt_;
self->promise_queue_.enqueue(handler);
timer->expires_after(
(std::max)(std::chrono::milliseconds{0},
self->pool_config_.max_connection_time -
std::chrono::milliseconds{20}));
timer->async_await().start([handler = std::move(handler),
client_ptr = client_ptr](auto&& res) {
auto has_response = handler->flag_.exchange(true);
if (!has_response) {
ELOG_ERROR << "Out of max limitation of connect "
"time, connect "
"failed. skip wait client{"
<< client_ptr << "} connect. ";
handler->promise_.setValue(std::unique_ptr<client_t>{nullptr});
}
});
}
}
}
else {
ELOG_ERROR << "unknown collectAny index while wait client{"
<< client_ptr << "} connect";
co_return nullptr;
});
ELOG_DEBUG << "wait client by promise {" << &handler->promise_ << "}";
client = co_await handler->promise_.getFuture();
;
// REMOVE?
if (client) {
executor->schedule([timer] {
timer->cancel();
});
}
}
else {
ELOG_DEBUG << "get free client{" << client.get() << "}. from queue";
co_return std::move(client);
}
co_return std::move(client);
}

void enqueue(
coro_io::detail::client_queue<std::unique_ptr<client_t>>& clients,
std::unique_ptr<client_t> client, bool is_short_client) {
std::unique_ptr<client_t> client,
std::chrono::milliseconds collect_time) {
if (clients.enqueue(std::move(client)) == 1) {
std::size_t expected = 0;
if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) {
ELOG_DEBUG << "start timeout client collecter of client_pool{"
<< host_name_ << "}";
collect_idle_timeout_client(
this->shared_from_this(), clients,
(std::max)(
(is_short_client
? (std::min)(pool_config_.idle_timeout,
pool_config_.short_connect_idle_timeout)
: pool_config_.idle_timeout),
std::chrono::milliseconds{50}),
this->weak_from_this(), clients,
(std::max)(collect_time, std::chrono::milliseconds{50}),
pool_config_.idle_queue_per_max_clear_count)
.via(coro_io::get_global_executor())
.start([](auto&&) {
@@ -314,28 +262,41 @@
}
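  // Illustrative sketch (standalone, simplified): the compare_exchange_strong
  // in enqueue() above guarantees that, however many callers enqueue clients
  // concurrently, at most one of them launches the idle-timeout collector.
  // The names below are hypothetical and the threads stand in for coroutines.
  //
  // #include <atomic>
  // #include <iostream>
  // #include <thread>
  // #include <vector>
  //
  // static std::atomic<std::size_t> collector_cnt{0};
  //
  // static void maybe_start_collector(int producer_id) {
  //   std::size_t expected = 0;
  //   // Only the caller that wins the 0 -> 1 transition starts the collector.
  //   if (collector_cnt.compare_exchange_strong(expected, 1))
  //     std::cout << "producer " << producer_id << " started the collector\n";
  // }
  //
  // int main() {
  //   std::vector<std::thread> producers;
  //   for (int i = 0; i < 4; ++i)
  //     producers.emplace_back(maybe_start_collector, i);
  //   for (auto& t : producers) t.join();  // exactly one line is printed
  //   return 0;
  // }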

void collect_free_client(std::unique_ptr<client_t> client) {
ELOG_DEBUG << "collect free client{" << client.get() << "}";
if (client && !client->has_closed()) {
async_simple::Promise<std::unique_ptr<client_t>>* promise = nullptr;
if (promise_queue.try_dequeue(promise)) {
promise->setValue(std::move(client));
ELOG_DEBUG << "collect free client{" << client.get()
<< "} wake up promise{" << promise << "}";
if (!client || !client->has_closed()) {
std::shared_ptr<promise_handler> handler;
if (promise_cnt_) {
int cnt = 0;
while (promise_queue_.try_dequeue(handler)) {
++cnt;
auto is_time_out = handler->flag_.exchange(true);
if (!is_time_out) {
handler->promise_.setValue(std::move(client));
promise_cnt_ -= cnt;
ELOG_DEBUG << "collect free client{" << client.get()
<< "} and wake up promise{" << &handler->promise_ << "}";
return;
}
}
promise_cnt_ -= cnt;
}
else if (free_clients_.size() < pool_config_.max_connection) {
ELOG_DEBUG << "collect free client{" << client.get() << "} enqueue";
enqueue(free_clients_, std::move(client), false);

if (free_clients_.size() < pool_config_.max_connection) {
if (client) {
ELOG_DEBUG << "collect free client{" << client.get() << "} enqueue";
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
}
}
else {
ELOG_DEBUG << "out of max connection limit <<"
<< pool_config_.max_connection << ", collect free client{"
<< client.get() << "} enqueue short connect queue";
enqueue(short_connect_clients_, std::move(client), true);
enqueue(short_connect_clients_, std::move(client),
pool_config_.short_connect_idle_timeout);
}
}
else {
ELOG_DEBUG << "client{" << client.get()
<< "} is nullptr or is closed. we won't collect it";
<< "} is closed. we won't collect it";
}

return;
@@ -489,8 +450,8 @@
coro_io::detail::client_queue<std::unique_ptr<client_t>>
short_connect_clients_;
client_pools_t* pools_manager_ = nullptr;
moodycamel::ConcurrentQueue<async_simple::Promise<std::unique_ptr<client_t>>*>
promise_queue;
std::atomic<int> promise_cnt_ = 0;
moodycamel::ConcurrentQueue<std::shared_ptr<promise_handler>> promise_queue_;
async_simple::Promise<async_simple::Unit> idle_timeout_waiter;
std::string host_name_;
pool_config pool_config_;
@@ -559,8 +520,8 @@ class client_pools {
}
}
if (has_inserted) {
ELOG_DEBUG << "add new client pool of {" << host_name
<< "} to hash table";
// ELOG_DEBUG << "add new client pool of {" << host_name
// << "} to hash table";
}
else {
ELOG_DEBUG << "add new client pool of {" << host_name