diff --git a/include/ylt/coro_io/client_pool.hpp b/include/ylt/coro_io/client_pool.hpp index b41b7dcac3..e7387e213c 100644 --- a/include/ylt/coro_io/client_pool.hpp +++ b/include/ylt/coro_io/client_pool.hpp @@ -41,6 +41,7 @@ #include #include +#include "async_simple/coro/Collect.h" #include "coro_io.hpp" #include "detail/client_queue.hpp" #include "io_context_pool.hpp" @@ -72,7 +73,6 @@ class client_pool : public std::enable_shared_from_this< if ((self = self_weak.lock()) == nullptr) { break; } - std::unique_ptr client; while (true) { std::size_t is_all_cleared = clients.clear_old(clear_cnt); if (is_all_cleared != 0) [[unlikely]] { @@ -97,8 +97,30 @@ class client_pool : public std::enable_shared_from_this< co_return; } - async_simple::coro::Lazy> reconnect( - std::unique_ptr client) { + struct client_connect_helper { + std::unique_ptr client; + std::weak_ptr pool_watcher; + client_connect_helper(std::unique_ptr&& client, + std::weak_ptr&& pool_watcher) + : client(std::move(client)), pool_watcher(std::move(pool_watcher)) {} + client_connect_helper(client_connect_helper&& o) + : client(std::move(o.client)), + pool_watcher(std::move(o.pool_watcher)) {} + client_connect_helper& operator=(client_connect_helper&& o) { + client = std::move(o.client); + pool_watcher = std::move(o.pool_watcher); + return *this; + } + ~client_connect_helper() { + if (client) { + if (auto pool = pool_watcher.lock(); pool) { + pool->collect_free_client(std::move(client)); + } + } + } + }; + + async_simple::coro::Lazy reconnect(std::unique_ptr& client) { auto pre_time_point = std::chrono::steady_clock::now(); bool ok = client_t::is_ok(co_await client->reconnect(host_name_)); for (int i = 0; !ok && i < pool_config_.connect_retry_count; ++i) { @@ -110,7 +132,23 @@ class client_pool : public std::enable_shared_from_this< pre_time_point = post_time_point; ok = (client_t::is_ok(co_await client->reconnect(host_name_))); } - co_return ok ? std::move(client) : nullptr; + if (!ok) { + client = nullptr; + } + } + + async_simple::coro::Lazy connect_client( + client_connect_helper helper) { + if (!client_t::is_ok(co_await helper.client->connect(host_name_))) { + co_await reconnect(helper.client); + } + co_return std::move(helper); + } + + auto rand_time() { + static thread_local std::default_random_engine r; + std::uniform_int_distribution e(-25, 25); + return std::chrono::milliseconds{100 + e(r)}; } async_simple::coro::Lazy> get_client( @@ -128,11 +166,31 @@ class client_pool : public std::enable_shared_from_this< if (!client->init_config(client_config)) { co_return nullptr; } - if (client_t::is_ok(co_await client->connect(host_name_))) { - co_return std::move(client); + auto result = co_await async_simple::coro::collectAny( + connect_client(client_connect_helper{std::move(client), + this->shared_from_this()}), + coro_io::sleep_for(rand_time())); + if (result.index() == 0) { // connect finish in 100ms + co_return std::move(std::get<0>(result).value().client); + } + else if (result.index() == 1) { // connect time cost more than 100ms + std::unique_ptr cli; + if (short_connect_clients_.try_dequeue(cli) || + free_clients_.try_dequeue(cli)) { + co_return std::move(cli); + } + else { + async_simple::Promise> promise; + promise_queue.enqueue(&promise); + if (short_connect_clients_.try_dequeue(cli) || + free_clients_.try_dequeue(cli)) { + collect_free_client(std::move(cli)); + } + co_return co_await promise.getFuture(); + } } else { - co_return co_await reconnect(std::move(client)); + co_return nullptr; } } else { @@ -148,10 +206,11 @@ class client_pool : public std::enable_shared_from_this< if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) { collect_idle_timeout_client( this->shared_from_this(), clients, - is_short_client - ? (std::min)(pool_config_.idle_timeout, - pool_config_.short_connect_idle_timeout) - : pool_config_.idle_timeout, + std::max((is_short_client + ? (std::min)(pool_config_.idle_timeout, + pool_config_.short_connect_idle_timeout) + : pool_config_.idle_timeout), + std::chrono::milliseconds{50}), pool_config_.idle_queue_per_max_clear_count) .via(coro_io::get_global_executor()) .start([](auto&&) { @@ -162,7 +221,11 @@ class client_pool : public std::enable_shared_from_this< void collect_free_client(std::unique_ptr client) { if (client && !client->has_closed()) { - if (free_clients_.size() < pool_config_.max_connection) { + async_simple::Promise>* promise = nullptr; + if (promise_queue.try_dequeue(promise)) { + promise->setValue(std::move(client)); + } + else if (free_clients_.size() < pool_config_.max_connection) { enqueue(free_clients_, std::move(client), false); } else { @@ -221,9 +284,6 @@ class client_pool : public std::enable_shared_from_this< pool_config_(pool_config), io_context_pool_(io_context_pool), free_clients_(pool_config.max_connection) { - if (pool_config_.connect_retry_count == 0) { - pool_config_.connect_retry_count = 1; - } }; client_pool(private_construct_token t, client_pools_t* pools_manager_, @@ -234,9 +294,6 @@ class client_pool : public std::enable_shared_from_this< pool_config_(pool_config), io_context_pool_(io_context_pool), free_clients_(pool_config.max_connection) { - if (pool_config_.connect_retry_count == 0) { - pool_config_.connect_retry_count = 1; - } }; template @@ -309,6 +366,8 @@ class client_pool : public std::enable_shared_from_this< coro_io::detail::client_queue> short_connect_clients_; client_pools_t* pools_manager_ = nullptr; + moodycamel::ConcurrentQueue>*> + promise_queue; async_simple::Promise idle_timeout_waiter; std::string host_name_; pool_config pool_config_;