Skip to content

Commit

Permalink
[coro_io] fix client pool slow connect bug (#657)
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle authored Apr 10, 2024
1 parent 31b6e46 commit ab0fb6b
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 223 deletions.
275 changes: 114 additions & 161 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
#include <random>
#include <shared_mutex>
#include <string_view>
#include <system_error>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <ylt/util/expected.hpp>

#include "async_simple/Common.h"
#include "async_simple/coro/Collect.h"
#include "coro_io.hpp"
#include "detail/client_queue.hpp"
Expand Down Expand Up @@ -105,46 +107,14 @@ class client_pool : public std::enable_shared_from_this<
co_return;
}

/// Move-only holder that owns a client while a connect attempt is in flight
/// and hands the client back to its pool when the holder is destroyed.
///
/// Destruction contract: if a client is still held and `pool_watcher` can
/// still be locked, the destructor yields the current thread until
/// `spinlock_watcher` has expired, then passes the client to
/// `client_pool::collect_free_client`. If the pool is gone, the client is
/// simply destroyed together with the holder.
struct client_connect_helper {
  std::unique_ptr<client_t> client;        ///< Client being connected (may be null).
  std::weak_ptr<client_pool> pool_watcher; ///< Pool that should receive the client back.
  std::weak_ptr<bool> spinlock_watcher;    ///< Destructor waits until this has expired.

  client_connect_helper(std::unique_ptr<client_t>&& client,
                        std::weak_ptr<client_pool>&& pool_watcher,
                        std::weak_ptr<bool>&& spinlock_watcher)
      : client(std::move(client)),
        pool_watcher(std::move(pool_watcher)),
        spinlock_watcher(std::move(spinlock_watcher)) {}

  // The user-provided destructor suppresses the implicit move operations, so
  // they are spelled out. Every member move is non-throwing, hence noexcept:
  // this lets containers and wrappers move the helper instead of rejecting or
  // copying it.
  client_connect_helper(client_connect_helper&& o) noexcept
      : client(std::move(o.client)),
        pool_watcher(std::move(o.pool_watcher)),
        spinlock_watcher(std::move(o.spinlock_watcher)) {}

  // Note: a client already held by *this is destroyed by the unique_ptr
  // assignment; it is not returned to the pool.
  client_connect_helper& operator=(client_connect_helper&& o) noexcept {
    client = std::move(o.client);
    pool_watcher = std::move(o.pool_watcher);
    spinlock_watcher = std::move(o.spinlock_watcher);
    return *this;
  }

  ~client_connect_helper() {
    // A moved-from helper holds no client and has nothing to give back.
    if (client) {
      if (auto pool = pool_watcher.lock(); pool) {
        int cnt = 0;
        // Wait until the object guarded by spinlock_watcher has been released.
        while (spinlock_watcher.lock()) {
          std::this_thread::yield();
          ++cnt;
          // Emit a warning every 10000 spins so a stuck wait is visible.
          if (cnt % 10000 == 0) {
            ELOG_WARN << "spinlock of client{" << client.get() << "},host:{"
                      << client->get_host() << ":" << client->get_port()
                      << "}cost too much time, spin count: " << cnt;
          }
        }
        pool->collect_free_client(std::move(client));
      }
    }
  }
};
// Scales `ms` by a pseudo-random factor drawn from the range 0.7 .. 1.3 and
// returns the result as whole milliseconds (fraction truncated by the cast).
// Each thread keeps its own default-seeded engine.
static auto rand_time(std::chrono::milliseconds ms) {
  static thread_local std::default_random_engine engine;
  std::uniform_real_distribution<float> jitter_factor(0.7f, 1.3f);
  const auto base_ms = ms.count();
  const auto jittered = static_cast<long>(jitter_factor(engine) * base_ms);
  return std::chrono::milliseconds{jittered};
}

async_simple::coro::Lazy<void> reconnect(std::unique_ptr<client_t>& client) {
using namespace std::chrono_literals;
for (unsigned int i = 0; i < pool_config_.connect_retry_count; ++i) {
ELOG_DEBUG << "try to reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
Expand All @@ -164,147 +134,125 @@ class client_pool : public std::enable_shared_from_this<
ELOG_DEBUG << "reconnect client{" << client.get()
<< "} failed. If client close:{" << client->has_closed()
<< "}";
auto wait_time = pool_config_.reconnect_wait_time - cost_time;
auto wait_time = rand_time(
(pool_config_.reconnect_wait_time * (i + 1) - cost_time) / 1ms * 1ms);
if (wait_time.count() > 0)
co_await coro_io::sleep_for(wait_time);
co_await coro_io::sleep_for(wait_time, &client->get_executor());
}
ELOG_WARN << "reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
<< "} out of max limit, stop retry. connect failed";
client = nullptr;
}

