From ab0fb6b4b585715372b54a708bb5a129de1b1202 Mon Sep 17 00:00:00 2001 From: saipubw Date: Wed, 10 Apr 2024 18:10:56 +0800 Subject: [PATCH] [coro_io] fix client pool slow connect bug (#657) --- include/ylt/coro_io/client_pool.hpp | 275 ++++++++---------- include/ylt/coro_rpc/impl/coro_rpc_client.hpp | 83 +++--- .../standalone/cinatra/coro_http_client.hpp | 2 + src/coro_io/tests/test_client_pool.cpp | 2 +- .../examples/base_examples/channel.cpp | 8 +- .../examples/base_examples/client_pool.cpp | 62 +++- .../examples/base_examples/client_pools.cpp | 6 +- .../base_examples/concurrent_clients.cpp | 4 +- src/coro_rpc/tests/test_coro_rpc_client.cpp | 15 +- 9 files changed, 234 insertions(+), 223 deletions(-) diff --git a/include/ylt/coro_io/client_pool.hpp b/include/ylt/coro_io/client_pool.hpp index 062755890..2f44949f2 100644 --- a/include/ylt/coro_io/client_pool.hpp +++ b/include/ylt/coro_io/client_pool.hpp @@ -36,12 +36,14 @@ #include #include #include +#include #include #include #include #include #include +#include "async_simple/Common.h" #include "async_simple/coro/Collect.h" #include "coro_io.hpp" #include "detail/client_queue.hpp" @@ -105,46 +107,14 @@ class client_pool : public std::enable_shared_from_this< co_return; } - struct client_connect_helper { - std::unique_ptr client; - std::weak_ptr pool_watcher; - std::weak_ptr spinlock_watcher; - client_connect_helper(std::unique_ptr&& client, - std::weak_ptr&& pool_watcher, - std::weak_ptr&& spinlock_watcher) - : client(std::move(client)), - pool_watcher(std::move(pool_watcher)), - spinlock_watcher(std::move(spinlock_watcher)) {} - client_connect_helper(client_connect_helper&& o) - : client(std::move(o.client)), - pool_watcher(std::move(o.pool_watcher)), - spinlock_watcher(std::move(o.spinlock_watcher)) {} - client_connect_helper& operator=(client_connect_helper&& o) { - client = std::move(o.client); - pool_watcher = std::move(o.pool_watcher); - spinlock_watcher = std::move(o.spinlock_watcher); - return *this; - } - ~client_connect_helper() { - if (client) { - if (auto pool = pool_watcher.lock(); pool) { - int cnt = 0; - while (spinlock_watcher.lock()) { - std::this_thread::yield(); - ++cnt; - if (cnt % 10000 == 0) { - ELOG_WARN << "spinlock of client{" << client.get() << "},host:{" - << client->get_host() << ":" << client->get_port() - << "}cost too much time, spin count: " << cnt; - } - } - pool->collect_free_client(std::move(client)); - } - } - } - }; + static auto rand_time(std::chrono::milliseconds ms) { + static thread_local std::default_random_engine r; + std::uniform_real_distribution e(0.7f, 1.3f); + return std::chrono::milliseconds{static_cast(e(r) * ms.count())}; + } async_simple::coro::Lazy reconnect(std::unique_ptr& client) { + using namespace std::chrono_literals; for (unsigned int i = 0; i < pool_config_.connect_retry_count; ++i) { ELOG_DEBUG << "try to reconnect client{" << client.get() << "},host:{" << client->get_host() << ":" << client->get_port() @@ -164,9 +134,10 @@ class client_pool : public std::enable_shared_from_this< ELOG_DEBUG << "reconnect client{" << client.get() << "} failed. If client close:{" << client->has_closed() << "}"; - auto wait_time = pool_config_.reconnect_wait_time - cost_time; + auto wait_time = rand_time( + (pool_config_.reconnect_wait_time * (i + 1) - cost_time) / 1ms * 1ms); if (wait_time.count() > 0) - co_await coro_io::sleep_for(wait_time); + co_await coro_io::sleep_for(wait_time, &client->get_executor()); } ELOG_WARN << "reconnect client{" << client.get() << "},host:{" << client->get_host() << ":" << client->get_port() @@ -174,137 +145,114 @@ class client_pool : public std::enable_shared_from_this< client = nullptr; } - async_simple::coro::Lazy connect_client( - client_connect_helper helper) { - ELOG_DEBUG << "try to connect client{" << helper.client.get() + struct promise_handler { + std::atomic flag_ = false; + async_simple::Promise> promise_; + }; + + async_simple::coro::Lazy connect_client( + std::unique_ptr client, std::weak_ptr watcher, + std::shared_ptr handler) { + ELOG_DEBUG << "try to connect client{" << client.get() << "} to host:" << host_name_; - auto result = co_await helper.client->connect(host_name_); + auto result = co_await client->connect(host_name_); + std::shared_ptr self = watcher.lock(); if (!client_t::is_ok(result)) { - ELOG_DEBUG << "connect client{" << helper.client.get() << "} to failed. "; - co_await reconnect(helper.client); + ELOG_DEBUG << "connect client{" << client.get() << "} to failed. "; + if (self) { + co_await reconnect(client); + } } - if (helper.client) { - ELOG_DEBUG << "connect client{" << helper.client.get() << "} successful!"; + if (client) { + ELOG_DEBUG << "connect client{" << client.get() << "} successful!"; + } + auto has_get_connect = handler->flag_.exchange(true); + if (!has_get_connect) { + handler->promise_.setValue(std::move(client)); + } + else { + auto conn_lim = std::min(10u, pool_config_.max_connection); + if (self && free_clients_.size() < conn_lim && client) { + enqueue(free_clients_, std::move(client), pool_config_.idle_timeout); + } } - - co_return std::move(helper); - } - - auto rand_time() { - static thread_local std::default_random_engine r; - std::uniform_int_distribution e(-25, 25); - return std::chrono::milliseconds{100 + e(r)}; } async_simple::coro::Lazy> get_client( const typename client_t::config& client_config) { std::unique_ptr client; - free_clients_.try_dequeue(client); if (!client) { short_connect_clients_.try_dequeue(client); } - assert(client == nullptr || !client->has_closed()); if (client == nullptr) { - client = std::make_unique(*io_context_pool_.get_executor()); - if (!client->init_config(client_config)) { - ELOG_ERROR << "init client config{" << client.get() << "} failed."; - co_return nullptr; - } - auto spinlock = std::make_shared(false); - auto client_ptr = client.get(); - auto result = co_await async_simple::coro::collectAny( - connect_client(client_connect_helper{ - std::move(client), this->shared_from_this(), spinlock}), - coro_io::sleep_for(rand_time())); - if (result.index() == 0) { // connect finish in 100ms - co_return std::move(std::get<0>(result).value().client); - } - else if (result.index() == 1) { // connect time cost more than 100ms - ELOG_DEBUG << "slow connection of client{" << client_ptr - << "}, try to get free client from pool."; - std::unique_ptr cli; - if (short_connect_clients_.try_dequeue(cli) || - free_clients_.try_dequeue(cli)) { - spinlock = nullptr; - ELOG_DEBUG << "get free client{" << cli.get() - << "} from pool. skip wait client{" << client_ptr - << "} connect"; - co_return std::move(cli); + std::unique_ptr cli; + auto executor = io_context_pool_.get_executor(); + client = std::make_unique(*executor); + if (!client->init_config(client_config)) + AS_UNLIKELY { + ELOG_ERROR << "init client config failed."; + co_return nullptr; } - else { - auto promise = std::make_unique< - async_simple::Promise>>(); - auto* promise_address = promise.get(); - promise_queue.enqueue(promise_address); - spinlock = nullptr; - if (short_connect_clients_.try_dequeue(cli) || - free_clients_.try_dequeue(cli)) { - collect_free_client(std::move(cli)); - } - ELOG_DEBUG << "wait for free client waiter promise{" - << promise_address << "} response because slow client{" - << client_ptr << "}"; - - auto res = co_await collectAny( - [](auto promise) - -> async_simple::coro::Lazy> { - co_return co_await promise->getFuture(); - }(std::move(promise)), - coro_io::sleep_for(this->pool_config_.max_connection_time)); - if (res.index() == 0) { - auto& res0 = std::get<0>(res); - if (!res0.hasError()) { - auto& cli = res0.value(); - ELOG_DEBUG << "get free client{" << cli.get() << "} from promise{" - << promise_address << "}. skip wait client{" - << client_ptr << "} connect"; - co_return std::move(cli); - } - else { - ELOG_ERROR << "Unexcepted branch"; - co_return nullptr; - } - } - else { - ELOG_ERROR << "Unexcepted branch. Out of max limitation of connect " - "time, connect " - "failed. skip wait client{" - << client_ptr << "} connect. " - << "skip wait promise {" << promise_address - << "} response"; - co_return nullptr; + auto client_ptr = client.get(); + auto handler = std::make_shared(); + connect_client(std::move(client), this->weak_from_this(), handler) + .start([](auto&&) { + }); + auto timer = std::make_shared( + executor->get_asio_executor()); + timer->expires_after(std::chrono::milliseconds{20}); + timer->async_await().start([watcher = this->weak_from_this(), handler, + client_ptr, timer](auto&& res) { + if (res.value() && !handler->flag_) { + if (auto self = watcher.lock(); self) { + ++self->promise_cnt_; + self->promise_queue_.enqueue(handler); + timer->expires_after( + (std::max)(std::chrono::milliseconds{0}, + self->pool_config_.max_connection_time - + std::chrono::milliseconds{20})); + timer->async_await().start([handler = std::move(handler), + client_ptr = client_ptr](auto&& res) { + auto has_get_connect = handler->flag_.exchange(true); + if (!has_get_connect) { + ELOG_ERROR << "Out of max limitation of connect " + "time, connect " + "failed. skip wait client{" + << client_ptr << "} connect. "; + handler->promise_.setValue(std::unique_ptr{nullptr}); + } + }); } } - } - else { - ELOG_ERROR << "unknown collectAny index while wait client{" - << client_ptr << "} connect"; - co_return nullptr; + }); + ELOG_DEBUG << "wait client by promise {" << &handler->promise_ << "}"; + client = co_await handler->promise_.getFuture(); + if (client) { + executor->schedule([timer] { + std::error_code ignore_ec; + timer->cancel(ignore_ec); + }); } } else { ELOG_DEBUG << "get free client{" << client.get() << "}. from queue"; - co_return std::move(client); } + co_return std::move(client); } void enqueue( coro_io::detail::client_queue>& clients, - std::unique_ptr client, bool is_short_client) { + std::unique_ptr client, + std::chrono::milliseconds collect_time) { if (clients.enqueue(std::move(client)) == 1) { std::size_t expected = 0; if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) { ELOG_DEBUG << "start timeout client collecter of client_pool{" << host_name_ << "}"; collect_idle_timeout_client( - this->shared_from_this(), clients, - (std::max)( - (is_short_client - ? (std::min)(pool_config_.idle_timeout, - pool_config_.short_connect_idle_timeout) - : pool_config_.idle_timeout), - std::chrono::milliseconds{50}), + this->weak_from_this(), clients, + (std::max)(collect_time, std::chrono::milliseconds{50}), pool_config_.idle_queue_per_max_clear_count) .via(coro_io::get_global_executor()) .start([](auto&&) { @@ -314,28 +262,41 @@ class client_pool : public std::enable_shared_from_this< } void collect_free_client(std::unique_ptr client) { - ELOG_DEBUG << "collect free client{" << client.get() << "}"; - if (client && !client->has_closed()) { - async_simple::Promise>* promise = nullptr; - if (promise_queue.try_dequeue(promise)) { - promise->setValue(std::move(client)); - ELOG_DEBUG << "collect free client{" << client.get() - << "} wake up promise{" << promise << "}"; + if (!client->has_closed()) { + std::shared_ptr handler; + if (promise_cnt_) { + int cnt = 0; + while (promise_queue_.try_dequeue(handler)) { + ++cnt; + auto has_get_connect = handler->flag_.exchange(true); + if (!has_get_connect) { + handler->promise_.setValue(std::move(client)); + promise_cnt_ -= cnt; + ELOG_DEBUG << "collect free client{" << client.get() + << "} and wake up promise{" << &handler->promise_ << "}"; + return; + } + } + promise_cnt_ -= cnt; } - else if (free_clients_.size() < pool_config_.max_connection) { - ELOG_DEBUG << "collect free client{" << client.get() << "} enqueue"; - enqueue(free_clients_, std::move(client), false); + + if (free_clients_.size() < pool_config_.max_connection) { + if (client) { + ELOG_DEBUG << "collect free client{" << client.get() << "} enqueue"; + enqueue(free_clients_, std::move(client), pool_config_.idle_timeout); + } } else { ELOG_DEBUG << "out of max connection limit <<" << pool_config_.max_connection << ", collect free client{" << client.get() << "} enqueue short connect queue"; - enqueue(short_connect_clients_, std::move(client), true); + enqueue(short_connect_clients_, std::move(client), + pool_config_.short_connect_idle_timeout); } } else { ELOG_DEBUG << "client{" << client.get() - << "} is nullptr or is closed. we won't collect it"; + << "} is closed. we won't collect it"; } return; @@ -489,8 +450,8 @@ class client_pool : public std::enable_shared_from_this< coro_io::detail::client_queue> short_connect_clients_; client_pools_t* pools_manager_ = nullptr; - moodycamel::ConcurrentQueue>*> - promise_queue; + std::atomic promise_cnt_ = 0; + moodycamel::ConcurrentQueue> promise_queue_; async_simple::Promise idle_timeout_waiter; std::string host_name_; pool_config pool_config_; @@ -558,14 +519,6 @@ class client_pools { iter->second = pool; } } - if (has_inserted) { - ELOG_DEBUG << "add new client pool of {" << host_name - << "} to hash table"; - } - else { - ELOG_DEBUG << "add new client pool of {" << host_name - << "} failed, element existed."; - } } return iter->second; } diff --git a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp index b3d924f7a..55d03c019 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp @@ -129,9 +129,8 @@ class coro_rpc_client { */ coro_rpc_client(asio::io_context::executor_type executor, uint32_t client_id = 0) - : executor(executor), - timer_(executor), - socket_(std::make_shared(executor)) { + : control_(std::make_shared(executor, false)), + timer_(executor) { config_.client_id = client_id; } @@ -142,10 +141,9 @@ class coro_rpc_client { coro_rpc_client( coro_io::ExecutorWrapper<> &executor = *coro_io::get_global_executor(), uint32_t client_id = 0) - : executor(executor.get_asio_executor()), - timer_(executor.get_asio_executor()), - socket_(std::make_shared( - executor.get_asio_executor())) { + : control_( + std::make_shared(executor.get_asio_executor(), false)), + timer_(executor.get_asio_executor()) { config_.client_id = client_id; } @@ -317,7 +315,7 @@ class coro_rpc_client { } else { #endif - ret = co_await call_impl(*socket_, std::move(args)...); + ret = co_await call_impl(control_->socket_, std::move(args)...); #ifdef YLT_ENABLE_SSL } #endif @@ -325,7 +323,7 @@ class coro_rpc_client { std::error_code err_code; timer_.cancel(err_code); - if (is_timeout_) { + if (control_->is_timeout_) { ret = rpc_result{ unexpect_t{}, rpc_error{errc::timed_out, "rpc call timed out"}}; } @@ -340,7 +338,7 @@ class coro_rpc_client { /*! * Get inner executor */ - auto &get_executor() { return executor; } + auto &get_executor() { return control_->executor_; } uint32_t get_client_id() const { return config_.client_id; } @@ -350,7 +348,7 @@ class coro_rpc_client { } has_closed_ = true; ELOGV(INFO, "client_id %d close", config_.client_id); - close_socket(socket_); + close_socket(control_); } bool set_req_attachment(std::string_view attachment) { @@ -379,10 +377,10 @@ class coro_rpc_client { }; void reset() { - close_socket(socket_); - socket_ = - std::make_shared(executor.get_asio_executor()); - is_timeout_ = false; + close_socket(control_); + control_->socket_ = + asio::ip::tcp::socket(control_->executor_.get_asio_executor()); + control_->is_timeout_ = false; has_closed_ = false; } static bool is_ok(coro_rpc::err_code ec) noexcept { return !ec; } @@ -411,23 +409,23 @@ class coro_rpc_client { }); std::error_code ec = co_await coro_io::async_connect( - &executor, *socket_, config_.host, config_.port); + &control_->executor_, control_->socket_, config_.host, config_.port); std::error_code err_code; timer_.cancel(err_code); if (ec) { - if (is_timeout_) { + if (control_->is_timeout_) { co_return errc::timed_out; } co_return errc::not_connected; } - if (is_timeout_) { + if (control_->is_timeout_) { ELOGV(WARN, "client_id %d connect timeout", config_.client_id); co_return errc::timed_out; } - socket_->set_option(asio::ip::tcp::no_delay(true), ec); + control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec); #ifdef YLT_ENABLE_SSL if (!config_.ssl_cert_path.empty()) { @@ -465,7 +463,7 @@ class coro_rpc_client { asio::ssl::host_name_verification(config_.ssl_domain)); ssl_stream_ = std::make_unique>( - *socket_, ssl_ctx_); + control_->socket_, ssl_ctx_); ssl_init_ret_ = true; } catch (std::exception &e) { ELOGV(ERROR, "init ssl failed: %s", e.what()); @@ -475,14 +473,17 @@ class coro_rpc_client { #endif async_simple::coro::Lazy timeout(auto duration, std::string err_msg) { timer_.expires_after(duration); + std::weak_ptr socket_watcher = control_; bool is_timeout = co_await timer_.async_await(); if (!is_timeout) { co_return false; } - - is_timeout_ = is_timeout; - close_socket(socket_); - co_return true; + if (auto self = socket_watcher.lock()) { + self->is_timeout_ = is_timeout; + close_socket(self); + co_return true; + } + co_return false; } template @@ -581,7 +582,7 @@ class coro_rpc_client { ret = co_await coro_io::async_write( socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN)); ELOGV(INFO, "client_id %d shutdown", config_.client_id); - socket_->shutdown(asio::ip::tcp::socket::shutdown_send); + control_->socket_.shutdown(asio::ip::tcp::socket::shutdown_send); r = rpc_result{ unexpect_t{}, rpc_error{errc::io_error, ret.first.message()}}; co_return r; @@ -656,10 +657,10 @@ class coro_rpc_client { } #ifdef UNIT_TEST_INJECT if (g_action == inject_action::force_inject_client_write_data_timeout) { - is_timeout_ = true; + control_->is_timeout_ = true; } #endif - if (is_timeout_) { + if (control_->is_timeout_) { r = rpc_result{ unexpect_t{}, rpc_error{.code = errc::timed_out, .msg = {}}}; } @@ -789,13 +790,22 @@ class coro_rpc_client { offset, std::forward(args)...); } - void close_socket(std::shared_ptr socket) { - asio::dispatch( - executor.get_asio_executor(), [socket = std::move(socket)]() { - asio::error_code ignored_ec; - socket->shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec); - socket->close(ignored_ec); - }); + struct control_t { + asio::ip::tcp::socket socket_; + bool is_timeout_; + coro_io::ExecutorWrapper<> executor_; + control_t(asio::io_context::executor_type executor, bool is_timeout) + : socket_(executor), is_timeout_(is_timeout), executor_(executor) {} + }; + + static void close_socket( + std::shared_ptr control) { + control->executor_.schedule([control = std::move(control)]() { + asio::error_code ignored_ec; + control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, + ignored_ec); + control->socket_.close(ignored_ec); + }); } #ifdef UNIT_TEST_INJECT @@ -812,10 +822,10 @@ class coro_rpc_client { call(std::forward(args)...)); } #endif + private: - coro_io::ExecutorWrapper<> executor; coro_io::period_timer timer_; - std::shared_ptr socket_; + std::shared_ptr control_; std::string read_buf_, resp_attachment_buf_; std::string_view req_attachment_; config config_; @@ -825,7 +835,6 @@ class coro_rpc_client { std::unique_ptr> ssl_stream_; bool ssl_init_ret_ = true; #endif - bool is_timeout_ = false; std::atomic has_closed_ = false; }; } // namespace coro_rpc diff --git a/include/ylt/standalone/cinatra/coro_http_client.hpp b/include/ylt/standalone/cinatra/coro_http_client.hpp index ad8a3bc3f..dd1c8c266 100644 --- a/include/ylt/standalone/cinatra/coro_http_client.hpp +++ b/include/ylt/standalone/cinatra/coro_http_client.hpp @@ -197,6 +197,8 @@ class coro_http_client : public std::enable_shared_from_this { }); } + coro_io::ExecutorWrapper<> &get_executor() { return executor_wrapper_; } + #ifdef CINATRA_ENABLE_SSL bool init_ssl(int verify_mode, const std::string &base_path, const std::string &cert_file, const std::string &sni_hostname) { diff --git a/src/coro_io/tests/test_client_pool.cpp b/src/coro_io/tests/test_client_pool.cpp index 684fc5dca..3a9eff4ac 100644 --- a/src/coro_io/tests/test_client_pool.cpp +++ b/src/coro_io/tests/test_client_pool.cpp @@ -184,7 +184,7 @@ TEST_CASE("test reconnect retry wait time exclude reconnect cost time") { CHECK(pool->free_client_count() == 100); auto dur = std::chrono::steady_clock::now() - tp; std::cout << dur.count() << std::endl; - CHECK((dur >= 500ms && dur <= 800ms)); + CHECK((dur >= 400ms && dur <= 800ms)); server.stop(); co_return; }()); diff --git a/src/coro_rpc/examples/base_examples/channel.cpp b/src/coro_rpc/examples/base_examples/channel.cpp index e57b274cd..99f23c833 100644 --- a/src/coro_rpc/examples/base_examples/channel.cpp +++ b/src/coro_rpc/examples/base_examples/channel.cpp @@ -29,6 +29,8 @@ #include #include #include + +#include "ylt/easylog.hpp" using namespace coro_rpc; using namespace async_simple::coro; using namespace std::string_view_literals; @@ -51,18 +53,18 @@ Lazy call_echo(std::shared_ptr> channel, [](coro_rpc_client &client, std::string_view hostname) -> Lazy { auto res = co_await client.call("Hello world!"); if (!res.has_value()) { - std::cout << "coro_rpc err: \n" << res.error().msg; + ELOG_ERROR << "coro_rpc err: \n" << res.error().msg; co_return; } if (res.value() != "Hello world!"sv) { - std::cout << "err echo resp: \n" << res.value(); + ELOG_ERROR << "err echo resp: \n" << res.value(); co_return; } ++qps; co_return; }); if (!res) { - std::cout << "client pool err: connect failed.\n"; + ELOG_ERROR << "client pool err: connect failed.\n"; } } --working_echo; diff --git a/src/coro_rpc/examples/base_examples/client_pool.cpp b/src/coro_rpc/examples/base_examples/client_pool.cpp index a086016d2..23ba2b628 100644 --- a/src/coro_rpc/examples/base_examples/client_pool.cpp +++ b/src/coro_rpc/examples/base_examples/client_pool.cpp @@ -18,12 +18,20 @@ #include #include #include +#include #include #include #include #include #include + +#include "async_simple/coro/Collect.h" +#include "async_simple/coro/Lazy.h" +#include "async_simple/coro/SyncAwait.h" +#include "ylt/coro_io/io_context_pool.hpp" +#include "ylt/easylog.hpp" std::string echo(std::string_view sv); + using namespace coro_rpc; using namespace async_simple::coro; using namespace std::string_view_literals; @@ -31,34 +39,52 @@ using namespace std::string_view_literals; std::atomic qps = 0; std::atomic working_echo = 0; +std::atomic busy_echo = 0; +struct guard { + guard(std::atomic &ref) : ref(ref) { ++ref; } + ~guard() { --ref; } + std::atomic &ref; +}; /*! * \example helloworld/concurrency_clients.main.cpp * \brief demo for run concurrency clients */ -Lazy call_echo(coro_io::client_pool &client_pool, - int cnt) { +Lazy> call_echo( + coro_io::client_pool &client_pool, int cnt) { + std::vector result; + result.reserve(cnt); ++working_echo; for (int i = 0; i < cnt; ++i) { + auto tp = std::chrono::steady_clock::now(); auto res = co_await client_pool.send_request( [=](coro_rpc_client &client) -> Lazy { + guard g{busy_echo}; + if (client.has_closed()) { + co_return; + } auto res = co_await client.call("Hello world!"); if (!res.has_value()) { - std::cout << "coro_rpc err: \n" << res.error().msg; + ELOG_ERROR << "coro_rpc err: \n" << res.error().msg; + client.close(); co_return; } if (res.value() != "Hello world!"sv) { - std::cout << "err echo resp: \n" << res.value(); + ELOG_ERROR << "err echo resp: \n" << res.value(); co_return; } ++qps; co_return; }); if (!res) { - std::cout << "client pool err: connect failed.\n"; + ELOG_ERROR << "client pool err: connect failed.\n"; + } + else { + result.push_back(std::chrono::duration_cast( + std::chrono::steady_clock::now() - tp)); } } - --working_echo; + co_return std::move(result); } Lazy qps_watcher(coro_io::client_pool &clients) { @@ -68,11 +94,23 @@ Lazy qps_watcher(coro_io::client_pool &clients) { uint64_t cnt = qps.exchange(0); std::cout << "QPS:" << cnt << " free connection: " << clients.free_client_count() - << " working echo:" << working_echo << std::endl; + << " working echo:" << working_echo << " busy echo:" << busy_echo + << std::endl; cnt = 0; } } - +std::vector result; +void latency_watcher() { + std::sort(result.begin(), result.end()); + auto arr = {0.1, 0.3, 0.5, 0.7, 0.9, 0.95, 0.99, 0.999, 0.9999, 0.99999, 1.0}; + for (auto e : arr) { + std::cout + << (e * 100) << "% request finished in:" + << result[std::max(0, result.size() * e - 1)].count() / + 1000.0 + << "ms" << std::endl; + } +} int main() { auto thread_cnt = std::thread::hardware_concurrency(); auto client_pool = coro_io::client_pool::create( @@ -81,11 +119,17 @@ int main() { .max_connection = thread_cnt * 20, .client_config = {.timeout_duration = std::chrono::seconds{50}}}); + auto finish_executor = coro_io::get_global_block_executor(); for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) { - call_echo(*client_pool, 10000).start([](auto &&) { + call_echo(*client_pool, 10000).start([finish_executor](auto &&res) { + finish_executor->schedule([res = std::move(res.value())] { + result.insert(result.end(), res.begin(), res.end()); + --working_echo; + }); }); } syncAwait(qps_watcher(*client_pool)); + latency_watcher(); std::cout << "Done!" << std::endl; return 0; } \ No newline at end of file diff --git a/src/coro_rpc/examples/base_examples/client_pools.cpp b/src/coro_rpc/examples/base_examples/client_pools.cpp index b06303a98..8bf414d60 100644 --- a/src/coro_rpc/examples/base_examples/client_pools.cpp +++ b/src/coro_rpc/examples/base_examples/client_pools.cpp @@ -48,18 +48,18 @@ Lazy call_echo(coro_io::client_pools &client_pools, [=](coro_rpc_client &client) -> Lazy { auto res = co_await client.call("Hello world!"); if (!res.has_value()) { - std::cout << "coro_rpc err: \n" << res.error().msg; + ELOG_ERROR << "coro_rpc err: \n" << res.error().msg; co_return; } if (res.value() != "Hello world!"sv) { - std::cout << "err echo resp: \n" << res.value(); + ELOG_ERROR << "err echo resp: \n" << res.value(); co_return; } ++qps; co_return; }); if (!res) { - std::cout << "client pool err: connect failed.\n"; + ELOG_ERROR << "client pool err: connect failed.\n"; } } --working_echo; diff --git a/src/coro_rpc/examples/base_examples/concurrent_clients.cpp b/src/coro_rpc/examples/base_examples/concurrent_clients.cpp index fdbf6847d..c8e8e7a20 100644 --- a/src/coro_rpc/examples/base_examples/concurrent_clients.cpp +++ b/src/coro_rpc/examples/base_examples/concurrent_clients.cpp @@ -54,11 +54,11 @@ Lazy call_echo(int cnt) { for (int i = 0; i < cnt; ++i) { auto res = co_await client.call("Hello world!"); if (!res.has_value()) { - std::cout << "coro_rpc err: \n" << res.error().msg; + ELOG_ERROR << "coro_rpc err: \n" << res.error().msg; co_return; } if (res.value() != "Hello world!"sv) { - std::cout << "err echo resp: \n" << res.value(); + ELOG_ERROR << "err echo resp: \n" << res.value(); co_return; } ++qps; diff --git a/src/coro_rpc/tests/test_coro_rpc_client.cpp b/src/coro_rpc/tests/test_coro_rpc_client.cpp index a820f8e73..71b69eab4 100644 --- a/src/coro_rpc/tests/test_coro_rpc_client.cpp +++ b/src/coro_rpc/tests/test_coro_rpc_client.cpp @@ -72,9 +72,9 @@ TEST_CASE("testing client") { std::string port = std::to_string(coro_rpc_server_port); asio::io_context io_context; std::promise promise; + auto worker = std::make_unique(io_context); auto future = promise.get_future(); std::thread thd([&io_context, &promise] { - asio::io_context::work work(io_context); promise.set_value(); io_context.run(); }); @@ -116,7 +116,7 @@ TEST_CASE("testing client") { g_action = {}; auto f = [&io_context, &port]() -> Lazy { auto client = co_await create_client(io_context, port); - auto ret = co_await client->template call_for(20ms); + auto ret = co_await client->template call_for(10ms); CHECK_MESSAGE(ret.error().code == coro_rpc::errc::timed_out, ret.error().msg); co_return; @@ -154,7 +154,7 @@ TEST_CASE("testing client") { } server.stop(); - io_context.stop(); + worker = nullptr; thd.join(); } @@ -163,8 +163,8 @@ TEST_CASE("testing client with inject server") { std::string port = std::to_string(coro_rpc_server_port); ELOGV(INFO, "inject server port: %d", port.data()); asio::io_context io_context; + auto worker = std::make_unique(io_context); std::thread thd([&io_context] { - asio::io_context::work work(io_context); io_context.run(); }); coro_rpc_server server(2, coro_rpc_server_port); @@ -212,7 +212,7 @@ TEST_CASE("testing client with inject server") { } server.stop(); - io_context.stop(); + worker = nullptr; thd.join(); g_action = inject_action::nothing; } @@ -245,15 +245,15 @@ class SSLClientTester { std::promise promise; auto future = promise.get_future(); + worker = std::make_unique(io_context); thd = std::thread([this, &promise] { - asio::io_context::work work(io_context); promise.set_value(); io_context.run(); }); future.wait(); } ~SSLClientTester() { - io_context.stop(); + worker = nullptr; thd.join(); } void inject(std::string msg, std::string& path, ssl_type type) { @@ -342,6 +342,7 @@ class SSLClientTester { ssl_type dh; asio::io_context io_context; std::thread thd; + std::unique_ptr worker; }; TEST_CASE("testing client with ssl server") {