[coro_io][feat][bugfix] enhance reconnect wait time & fix coro_io::sleep_for (#403)

* [coro_io] fix lifetime of duration for coro_io::sleep

* [coro_io] enhance reconnect

* [coro_io] make sure connect return after promise enqueue
poor-circle authored Aug 11, 2023
1 parent b2bc8c8 commit e07ca47
Showing 6 changed files with 108 additions and 34 deletions.
124 changes: 101 additions & 23 deletions include/ylt/coro_io/client_pool.hpp
@@ -36,14 +36,17 @@
#include <random>
#include <shared_mutex>
#include <string_view>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <ylt/util/expected.hpp>

#include "async_simple/coro/Collect.h"
#include "coro_io.hpp"
#include "detail/client_queue.hpp"
#include "io_context_pool.hpp"
#include "ylt/easylog.hpp"
namespace coro_io {

template <typename client_t, typename io_context_pool_t>
@@ -72,7 +75,6 @@ class client_pool : public std::enable_shared_from_this<
if ((self = self_weak.lock()) == nullptr) {
break;
}
std::unique_ptr<client_t> client;
while (true) {
std::size_t is_all_cleared = clients.clear_old(clear_cnt);
if (is_all_cleared != 0) [[unlikely]] {
@@ -97,11 +99,47 @@
co_return;
}

async_simple::coro::Lazy<std::unique_ptr<client_t>> reconnect(
std::unique_ptr<client_t> client) {
struct client_connect_helper {
std::unique_ptr<client_t> client;
std::weak_ptr<client_pool> pool_watcher;
std::weak_ptr<bool> spinlock_watcher;
client_connect_helper(std::unique_ptr<client_t>&& client,
std::weak_ptr<client_pool>&& pool_watcher,
std::weak_ptr<bool>&& spinlock_watcher)
: client(std::move(client)),
pool_watcher(std::move(pool_watcher)),
spinlock_watcher(std::move(spinlock_watcher)) {}
client_connect_helper(client_connect_helper&& o)
: client(std::move(o.client)),
pool_watcher(std::move(o.pool_watcher)),
spinlock_watcher(std::move(o.spinlock_watcher)) {}
client_connect_helper& operator=(client_connect_helper&& o) {
client = std::move(o.client);
pool_watcher = std::move(o.pool_watcher);
spinlock_watcher = std::move(o.spinlock_watcher);
return *this;
}
~client_connect_helper() {
if (client) {
if (auto pool = pool_watcher.lock(); pool) {
int cnt = 0;
while (spinlock_watcher.lock()) {
std::this_thread::yield();
++cnt;
if (cnt % 10000 == 0) {
ELOGV(WARN, "spinlock cost too much time, spin count:%d", cnt);
}
}
pool->collect_free_client(std::move(client));
}
}
}
};

async_simple::coro::Lazy<void> reconnect(std::unique_ptr<client_t>& client) {
auto pre_time_point = std::chrono::steady_clock::now();
bool ok = client_t::is_ok(co_await client->reconnect(host_name_));
for (int i = 0; !ok && i < pool_config_.connect_retry_count; ++i) {
for (unsigned int i = 0; !ok && i < pool_config_.connect_retry_count; ++i) {
auto post_time_point = std::chrono::steady_clock::now();
auto wait_time =
pool_config_.reconnect_wait_time - (post_time_point - pre_time_point);
@@ -110,7 +148,24 @@ class client_pool : public std::enable_shared_from_this<
pre_time_point = post_time_point;
ok = (client_t::is_ok(co_await client->reconnect(host_name_)));
}
co_return ok ? std::move(client) : nullptr;
if (!ok) {
client = nullptr;
}
}

async_simple::coro::Lazy<client_connect_helper> connect_client(
client_connect_helper helper) {
auto result = co_await helper.client->connect(host_name_);
if (!client_t::is_ok(result)) {
co_await reconnect(helper.client);
}
co_return std::move(helper);
}

auto rand_time() {
static thread_local std::default_random_engine r;
std::uniform_int_distribution<int> e(-25, 25);
return std::chrono::milliseconds{100 + e(r)};
}

async_simple::coro::Lazy<std::unique_ptr<client_t>> get_client(
@@ -128,11 +183,34 @@
if (!client->init_config(client_config)) {
co_return nullptr;
}
if (client_t::is_ok(co_await client->connect(host_name_))) {
co_return std::move(client);
auto spinlock = std::make_shared<bool>(false);
auto result = co_await async_simple::coro::collectAny(
connect_client(client_connect_helper{
std::move(client), this->shared_from_this(), spinlock}),
coro_io::sleep_for(rand_time()));
if (result.index() == 0) { // connect finish in 100ms
co_return std::move(std::get<0>(result).value().client);
}
else if (result.index() == 1) { // connect time cost more than 100ms
std::unique_ptr<client_t> cli;
if (short_connect_clients_.try_dequeue(cli) ||
free_clients_.try_dequeue(cli)) {
spinlock = nullptr;
co_return std::move(cli);
}
else {
async_simple::Promise<std::unique_ptr<client_t>> promise;
promise_queue.enqueue(&promise);
spinlock = nullptr;
if (short_connect_clients_.try_dequeue(cli) ||
free_clients_.try_dequeue(cli)) {
collect_free_client(std::move(cli));
}
co_return co_await promise.getFuture();
}
}
else {
co_return co_await reconnect(std::move(client));
co_return nullptr;
}
}
else {
@@ -148,10 +226,12 @@
if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) {
collect_idle_timeout_client(
this->shared_from_this(), clients,
is_short_client
? (std::min)(pool_config_.idle_timeout,
pool_config_.short_connect_idle_timeout)
: pool_config_.idle_timeout,
(std::max)(
(is_short_client
? (std::min)(pool_config_.idle_timeout,
pool_config_.short_connect_idle_timeout)
: pool_config_.idle_timeout),
std::chrono::milliseconds{50}),
pool_config_.idle_queue_per_max_clear_count)
.via(coro_io::get_global_executor())
.start([](auto&&) {
@@ -162,7 +242,11 @@ class client_pool : public std::enable_shared_from_this<

void collect_free_client(std::unique_ptr<client_t> client) {
if (client && !client->has_closed()) {
if (free_clients_.size() < pool_config_.max_connection) {
async_simple::Promise<std::unique_ptr<client_t>>* promise = nullptr;
if (promise_queue.try_dequeue(promise)) {
promise->setValue(std::move(client));
}
else if (free_clients_.size() < pool_config_.max_connection) {
enqueue(free_clients_, std::move(client), false);
}
else {
@@ -220,11 +304,7 @@ class client_pool : public std::enable_shared_from_this<
: host_name_(host_name),
pool_config_(pool_config),
io_context_pool_(io_context_pool),
free_clients_(pool_config.max_connection) {
if (pool_config_.connect_retry_count == 0) {
pool_config_.connect_retry_count = 1;
}
};
free_clients_(pool_config.max_connection){};

client_pool(private_construct_token t, client_pools_t* pools_manager_,
std::string_view host_name, const pool_config& pool_config,
@@ -233,11 +313,7 @@
host_name_(host_name),
pool_config_(pool_config),
io_context_pool_(io_context_pool),
free_clients_(pool_config.max_connection) {
if (pool_config_.connect_retry_count == 0) {
pool_config_.connect_retry_count = 1;
}
};
free_clients_(pool_config.max_connection){};

template <typename T>
async_simple::coro::Lazy<return_type<T>> send_request(
@@ -309,6 +385,8 @@ class client_pool : public std::enable_shared_from_this<
coro_io::detail::client_queue<std::unique_ptr<client_t>>
short_connect_clients_;
client_pools_t* pools_manager_ = nullptr;
moodycamel::ConcurrentQueue<async_simple::Promise<std::unique_ptr<client_t>>*>
promise_queue;
async_simple::Promise<async_simple::Unit> idle_timeout_waiter;
std::string host_name_;
pool_config pool_config_;
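Note on the "enhance reconnect" part of this change: the retry loop now subtracts the time the failed attempt itself took from pool_config_.reconnect_wait_time, so attempts are paced at roughly fixed intervals instead of attempt time plus a full wait. The sketch below is a simplified, synchronous illustration of that pacing only; try_connect_once() is an invented stand-in for client->reconnect(host_name_), the real code is a coroutine that sleeps via coro_io::sleep_for, and details such as where the timestamps are refreshed differ.

```cpp
// Simplified pacing sketch: each retry sleeps only for whatever is left of
// reconnect_wait_time after the failed attempt itself, so attempts land
// roughly reconnect_wait_time apart.
#include <chrono>
#include <cstdio>
#include <thread>

using namespace std::chrono;

bool try_connect_once() {  // pretend every attempt takes 300 ms and fails
  std::this_thread::sleep_for(milliseconds(300));
  return false;
}

bool reconnect_with_pacing(unsigned retry_count, milliseconds reconnect_wait_time) {
  auto pre_time_point = steady_clock::now();
  bool ok = try_connect_once();
  for (unsigned i = 0; !ok && i < retry_count; ++i) {
    auto post_time_point = steady_clock::now();
    auto wait_time = reconnect_wait_time - (post_time_point - pre_time_point);
    if (wait_time.count() > 0) {
      std::this_thread::sleep_for(wait_time);  // sleep only the remainder
    }
    pre_time_point = steady_clock::now();
    ok = try_connect_once();
  }
  return ok;
}

int main() {
  auto t0 = steady_clock::now();
  reconnect_with_pacing(2, milliseconds(1000));
  auto elapsed = duration_cast<milliseconds>(steady_clock::now() - t0);
  // Expect roughly 2300 ms: 300 (attempt) + 700 (remainder), twice, + a final 300.
  std::printf("elapsed: %lld ms\n", static_cast<long long>(elapsed.count()));
}
```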
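The "make sure connect return after promise enqueue" bullet refers to the new get_client(): the connect is raced against a short randomized budget via collectAny; if the budget expires first, the caller takes an already-free client or enqueues an async_simple::Promise, and the spinlock token keeps the still-running connect from handing its client back to the pool before that promise is in the queue, so collect_free_client() can always prefer a waiting promise over the free list. Below is a deliberately simplified, thread-based analogue of that hand-off, not the library's coroutine code: it always waits out the budget instead of racing, and fake_connect and the queues are invented for illustration.

```cpp
#include <chrono>
#include <deque>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>

using Client = int;  // stand-in for std::unique_ptr<client_t>

std::mutex mtx;
std::deque<Client> free_clients;                  // already-connected clients
std::deque<std::promise<Client>*> promise_queue;  // callers that gave up waiting

// Whenever a client becomes free (including a connect that finished late),
// prefer a queued waiter over the free list.
void collect_free_client(Client c) {
  std::lock_guard lk(mtx);
  if (!promise_queue.empty()) {
    promise_queue.front()->set_value(c);
    promise_queue.pop_front();
  } else {
    free_clients.push_back(c);
  }
}

Client fake_connect() {  // pretend connecting takes longer than the budget
  std::this_thread::sleep_for(std::chrono::milliseconds(250));
  return 42;
}

Client get_client() {
  // Start the connect in the background; its result is always routed through
  // collect_free_client(), whether or not this caller is still waiting.
  std::thread([] { collect_free_client(fake_connect()); }).detach();

  // Give the connect a ~100 ms budget (randomized via rand_time() in the real code).
  std::this_thread::sleep_for(std::chrono::milliseconds(100));

  std::promise<Client> p;
  {
    std::lock_guard lk(mtx);
    if (!free_clients.empty()) {        // some client already became free
      Client c = free_clients.front();
      free_clients.pop_front();
      return c;
    }
    promise_queue.push_back(&p);        // otherwise register as a waiter
  }
  return p.get_future().get();          // fulfilled by collect_free_client()
}

int main() {
  std::cout << "got client " << get_client() << "\n";
  // Demo only: let the detached connect thread finish before globals go away.
  std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
```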
2 changes: 1 addition & 1 deletion include/ylt/coro_io/coro_io.hpp
@@ -271,7 +271,7 @@ inline async_simple::coro::Lazy<void> sleep_for(const Duration &d,
co_await timer.async_await();
}
template <typename Duration>
inline async_simple::coro::Lazy<void> sleep_for(const Duration &d) {
inline async_simple::coro::Lazy<void> sleep_for(Duration d) {
if (auto executor = co_await async_simple::CurrentExecutor();
executor != nullptr) {
co_await async_simple::coro::sleep(d);
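The signature change from const Duration &d to Duration d fixes a lifetime bug rather than style: sleep_for returns a lazily-started coroutine, and that coroutine can outlive the full expression that created it (for example when it loses the collectAny race in client_pool.hpp and keeps running in the background), at which point a reference parameter dangles once the caller's temporary duration is destroyed. A minimal sketch of the hazard with a hand-rolled lazy task type follows; this is not async_simple's Lazy, it only shows why taking the duration by value is the fix.

```cpp
#include <chrono>
#include <coroutine>
#include <cstdio>
#include <thread>

// Minimal lazily-started task, just enough to show the hazard; it is NOT the
// async_simple::coro::Lazy implementation.
struct task {
  struct promise_type {
    task get_return_object() {
      return task{std::coroutine_handle<promise_type>::from_promise(*this)};
    }
    std::suspend_always initial_suspend() { return {}; }  // lazy: body runs later
    std::suspend_never final_suspend() noexcept { return {}; }
    void return_void() {}
    void unhandled_exception() {}
  };
  std::coroutine_handle<promise_type> h;
};

// BAD: the coroutine frame stores only a reference; by the time the body runs,
// the caller's temporary duration may already be gone.
task sleep_by_ref(const std::chrono::milliseconds &d) {
  co_await std::suspend_always{};  // imagine waiting on a timer/executor here
  std::this_thread::sleep_for(d);  // reads through a possibly dangling reference
}

// GOOD (what the commit switches to): the frame owns its own copy of d.
task sleep_by_value(std::chrono::milliseconds d) {
  co_await std::suspend_always{};
  std::this_thread::sleep_for(d);  // d lives inside the coroutine frame
}

int main() {
  // The temporary milliseconds{50} dies at the end of this statement, while the
  // suspended coroutine still refers to it.
  task bad = sleep_by_ref(std::chrono::milliseconds{50});
  // bad.h.resume(); bad.h.resume();  // the second resume would be a use-after-free
  bad.h.destroy();                    // discard the never-run frame

  task ok = sleep_by_value(std::chrono::milliseconds{50});
  ok.h.resume();  // run up to the co_await
  ok.h.resume();  // safe: finishes using the frame's own copy
  std::puts("done");
}
```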
7 changes: 2 additions & 5 deletions include/ylt/coro_io/detail/client_queue.hpp
@@ -15,7 +15,6 @@
*/
#pragma once
#include <atomic>
#include <iterator>

#include "ylt/util/concurrentqueue.h"
namespace coro_io::detail {
@@ -49,9 +48,7 @@ class client_queue {
: queue_{moodycamel::ConcurrentQueue<client_t>{reserve_size},
moodycamel::ConcurrentQueue<client_t>{reserve_size}} {};
std::size_t size() const noexcept { return size_[0] + size_[1]; }
void reselect() noexcept {
const int_fast16_t index = (selected_index_ ^= 1);
}
void reselect() noexcept { selected_index_ ^= 1; }
std::size_t enqueue(client_t&& c) {
const int_fast16_t index = selected_index_;
auto cnt = ++size_[index];
@@ -77,7 +74,7 @@
}
return false;
}
std::size_t clear_old(int max_clear_cnt) {
std::size_t clear_old(std::size_t max_clear_cnt) {
const int_fast16_t index = selected_index_ ^ 1;
if (size_[index]) {
std::size_t result =
2 changes: 1 addition & 1 deletion include/ylt/coro_io/io_context_pool.hpp
@@ -193,7 +193,7 @@ class multithread_context_pool {
~multithread_context_pool() { stop(); }

void run() {
for (int i = 0; i < thd_num_; i++) {
for (std::size_t i = 0; i < thd_num_; i++) {
thds_.emplace_back([this] {
ioc_.run();
});
2 changes: 1 addition & 1 deletion include/ylt/util/meta_string.hpp
@@ -130,7 +130,7 @@ struct meta_string {
constexpr size_t n = substr_len(pos, count);

meta_string<n> result;
for (int i = 0; i < n; ++i) {
for (std::size_t i = 0; i < n; ++i) {
result[i] = elements_[pos + i];
}
return result;
5 changes: 2 additions & 3 deletions src/coro_io/tests/test_channel.cpp
@@ -79,9 +79,8 @@ TEST_CASE("test single host") {
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
for (int i = 0; i < 100; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
[&hosts](coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
CHECK(host == hosts[0]);
co_return;
});
