diff --git a/include/cinatra/ylt/coro_io/client_pool.hpp b/include/cinatra/ylt/coro_io/client_pool.hpp index d798ff1e..0bb82b59 100644 --- a/include/cinatra/ylt/coro_io/client_pool.hpp +++ b/include/cinatra/ylt/coro_io/client_pool.hpp @@ -77,8 +77,9 @@ class client_pool : public std::enable_shared_from_this< break; } while (true) { - std::cout << "start collect timeout client of pool{" << self->host_name_ - << "}, now client count: " << clients.size(); + CINATRA_LOG_DEBUG << "start collect timeout client of pool{" + << self->host_name_ + << "}, now client count: " << clients.size(); std::size_t is_all_cleared = clients.clear_old(clear_cnt); CINATRA_LOG_DEBUG << "finish collect timeout client of pool{" << self->host_name_ @@ -105,47 +106,14 @@ class client_pool : public std::enable_shared_from_this< co_return; } - struct client_connect_helper { - std::unique_ptr client; - std::weak_ptr pool_watcher; - std::weak_ptr spinlock_watcher; - client_connect_helper(std::unique_ptr&& client, - std::weak_ptr&& pool_watcher, - std::weak_ptr&& spinlock_watcher) - : client(std::move(client)), - pool_watcher(std::move(pool_watcher)), - spinlock_watcher(std::move(spinlock_watcher)) {} - client_connect_helper(client_connect_helper&& o) - : client(std::move(o.client)), - pool_watcher(std::move(o.pool_watcher)), - spinlock_watcher(std::move(o.spinlock_watcher)) {} - client_connect_helper& operator=(client_connect_helper&& o) { - client = std::move(o.client); - pool_watcher = std::move(o.pool_watcher); - spinlock_watcher = std::move(o.spinlock_watcher); - return *this; - } - ~client_connect_helper() { - if (client) { - if (auto pool = pool_watcher.lock(); pool) { - int cnt = 0; - while (spinlock_watcher.lock()) { - std::this_thread::yield(); - ++cnt; - if (cnt % 10000 == 0) { - CINATRA_LOG_WARNING << "spinlock of client{" << client.get() - << "},host:{" << client->get_host() << ":" - << client->get_port() - << "}cost too much time, spin count: " << cnt; - } - } - pool->collect_free_client(std::move(client)); - } - } - } - }; + static auto rand_time(std::chrono::milliseconds ms) { + static thread_local std::default_random_engine r; + std::uniform_real_distribution e(0.7f, 1.3f); + return std::chrono::milliseconds{static_cast(e(r) * ms.count())}; + } async_simple::coro::Lazy reconnect(std::unique_ptr& client) { + using namespace std::chrono_literals; for (unsigned int i = 0; i < pool_config_.connect_retry_count; ++i) { CINATRA_LOG_DEBUG << "try to reconnect client{" << client.get() << "},host:{" << client->get_host() << ":" @@ -166,9 +134,10 @@ class client_pool : public std::enable_shared_from_this< CINATRA_LOG_DEBUG << "reconnect client{" << client.get() << "} failed. If client close:{" << client->has_closed() << "}"; - auto wait_time = pool_config_.reconnect_wait_time - cost_time; + auto wait_time = rand_time( + (pool_config_.reconnect_wait_time * (i + 1) - cost_time) / 1ms * 1ms); if (wait_time.count() > 0) - co_await coro_io::sleep_for(wait_time); + co_await coro_io::sleep_for(wait_time, &client->get_executor()); } CINATRA_LOG_WARNING << "reconnect client{" << client.get() << "},host:{" << client->get_host() << ":" << client->get_port() @@ -176,140 +145,116 @@ class client_pool : public std::enable_shared_from_this< client = nullptr; } - async_simple::coro::Lazy connect_client( - client_connect_helper helper) { - CINATRA_LOG_DEBUG << "try to connect client{" << helper.client.get() + struct promise_handler { + std::atomic flag_ = false; + async_simple::Promise> promise_; + }; + + async_simple::coro::Lazy connect_client( + std::unique_ptr client, std::weak_ptr watcher, + std::shared_ptr handler) { + CINATRA_LOG_DEBUG << "try to connect client{" << client.get() << "} to host:" << host_name_; - auto result = co_await helper.client->connect(host_name_); + auto result = co_await client->connect(host_name_); + std::shared_ptr self = watcher.lock(); if (!client_t::is_ok(result)) { - CINATRA_LOG_DEBUG << "connect client{" << helper.client.get() - << "} to failed. "; - co_await reconnect(helper.client); + CINATRA_LOG_DEBUG << "connect client{" << client.get() << "} to failed. "; + if (self) { + co_await reconnect(client); + } } - if (helper.client) { - CINATRA_LOG_DEBUG << "connect client{" << helper.client.get() - << "} successful!"; + if (client) { + CINATRA_LOG_DEBUG << "connect client{" << client.get() << "} successful!"; + } + auto has_get_connect = handler->flag_.exchange(true); + if (!has_get_connect) { + handler->promise_.setValue(std::move(client)); + } + else { + auto conn_lim = std::min(10u, pool_config_.max_connection); + if (self && free_clients_.size() < conn_lim && client) { + enqueue(free_clients_, std::move(client), pool_config_.idle_timeout); + } } - - co_return std::move(helper); - } - - auto rand_time() { - static thread_local std::default_random_engine r; - std::uniform_int_distribution e(-25, 25); - return std::chrono::milliseconds{100 + e(r)}; } async_simple::coro::Lazy> get_client( const typename client_t::config& client_config) { std::unique_ptr client; - free_clients_.try_dequeue(client); if (!client) { short_connect_clients_.try_dequeue(client); } - assert(client == nullptr || !client->has_closed()); if (client == nullptr) { - client = std::make_unique(*io_context_pool_.get_executor()); - if (!client->init_config(client_config)) { - CINATRA_LOG_ERROR << "init client config{" << client.get() - << "} failed."; - co_return nullptr; - } - auto spinlock = std::make_shared(false); - auto client_ptr = client.get(); - auto result = co_await async_simple::coro::collectAny( - connect_client(client_connect_helper{ - std::move(client), this->shared_from_this(), spinlock}), - coro_io::sleep_for(rand_time())); - if (result.index() == 0) { // connect finish in 100ms - co_return std::move(std::get<0>(result).value().client); - } - else if (result.index() == 1) { // connect time cost more than 100ms - CINATRA_LOG_DEBUG << "slow connection of client{" << client_ptr - << "}, try to get free client from pool."; - std::unique_ptr cli; - if (short_connect_clients_.try_dequeue(cli) || - free_clients_.try_dequeue(cli)) { - spinlock = nullptr; - CINATRA_LOG_DEBUG << "get free client{" << cli.get() - << "} from pool. skip wait client{" << client_ptr - << "} connect"; - co_return std::move(cli); + std::unique_ptr cli; + auto executor = io_context_pool_.get_executor(); + client = std::make_unique(*executor); + if (!client->init_config(client_config)) + AS_UNLIKELY { + CINATRA_LOG_ERROR << "init client config failed."; + co_return nullptr; } - else { - auto promise = std::make_unique< - async_simple::Promise>>(); - auto* promise_address = promise.get(); - promise_queue.enqueue(promise_address); - spinlock = nullptr; - if (short_connect_clients_.try_dequeue(cli) || - free_clients_.try_dequeue(cli)) { - collect_free_client(std::move(cli)); - } - CINATRA_LOG_DEBUG - << "wait for free client waiter promise{" << promise_address - << "} response because slow client{" << client_ptr << "}"; - - auto res = co_await collectAny( - [](auto promise) - -> async_simple::coro::Lazy> { - co_return co_await promise->getFuture(); - }(std::move(promise)), - coro_io::sleep_for(this->pool_config_.max_connection_time)); - if (res.index() == 0) { - auto& res0 = std::get<0>(res); - if (!res0.hasError()) { - auto& cli = res0.value(); - CINATRA_LOG_DEBUG << "get free client{" << cli.get() - << "} from promise{" << promise_address - << "}. skip wait client{" << client_ptr - << "} connect"; - co_return std::move(cli); - } - else { - CINATRA_LOG_ERROR << "Unexcepted branch"; - co_return nullptr; - } - } - else { - CINATRA_LOG_ERROR - << "Unexcepted branch. Out of max limitation of connect " - "time, connect " - "failed. skip wait client{" - << client_ptr << "} connect. " - << "skip wait promise {" << promise_address << "} response"; - co_return nullptr; + auto client_ptr = client.get(); + auto handler = std::make_shared(); + connect_client(std::move(client), this->weak_from_this(), handler) + .start([](auto&&) { + }); + auto timer = std::make_shared( + executor->get_asio_executor()); + timer->expires_after(std::chrono::milliseconds{20}); + timer->async_await().start([watcher = this->weak_from_this(), handler, + client_ptr, timer](auto&& res) { + if (res.value() && !handler->flag_) { + if (auto self = watcher.lock(); self) { + ++self->promise_cnt_; + self->promise_queue_.enqueue(handler); + timer->expires_after( + (std::max)(std::chrono::milliseconds{0}, + self->pool_config_.max_connection_time - + std::chrono::milliseconds{20})); + timer->async_await().start([handler = std::move(handler), + client_ptr = client_ptr](auto&& res) { + auto has_get_connect = handler->flag_.exchange(true); + if (!has_get_connect) { + CINATRA_LOG_ERROR << "Out of max limitation of connect " + "time, connect " + "failed. skip wait client{" + << client_ptr << "} connect. "; + handler->promise_.setValue(std::unique_ptr{nullptr}); + } + }); } } - } - else { - CINATRA_LOG_ERROR << "unknown collectAny index while wait client{" - << client_ptr << "} connect"; - co_return nullptr; + }); + CINATRA_LOG_DEBUG << "wait client by promise {" << &handler->promise_ + << "}"; + client = co_await handler->promise_.getFuture(); + if (client) { + executor->schedule([timer] { + std::error_code ignore_ec; + timer->cancel(ignore_ec); + }); } } else { - co_return std::move(client); + CINATRA_LOG_DEBUG << "get free client{" << client.get() + << "}. from queue"; } + co_return std::move(client); } void enqueue( coro_io::detail::client_queue>& clients, - std::unique_ptr client, bool is_short_client) { + std::unique_ptr client, + std::chrono::milliseconds collect_time) { if (clients.enqueue(std::move(client)) == 1) { std::size_t expected = 0; if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) { CINATRA_LOG_DEBUG << "start timeout client collecter of client_pool{" << host_name_ << "}"; collect_idle_timeout_client( - this->shared_from_this(), clients, - (std::max)( - (is_short_client - ? (std::min)(pool_config_.idle_timeout, - pool_config_.short_connect_idle_timeout) - : pool_config_.idle_timeout), - std::chrono::milliseconds{50}), + this->weak_from_this(), clients, + (std::max)(collect_time, std::chrono::milliseconds{50}), pool_config_.idle_queue_per_max_clear_count) .via(coro_io::get_global_executor()) .start([](auto&&) { @@ -319,25 +264,44 @@ class client_pool : public std::enable_shared_from_this< } void collect_free_client(std::unique_ptr client) { - if (client && !client->has_closed()) { - async_simple::Promise>* promise = nullptr; - if (promise_queue.try_dequeue(promise)) { - promise->setValue(std::move(client)); + if (!client->has_closed()) { + std::shared_ptr handler; + if (promise_cnt_) { + int cnt = 0; + while (promise_queue_.try_dequeue(handler)) { + ++cnt; + auto has_get_connect = handler->flag_.exchange(true); + if (!has_get_connect) { + handler->promise_.setValue(std::move(client)); + promise_cnt_ -= cnt; + CINATRA_LOG_DEBUG << "collect free client{" << client.get() + << "} and wake up promise{" << &handler->promise_ + << "}"; + return; + } + } + promise_cnt_ -= cnt; } - else if (free_clients_.size() < pool_config_.max_connection) { - enqueue(free_clients_, std::move(client), false); + + if (free_clients_.size() < pool_config_.max_connection) { + if (client) { + CINATRA_LOG_DEBUG << "collect free client{" << client.get() + << "} enqueue"; + enqueue(free_clients_, std::move(client), pool_config_.idle_timeout); + } } else { CINATRA_LOG_DEBUG << "out of max connection limit <<" << pool_config_.max_connection << ", collect free client{" << client.get() << "} enqueue short connect queue"; - enqueue(short_connect_clients_, std::move(client), true); + enqueue(short_connect_clients_, std::move(client), + pool_config_.short_connect_idle_timeout); } } else { CINATRA_LOG_DEBUG << "client{" << client.get() - << "} is nullptr or is closed. we won't collect it"; + << "} is closed. we won't collect it"; } return; @@ -407,6 +371,7 @@ class client_pool : public std::enable_shared_from_this< async_simple::coro::Lazy> send_request( T op, typename client_t::config& client_config) { // return type: Lazy> + CINATRA_LOG_TRACE << "try send request to " << host_name_; auto client = co_await get_client(client_config); if (!client) { CINATRA_LOG_WARNING << "send request to " << host_name_ @@ -430,10 +395,22 @@ class client_pool : public std::enable_shared_from_this< return send_request(std::move(op), pool_config_.client_config); } + /** + * @brief approx connection of client pools + * + * @return std::size_t + */ std::size_t free_client_count() const noexcept { return free_clients_.size() + short_connect_clients_.size(); } + /** + * @brief approx connection of client pools + * + * @return std::size_t + */ + std::size_t size() const noexcept { return free_client_count(); } + std::string_view get_host_name() const noexcept { return host_name_; } private: @@ -448,6 +425,7 @@ class client_pool : public std::enable_shared_from_this< T op, std::string_view endpoint, typename client_t::config& client_config) { // return type: Lazy> + CINATRA_LOG_TRACE << "try send request to " << endpoint; auto client = co_await get_client(client_config); if (!client) { CINATRA_LOG_WARNING << "send request to " << endpoint @@ -477,8 +455,8 @@ class client_pool : public std::enable_shared_from_this< coro_io::detail::client_queue> short_connect_clients_; client_pools_t* pools_manager_ = nullptr; - moodycamel::ConcurrentQueue>*> - promise_queue; + std::atomic promise_cnt_ = 0; + moodycamel::ConcurrentQueue> promise_queue_; async_simple::Promise idle_timeout_waiter; std::string host_name_; pool_config pool_config_; @@ -546,14 +524,6 @@ class client_pools { iter->second = pool; } } - if (has_inserted) { - CINATRA_LOG_DEBUG << "add new client pool of {" << host_name - << "} to hash table"; - } - else { - CINATRA_LOG_DEBUG << "add new client pool of {" << host_name - << "} failed, element existed."; - } } return iter->second; }