diff --git a/include/ylt/coro_io/client_pool.hpp b/include/ylt/coro_io/client_pool.hpp index b41b7dcac..c0a758c86 100644 --- a/include/ylt/coro_io/client_pool.hpp +++ b/include/ylt/coro_io/client_pool.hpp @@ -36,14 +36,17 @@ #include #include #include +#include #include #include #include #include +#include "async_simple/coro/Collect.h" #include "coro_io.hpp" #include "detail/client_queue.hpp" #include "io_context_pool.hpp" +#include "ylt/easylog.hpp" namespace coro_io { template @@ -72,7 +75,6 @@ class client_pool : public std::enable_shared_from_this< if ((self = self_weak.lock()) == nullptr) { break; } - std::unique_ptr client; while (true) { std::size_t is_all_cleared = clients.clear_old(clear_cnt); if (is_all_cleared != 0) [[unlikely]] { @@ -97,11 +99,47 @@ class client_pool : public std::enable_shared_from_this< co_return; } - async_simple::coro::Lazy> reconnect( - std::unique_ptr client) { + struct client_connect_helper { + std::unique_ptr client; + std::weak_ptr pool_watcher; + std::weak_ptr spinlock_watcher; + client_connect_helper(std::unique_ptr&& client, + std::weak_ptr&& pool_watcher, + std::weak_ptr&& spinlock_watcher) + : client(std::move(client)), + pool_watcher(std::move(pool_watcher)), + spinlock_watcher(std::move(spinlock_watcher)) {} + client_connect_helper(client_connect_helper&& o) + : client(std::move(o.client)), + pool_watcher(std::move(o.pool_watcher)), + spinlock_watcher(std::move(o.spinlock_watcher)) {} + client_connect_helper& operator=(client_connect_helper&& o) { + client = std::move(o.client); + pool_watcher = std::move(o.pool_watcher); + spinlock_watcher = std::move(o.spinlock_watcher); + return *this; + } + ~client_connect_helper() { + if (client) { + if (auto pool = pool_watcher.lock(); pool) { + int cnt = 0; + while (spinlock_watcher.lock()) { + std::this_thread::yield(); + ++cnt; + if (cnt % 10000 == 0) { + ELOGV(WARN, "spinlock cost too much time, spin count:%d", cnt); + } + } + pool->collect_free_client(std::move(client)); + } + } + } + }; + + async_simple::coro::Lazy reconnect(std::unique_ptr& client) { auto pre_time_point = std::chrono::steady_clock::now(); bool ok = client_t::is_ok(co_await client->reconnect(host_name_)); - for (int i = 0; !ok && i < pool_config_.connect_retry_count; ++i) { + for (unsigned int i = 0; !ok && i < pool_config_.connect_retry_count; ++i) { auto post_time_point = std::chrono::steady_clock::now(); auto wait_time = pool_config_.reconnect_wait_time - (post_time_point - pre_time_point); @@ -110,7 +148,24 @@ class client_pool : public std::enable_shared_from_this< pre_time_point = post_time_point; ok = (client_t::is_ok(co_await client->reconnect(host_name_))); } - co_return ok ? std::move(client) : nullptr; + if (!ok) { + client = nullptr; + } + } + + async_simple::coro::Lazy connect_client( + client_connect_helper helper) { + auto result = co_await helper.client->connect(host_name_); + if (!client_t::is_ok(result)) { + co_await reconnect(helper.client); + } + co_return std::move(helper); + } + + auto rand_time() { + static thread_local std::default_random_engine r; + std::uniform_int_distribution e(-25, 25); + return std::chrono::milliseconds{100 + e(r)}; } async_simple::coro::Lazy> get_client( @@ -128,11 +183,34 @@ class client_pool : public std::enable_shared_from_this< if (!client->init_config(client_config)) { co_return nullptr; } - if (client_t::is_ok(co_await client->connect(host_name_))) { - co_return std::move(client); + auto spinlock = std::make_shared(false); + auto result = co_await async_simple::coro::collectAny( + connect_client(client_connect_helper{ + std::move(client), this->shared_from_this(), spinlock}), + coro_io::sleep_for(rand_time())); + if (result.index() == 0) { // connect finish in 100ms + co_return std::move(std::get<0>(result).value().client); + } + else if (result.index() == 1) { // connect time cost more than 100ms + std::unique_ptr cli; + if (short_connect_clients_.try_dequeue(cli) || + free_clients_.try_dequeue(cli)) { + spinlock = nullptr; + co_return std::move(cli); + } + else { + async_simple::Promise> promise; + promise_queue.enqueue(&promise); + spinlock = nullptr; + if (short_connect_clients_.try_dequeue(cli) || + free_clients_.try_dequeue(cli)) { + collect_free_client(std::move(cli)); + } + co_return co_await promise.getFuture(); + } } else { - co_return co_await reconnect(std::move(client)); + co_return nullptr; } } else { @@ -148,10 +226,12 @@ class client_pool : public std::enable_shared_from_this< if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) { collect_idle_timeout_client( this->shared_from_this(), clients, - is_short_client - ? (std::min)(pool_config_.idle_timeout, - pool_config_.short_connect_idle_timeout) - : pool_config_.idle_timeout, + (std::max)( + (is_short_client + ? (std::min)(pool_config_.idle_timeout, + pool_config_.short_connect_idle_timeout) + : pool_config_.idle_timeout), + std::chrono::milliseconds{50}), pool_config_.idle_queue_per_max_clear_count) .via(coro_io::get_global_executor()) .start([](auto&&) { @@ -162,7 +242,11 @@ class client_pool : public std::enable_shared_from_this< void collect_free_client(std::unique_ptr client) { if (client && !client->has_closed()) { - if (free_clients_.size() < pool_config_.max_connection) { + async_simple::Promise>* promise = nullptr; + if (promise_queue.try_dequeue(promise)) { + promise->setValue(std::move(client)); + } + else if (free_clients_.size() < pool_config_.max_connection) { enqueue(free_clients_, std::move(client), false); } else { @@ -220,11 +304,7 @@ class client_pool : public std::enable_shared_from_this< : host_name_(host_name), pool_config_(pool_config), io_context_pool_(io_context_pool), - free_clients_(pool_config.max_connection) { - if (pool_config_.connect_retry_count == 0) { - pool_config_.connect_retry_count = 1; - } - }; + free_clients_(pool_config.max_connection){}; client_pool(private_construct_token t, client_pools_t* pools_manager_, std::string_view host_name, const pool_config& pool_config, @@ -233,11 +313,7 @@ class client_pool : public std::enable_shared_from_this< host_name_(host_name), pool_config_(pool_config), io_context_pool_(io_context_pool), - free_clients_(pool_config.max_connection) { - if (pool_config_.connect_retry_count == 0) { - pool_config_.connect_retry_count = 1; - } - }; + free_clients_(pool_config.max_connection){}; template async_simple::coro::Lazy> send_request( @@ -309,6 +385,8 @@ class client_pool : public std::enable_shared_from_this< coro_io::detail::client_queue> short_connect_clients_; client_pools_t* pools_manager_ = nullptr; + moodycamel::ConcurrentQueue>*> + promise_queue; async_simple::Promise idle_timeout_waiter; std::string host_name_; pool_config pool_config_; diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index 98477a3b7..e20f94b59 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -271,7 +271,7 @@ inline async_simple::coro::Lazy sleep_for(const Duration &d, co_await timer.async_await(); } template -inline async_simple::coro::Lazy sleep_for(const Duration &d) { +inline async_simple::coro::Lazy sleep_for(Duration d) { if (auto executor = co_await async_simple::CurrentExecutor(); executor != nullptr) { co_await async_simple::coro::sleep(d); diff --git a/include/ylt/coro_io/detail/client_queue.hpp b/include/ylt/coro_io/detail/client_queue.hpp index 42ef53bc9..06bb6aab2 100644 --- a/include/ylt/coro_io/detail/client_queue.hpp +++ b/include/ylt/coro_io/detail/client_queue.hpp @@ -15,7 +15,6 @@ */ #pragma once #include -#include #include "ylt/util/concurrentqueue.h" namespace coro_io::detail { @@ -49,9 +48,7 @@ class client_queue { : queue_{moodycamel::ConcurrentQueue{reserve_size}, moodycamel::ConcurrentQueue{reserve_size}} {}; std::size_t size() const noexcept { return size_[0] + size_[1]; } - void reselect() noexcept { - const int_fast16_t index = (selected_index_ ^= 1); - } + void reselect() noexcept { selected_index_ ^= 1; } std::size_t enqueue(client_t&& c) { const int_fast16_t index = selected_index_; auto cnt = ++size_[index]; @@ -77,7 +74,7 @@ class client_queue { } return false; } - std::size_t clear_old(int max_clear_cnt) { + std::size_t clear_old(std::size_t max_clear_cnt) { const int_fast16_t index = selected_index_ ^ 1; if (size_[index]) { std::size_t result = diff --git a/include/ylt/coro_io/io_context_pool.hpp b/include/ylt/coro_io/io_context_pool.hpp index 2f9cd5cc2..a740b0392 100644 --- a/include/ylt/coro_io/io_context_pool.hpp +++ b/include/ylt/coro_io/io_context_pool.hpp @@ -193,7 +193,7 @@ class multithread_context_pool { ~multithread_context_pool() { stop(); } void run() { - for (int i = 0; i < thd_num_; i++) { + for (std::size_t i = 0; i < thd_num_; i++) { thds_.emplace_back([this] { ioc_.run(); }); diff --git a/include/ylt/util/meta_string.hpp b/include/ylt/util/meta_string.hpp index ecd53f05b..9b3bad9e6 100644 --- a/include/ylt/util/meta_string.hpp +++ b/include/ylt/util/meta_string.hpp @@ -130,7 +130,7 @@ struct meta_string { constexpr size_t n = substr_len(pos, count); meta_string result; - for (int i = 0; i < n; ++i) { + for (std::size_t i = 0; i < n; ++i) { result[i] = elements_[pos + i]; } return result; diff --git a/src/coro_io/tests/test_channel.cpp b/src/coro_io/tests/test_channel.cpp index 964256266..836e049e4 100644 --- a/src/coro_io/tests/test_channel.cpp +++ b/src/coro_io/tests/test_channel.cpp @@ -79,9 +79,8 @@ TEST_CASE("test single host") { auto channel = coro_io::channel::create(hosts); for (int i = 0; i < 100; ++i) { auto res = co_await channel.send_request( - [&i, &hosts]( - coro_rpc::coro_rpc_client &client, - std::string_view host) -> async_simple::coro::Lazy { + [&hosts](coro_rpc::coro_rpc_client &client, + std::string_view host) -> async_simple::coro::Lazy { CHECK(host == hosts[0]); co_return; });