async_simple::coro::Lazy<client_connect_helper> connect_client(
client_connect_helper helper) {
ELOG_DEBUG << "try to connect client{" << helper.client.get()
// Shared state between one waiting get_client() call and the parties that
// may supply its client (connect_client, collect_free_client, or the timeout
// callback in get_client).
struct promise_handler {
  // Claimed with exchange(true); the party that observes the previous value
  // `false` is the one that sets a value on promise_.
  std::atomic<bool> flag_{false};
  // Carries the resulting client (or nullptr) back to the waiter.
  async_simple::Promise<std::unique_ptr<client_t>> promise_;
};

// Connects `client` to host_name_ and delivers the outcome through `handler`.
//
// NOTE(review): this span is a rendered diff. Lines that use `helper.client`
// (and the trailing `co_return std::move(helper);`) appear to belong to the
// removed version, while lines using `client` / `watcher` / `handler` appear
// to belong to the added version. The two are interleaved without +/-
// markers, so the text does not compile as shown.
//
// Parameters (added version):
//   client  - the client to connect; ownership is taken.
//   watcher - weak reference to the owning pool.
//   handler - shared flag + promise used to hand the client to the waiter.
async_simple::coro::Lazy<void> connect_client(
std::unique_ptr<client_t> client, std::weak_ptr<client_pool> watcher,
std::shared_ptr<promise_handler> handler) {
ELOG_DEBUG << "try to connect client{" << client.get()
<< "} to host:" << host_name_;
auto result = co_await helper.client->connect(host_name_);
auto result = co_await client->connect(host_name_);
// Lock the pool only after the connect attempt has finished; `self` is empty
// if the pool no longer exists at that point.
std::shared_ptr<client_pool> self = watcher.lock();
if (!client_t::is_ok(result)) {
ELOG_DEBUG << "connect client{" << helper.client.get() << "} to failed. ";
co_await reconnect(helper.client);
ELOG_DEBUG << "connect client{" << client.get() << "} to failed. ";
// Retry only while the pool is still alive. reconnect() takes the client by
// reference and resets it to nullptr once its retries are used up.
// NOTE(review): when the first connect failed and `self` is empty, `client`
// stays non-null, so the "successful!" log below still fires and the
// unconnected client is handed to the promise - confirm this is intended.
if (self) {
co_await reconnect(client);
}
}
if (helper.client) {
ELOG_DEBUG << "connect client{" << helper.client.get() << "} successful!";
if (client) {
ELOG_DEBUG << "connect client{" << client.get() << "} successful!";
}
// exchange(true) returns the previous flag value, so exactly one party sees
// `false` and is allowed to set the promise.
auto has_get_connect = handler->flag_.exchange(true);
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
}
else {
// The promise was already fulfilled by someone else. Keep this connection in
// free_clients_ only if the pool is alive, the client is non-null, and the
// queue currently holds fewer than min(10, max_connection) clients;
// otherwise the client is dropped when this coroutine frame is destroyed.
auto conn_lim = std::min<unsigned>(10u, pool_config_.max_connection);
if (self && free_clients_.size() < conn_lim && client) {
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
}
}

co_return std::move(helper);
}

// Returns a pseudo-random delay of 100 ms plus an integer offset drawn
// uniformly from [-25, 25] ms. Each thread keeps its own default-seeded
// engine.
auto rand_time() {
  static thread_local std::default_random_engine engine;
  std::uniform_int_distribution<int> offset_ms(-25, 25);
  const int jitter = offset_ms(engine);
  return std::chrono::milliseconds{100 + jitter};
}

// Obtains a client for the caller: first from the pooled queues, otherwise by
// creating and connecting a new one. Returns nullptr when the client config
// cannot be initialised or no client becomes available in time.
//
// NOTE(review): this span is a rendered diff. Two versions of the "no pooled
// client" path are interleaved without +/- markers: one built around
// `spinlock`, `collectAny` and a raw promise pointer in `promise_queue`
// (apparently the removed code), and one built around `promise_handler`, a
// `period_timer` and `promise_queue_` (apparently the added code). The text
// does not compile as shown.
async_simple::coro::Lazy<std::unique_ptr<client_t>> get_client(
const typename client_t::config& client_config) {
std::unique_ptr<client_t> client;

// Prefer the regular free queue, then fall back to the short-connect queue.
free_clients_.try_dequeue(client);
if (!client) {
short_connect_clients_.try_dequeue(client);
}
assert(client == nullptr || !client->has_closed());
// Nothing pooled: a new client has to be created and connected.
if (client == nullptr) {
client = std::make_unique<client_t>(*io_context_pool_.get_executor());
if (!client->init_config(client_config)) {
ELOG_ERROR << "init client config{" << client.get() << "} failed.";
co_return nullptr;
}
auto spinlock = std::make_shared<bool>(false);
auto client_ptr = client.get();
// Race the connect against a randomized sleep (see rand_time()).
auto result = co_await async_simple::coro::collectAny(
connect_client(client_connect_helper{
std::move(client), this->shared_from_this(), spinlock}),
coro_io::sleep_for(rand_time()));
if (result.index() == 0) { // connect finish in 100ms
co_return std::move(std::get<0>(result).value().client);
}
else if (result.index() == 1) { // connect time cost more than 100ms
ELOG_DEBUG << "slow connection of client{" << client_ptr
<< "}, try to get free client from pool.";
std::unique_ptr<client_t> cli;
if (short_connect_clients_.try_dequeue(cli) ||
free_clients_.try_dequeue(cli)) {
spinlock = nullptr;
ELOG_DEBUG << "get free client{" << cli.get()
<< "} from pool. skip wait client{" << client_ptr
<< "} connect";
co_return std::move(cli);
std::unique_ptr<client_t> cli;
// Keep the executor so the timer below is created on the same executor as
// the client.
auto executor = io_context_pool_.get_executor();
client = std::make_unique<client_t>(*executor);
if (!client->init_config(client_config))
AS_UNLIKELY {
ELOG_ERROR << "init client config failed.";
co_return nullptr;
}
else {
auto promise = std::make_unique<
async_simple::Promise<std::unique_ptr<client_t>>>();
auto* promise_address = promise.get();
promise_queue.enqueue(promise_address);
spinlock = nullptr;
if (short_connect_clients_.try_dequeue(cli) ||
free_clients_.try_dequeue(cli)) {
collect_free_client(std::move(cli));
}
ELOG_DEBUG << "wait for free client waiter promise{"
<< promise_address << "} response because slow client{"
<< client_ptr << "}";

auto res = co_await collectAny(
[](auto promise)
-> async_simple::coro::Lazy<std::unique_ptr<client_t>> {
co_return co_await promise->getFuture();
}(std::move(promise)),
coro_io::sleep_for(this->pool_config_.max_connection_time));
if (res.index() == 0) {
auto& res0 = std::get<0>(res);
if (!res0.hasError()) {
auto& cli = res0.value();
ELOG_DEBUG << "get free client{" << cli.get() << "} from promise{"
<< promise_address << "}. skip wait client{"
<< client_ptr << "} connect";
co_return std::move(cli);
}
else {
ELOG_ERROR << "Unexcepted branch";
co_return nullptr;
}
}
else {
ELOG_ERROR << "Unexcepted branch. Out of max limitation of connect "
"time, connect "
"failed. skip wait client{"
<< client_ptr << "} connect. "
<< "skip wait promise {" << promise_address
<< "} response";
co_return nullptr;
// `client_ptr` is kept only for log messages; ownership moves into
// connect_client() below.
auto client_ptr = client.get();
auto handler = std::make_shared<promise_handler>();
// Start the connect without awaiting it. Its result reaches this coroutine
// through handler->promise_. The pool is passed as a weak reference.
connect_client(std::move(client), this->weak_from_this(), handler)
.start([](auto&&) {
});
// First timer stage: 20 ms after starting the connect.
auto timer = std::make_shared<coro_io::period_timer>(
executor->get_asio_executor());
timer->expires_after(std::chrono::milliseconds{20});
timer->async_await().start([watcher = this->weak_from_this(), handler,
client_ptr, timer](auto&& res) {
// Proceed only if nobody has claimed the handler yet.
// NOTE(review): res.value() presumably reports that the timer expired rather
// than being cancelled - confirm against period_timer::async_await.
if (res.value() && !handler->flag_) {
if (auto self = watcher.lock(); self) {
// Register the handler so a client returned through collect_free_client
// can satisfy this waiter instead of the slow connect.
++self->promise_cnt_;
self->promise_queue_.enqueue(handler);
// Second timer stage: the remainder of max_connection_time (clamped at 0).
timer->expires_after(
(std::max)(std::chrono::milliseconds{0},
self->pool_config_.max_connection_time -
std::chrono::milliseconds{20}));
timer->async_await().start([handler = std::move(handler),
client_ptr = client_ptr](auto&& res) {
// The wait result is not inspected here; flag_ alone decides whether the
// promise still needs a value. If it does, the waiter receives nullptr.
auto has_get_connect = handler->flag_.exchange(true);
if (!has_get_connect) {
ELOG_ERROR << "Out of max limitation of connect "
"time, connect "
"failed. skip wait client{"
<< client_ptr << "} connect. ";
handler->promise_.setValue(std::unique_ptr<client_t>{nullptr});
}
});
}
}
}
else {
ELOG_ERROR << "unknown collectAny index while wait client{"
<< client_ptr << "} connect";
co_return nullptr;
});
ELOG_DEBUG << "wait client by promise {" << &handler->promise_ << "}";
// Suspend until one of the parties holding `handler` sets the promise.
client = co_await handler->promise_.getFuture();
// A client was obtained, so the timer is no longer needed; the cancel is
// scheduled onto the executor rather than invoked directly here.
if (client) {
executor->schedule([timer] {
std::error_code ignore_ec;
timer->cancel(ignore_ec);
});
}
}
else {
ELOG_DEBUG << "get free client{" << client.get() << "}. from queue";
co_return std::move(client);
}
co_return std::move(client);
}

void enqueue(
coro_io::detail::client_queue<std::unique_ptr<client_t>>& clients,
std::unique_ptr<client_t> client, bool is_short_client) {
std::unique_ptr<client_t> client,
std::chrono::milliseconds collect_time) {
if (clients.enqueue(std::move(client)) == 1) {
std::size_t expected = 0;
if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) {
ELOG_DEBUG << "start timeout client collecter of client_pool{"
<< host_name_ << "}";
collect_idle_timeout_client(
this->shared_from_this(), clients,
(std::max)(
(is_short_client
? (std::min)(pool_config_.idle_timeout,
pool_config_.short_connect_idle_timeout)
: pool_config_.idle_timeout),
std::chrono::milliseconds{50}),
this->weak_from_this(), clients,
(std::max)(collect_time, std::chrono::milliseconds{50}),
pool_config_.idle_queue_per_max_clear_count)
.via(coro_io::get_global_executor())
.start([](auto&&) {
Expand All @@ -314,28 +262,41 @@ class client_pool : public std::enable_shared_from_this<
}

void collect_free_client(std::unique_ptr<client_t> client) {
ELOG_DEBUG << "collect free client{" << client.get() << "}";
if (client && !client->has_closed()) {
async_simple::Promise<std::unique_ptr<client_t>>* promise = nullptr;
if (promise_queue.try_dequeue(promise)) {
promise->setValue(std::move(client));
ELOG_DEBUG << "collect free client{" << client.get()
<< "} wake up promise{" << promise << "}";
if (!client->has_closed()) {
std::shared_ptr<promise_handler> handler;
if (promise_cnt_) {
int cnt = 0;
while (promise_queue_.try_dequeue(handler)) {
++cnt;
auto has_get_connect = handler->flag_.exchange(true);
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
promise_cnt_ -= cnt;
ELOG_DEBUG << "collect free client{" << client.get()
<< "} and wake up promise{" << &handler->promise_ << "}";
return;
}
}
promise_cnt_ -= cnt;
}
else if (free_clients_.size() < pool_config_.max_connection) {
ELOG_DEBUG << "collect free client{" << client.get() << "} enqueue";
enqueue(free_clients_, std::move(client), false);

if (free_clients_.size() < pool_config_.max_connection) {
if (client) {
ELOG_DEBUG << "collect free client{" << client.get() << "} enqueue";
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
}
}
else {
ELOG_DEBUG << "out of max connection limit <<"
<< pool_config_.max_connection << ", collect free client{"
<< client.get() << "} enqueue short connect queue";
enqueue(short_connect_clients_, std::move(client), true);
enqueue(short_connect_clients_, std::move(client),
pool_config_.short_connect_idle_timeout);
}
}
else {
ELOG_DEBUG << "client{" << client.get()
<< "} is nullptr or is closed. we won't collect it";
<< "} is closed. we won't collect it";
}

return;
Expand Down Expand Up @@ -489,8 +450,8 @@ class client_pool : public std::enable_shared_from_this<
coro_io::detail::client_queue<std::unique_ptr<client_t>>
short_connect_clients_;
client_pools_t* pools_manager_ = nullptr;
moodycamel::ConcurrentQueue<async_simple::Promise<std::unique_ptr<client_t>>*>
promise_queue;
std::atomic<int> promise_cnt_ = 0;
moodycamel::ConcurrentQueue<std::shared_ptr<promise_handler>> promise_queue_;
async_simple::Promise<async_simple::Unit> idle_timeout_waiter;
std::string host_name_;
pool_config pool_config_;
Expand Down Expand Up @@ -558,14 +519,6 @@ class client_pools {
iter->second = pool;
}
}
if (has_inserted) {
ELOG_DEBUG << "add new client pool of {" << host_name
<< "} to hash table";
}
else {
ELOG_DEBUG << "add new client pool of {" << host_name
<< "} failed, element existed.";
}
}
return iter->second;
}
Expand Down
Loading

0 comments on commit ab0fb6b

Please sign in to comment.