From a41f755d8e09a6eb933cb339532a6f22d794c70e Mon Sep 17 00:00:00 2001 From: saipubw Date: Fri, 17 May 2024 17:59:22 +0800 Subject: [PATCH] [coro_rpc]rpc client support send_request without wait for response (#672) --- include/ylt/coro_io/client_pool.hpp | 75 +- include/ylt/coro_io/coro_io.hpp | 2 +- include/ylt/coro_io/io_context_pool.hpp | 2 +- include/ylt/coro_rpc/impl/coro_rpc_client.hpp | 869 ++++++++++++------ include/ylt/coro_rpc/impl/errno.h | 9 + include/ylt/coro_rpc/impl/expected.hpp | 5 +- .../standalone/cinatra/coro_http_client.hpp | 12 +- src/coro_io/tests/test_client_pool.cpp | 6 +- .../examples/base_examples/channel.cpp | 119 ++- .../examples/base_examples/client_pool.cpp | 112 ++- .../examples/base_examples/client_pools.cpp | 41 +- .../base_examples/concurrent_clients.cpp | 114 ++- .../examples/base_examples/rpc_service.cpp | 8 +- src/coro_rpc/tests/test_coro_rpc_client.cpp | 4 + src/coro_rpc/tests/test_coro_rpc_server.cpp | 7 - src/coro_rpc/tests/test_router.cpp | 7 +- 16 files changed, 903 insertions(+), 489 deletions(-) diff --git a/include/ylt/coro_io/client_pool.hpp b/include/ylt/coro_io/client_pool.hpp index 2f44949f2..21dc3df99 100644 --- a/include/ylt/coro_io/client_pool.hpp +++ b/include/ylt/coro_io/client_pool.hpp @@ -78,11 +78,11 @@ class client_pool : public std::enable_shared_from_this< break; } while (true) { - ELOG_DEBUG << "start collect timeout client of pool{" + ELOG_TRACE << "start collect timeout client of pool{" << self->host_name_ << "}, now client count: " << clients.size(); std::size_t is_all_cleared = clients.clear_old(clear_cnt); - ELOG_DEBUG << "finish collect timeout client of pool{" + ELOG_TRACE << "finish collect timeout client of pool{" << self->host_name_ << "}, now client cnt: " << clients.size(); if (is_all_cleared != 0) [[unlikely]] { @@ -109,36 +109,42 @@ class client_pool : public std::enable_shared_from_this< static auto rand_time(std::chrono::milliseconds ms) { static thread_local std::default_random_engine r; - std::uniform_real_distribution e(0.7f, 1.3f); + std::uniform_real_distribution e(1.0f, 1.2f); return std::chrono::milliseconds{static_cast(e(r) * ms.count())}; } - async_simple::coro::Lazy reconnect(std::unique_ptr& client) { + static async_simple::coro::Lazy reconnect( + std::unique_ptr& client, std::weak_ptr watcher) { using namespace std::chrono_literals; - for (unsigned int i = 0; i < pool_config_.connect_retry_count; ++i) { - ELOG_DEBUG << "try to reconnect client{" << client.get() << "},host:{" + std::shared_ptr self = watcher.lock(); + uint32_t i = UINT32_MAX; // (at least connect once) + do { + ELOG_TRACE << "try to reconnect client{" << client.get() << "},host:{" << client->get_host() << ":" << client->get_port() - << "}, try count:" << i - << "max retry limit:" << pool_config_.connect_retry_count; + << "}, try count:" << i << "max retry limit:" + << self->pool_config_.connect_retry_count; auto pre_time_point = std::chrono::steady_clock::now(); - bool ok = client_t::is_ok(co_await client->reconnect(host_name_)); + bool ok = client_t::is_ok(co_await client->connect(self->host_name_)); auto post_time_point = std::chrono::steady_clock::now(); auto cost_time = post_time_point - pre_time_point; - ELOG_DEBUG << "reconnect client{" << client.get() + ELOG_TRACE << "reconnect client{" << client.get() << "} cost time: " << cost_time / std::chrono::milliseconds{1} << "ms"; if (ok) { - ELOG_DEBUG << "reconnect client{" << client.get() << "} success"; + ELOG_TRACE << "reconnect client{" << client.get() << "} success"; co_return; } 
- ELOG_DEBUG << "reconnect client{" << client.get() + ELOG_TRACE << "reconnect client{" << client.get() << "} failed. If client close:{" << client->has_closed() << "}"; auto wait_time = rand_time( - (pool_config_.reconnect_wait_time * (i + 1) - cost_time) / 1ms * 1ms); + (self->pool_config_.reconnect_wait_time - cost_time) / 1ms * 1ms); + self = nullptr; if (wait_time.count() > 0) co_await coro_io::sleep_for(wait_time, &client->get_executor()); - } + self = watcher.lock(); + ++i; + } while (i < self->pool_config_.connect_retry_count); ELOG_WARN << "reconnect client{" << client.get() << "},host:{" << client->get_host() << ":" << client->get_port() << "} out of max limit, stop retry. connect failed"; @@ -150,30 +156,23 @@ class client_pool : public std::enable_shared_from_this< async_simple::Promise> promise_; }; - async_simple::coro::Lazy connect_client( + static async_simple::coro::Lazy connect_client( std::unique_ptr client, std::weak_ptr watcher, std::shared_ptr handler) { - ELOG_DEBUG << "try to connect client{" << client.get() - << "} to host:" << host_name_; - auto result = co_await client->connect(host_name_); - std::shared_ptr self = watcher.lock(); - if (!client_t::is_ok(result)) { - ELOG_DEBUG << "connect client{" << client.get() << "} to failed. "; - if (self) { - co_await reconnect(client); - } - } - if (client) { - ELOG_DEBUG << "connect client{" << client.get() << "} successful!"; - } + co_await reconnect(client, watcher); auto has_get_connect = handler->flag_.exchange(true); if (!has_get_connect) { handler->promise_.setValue(std::move(client)); } else { - auto conn_lim = std::min(10u, pool_config_.max_connection); - if (self && free_clients_.size() < conn_lim && client) { - enqueue(free_clients_, std::move(client), pool_config_.idle_timeout); + if (client) { + auto self = watcher.lock(); + auto conn_lim = + std::min(10u, self->pool_config_.max_connection); + if (self && self->free_clients_.size() < conn_lim) { + self->enqueue(self->free_clients_, std::move(client), + self->pool_config_.idle_timeout); + } } } } @@ -226,7 +225,7 @@ class client_pool : public std::enable_shared_from_this< } } }); - ELOG_DEBUG << "wait client by promise {" << &handler->promise_ << "}"; + ELOG_TRACE << "wait client by promise {" << &handler->promise_ << "}"; client = co_await handler->promise_.getFuture(); if (client) { executor->schedule([timer] { @@ -236,7 +235,7 @@ class client_pool : public std::enable_shared_from_this< } } else { - ELOG_DEBUG << "get free client{" << client.get() << "}. from queue"; + ELOG_TRACE << "get free client{" << client.get() << "}. 
from queue"; } co_return std::move(client); } @@ -248,7 +247,7 @@ class client_pool : public std::enable_shared_from_this< if (clients.enqueue(std::move(client)) == 1) { std::size_t expected = 0; if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) { - ELOG_DEBUG << "start timeout client collecter of client_pool{" + ELOG_TRACE << "start timeout client collecter of client_pool{" << host_name_ << "}"; collect_idle_timeout_client( this->weak_from_this(), clients, @@ -272,7 +271,7 @@ class client_pool : public std::enable_shared_from_this< if (!has_get_connect) { handler->promise_.setValue(std::move(client)); promise_cnt_ -= cnt; - ELOG_DEBUG << "collect free client{" << client.get() + ELOG_TRACE << "collect free client{" << client.get() << "} and wake up promise{" << &handler->promise_ << "}"; return; } @@ -282,12 +281,12 @@ class client_pool : public std::enable_shared_from_this< if (free_clients_.size() < pool_config_.max_connection) { if (client) { - ELOG_DEBUG << "collect free client{" << client.get() << "} enqueue"; + ELOG_TRACE << "collect free client{" << client.get() << "} enqueue"; enqueue(free_clients_, std::move(client), pool_config_.idle_timeout); } } else { - ELOG_DEBUG << "out of max connection limit <<" + ELOG_TRACE << "out of max connection limit <<" << pool_config_.max_connection << ", collect free client{" << client.get() << "} enqueue short connect queue"; enqueue(short_connect_clients_, std::move(client), @@ -295,7 +294,7 @@ class client_pool : public std::enable_shared_from_this< } } else { - ELOG_DEBUG << "client{" << client.get() + ELOG_TRACE << "client{" << client.get() << "} is closed. we won't collect it"; } diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index d4d114ed6..eab7727b0 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -317,7 +317,7 @@ inline async_simple::coro::Lazy sleep_for(Duration d) { template struct post_helper { void operator()(auto handler) { - asio::dispatch(e, [this, handler]() { + asio::post(e, [this, handler]() { try { if constexpr (std::is_same_v>) { func(); diff --git a/include/ylt/coro_io/io_context_pool.hpp b/include/ylt/coro_io/io_context_pool.hpp index 07eb8e522..b45c63ff9 100644 --- a/include/ylt/coro_io/io_context_pool.hpp +++ b/include/ylt/coro_io/io_context_pool.hpp @@ -75,7 +75,7 @@ class ExecutorWrapper : public async_simple::Executor { context_t &context() { return executor_.context(); } - auto get_asio_executor() { return executor_; } + auto get_asio_executor() const { return executor_; } operator ExecutorImpl() { return executor_; } diff --git a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp index 55d03c019..53f5da8cf 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp @@ -22,10 +22,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -33,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -40,6 +43,10 @@ #include "asio/buffer.hpp" #include "asio/dispatch.hpp" #include "asio/registered_buffer.hpp" +#include "async_simple/Executor.h" +#include "async_simple/Promise.h" +#include "async_simple/coro/Mutex.h" +#include "async_simple/coro/SpinLock.h" #include "common_service.hpp" #include "context.hpp" #include "expected.hpp" @@ -80,6 +87,42 @@ template <> struct rpc_return_type { using type = std::monostate; }; + +struct resp_body { + std::string read_buf_; + 
std::string resp_attachment_buf_; +}; + +template +struct async_rpc_result_value_t { + private: + T result_; + resp_body buffer_; + + public: + async_rpc_result_value_t(T &&result, resp_body &&buffer) + : result_(std::move(result)), buffer_(std::move(buffer)) {} + async_rpc_result_value_t(T &&result) : result_(std::move(result)) {} + T &result() noexcept { return result_; } + const T &result() const noexcept { return result_; } + std::string_view attachment() const noexcept { + return buffer_.resp_attachment_buf_; + } + resp_body release_buffer() { return std::move(buffer_); } +}; + +template <> +struct async_rpc_result_value_t { + resp_body buffer_; + std::string_view attachment() const noexcept { + return buffer_.resp_attachment_buf_; + } + resp_body release_buffer() { return std::move(buffer_); } +}; + +template +using async_rpc_result = expected, rpc_error>; + template using rpc_return_type_t = typename rpc_return_type::type; /*! @@ -117,6 +160,7 @@ class coro_rpc_client { std::chrono::milliseconds{5000}; std::string host; std::string port; + bool enable_tcp_no_delay_ = true; #ifdef YLT_ENABLE_SSL std::filesystem::path ssl_cert_path; std::string ssl_domain; @@ -130,7 +174,7 @@ class coro_rpc_client { coro_rpc_client(asio::io_context::executor_type executor, uint32_t client_id = 0) : control_(std::make_shared(executor, false)), - timer_(executor) { + timer_(std::make_unique(executor)) { config_.client_id = client_id; } @@ -139,11 +183,12 @@ class coro_rpc_client { * @param io_context asio io_context, async event handler */ coro_rpc_client( - coro_io::ExecutorWrapper<> &executor = *coro_io::get_global_executor(), + coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor(), uint32_t client_id = 0) : control_( - std::make_shared(executor.get_asio_executor(), false)), - timer_(executor.get_asio_executor()) { + std::make_shared(executor->get_asio_executor(), false)), + timer_(std::make_unique( + executor->get_asio_executor())) { config_.client_id = client_id; } @@ -151,6 +196,8 @@ class coro_rpc_client { std::string_view get_port() const { return config_.port; } + const config &get_config() const { return config_; } + [[nodiscard]] bool init_config(const config &conf) { config_ = conf; #ifdef YLT_ENABLE_SSL @@ -166,43 +213,8 @@ class coro_rpc_client { * * @return true if client closed, otherwise false. */ - [[nodiscard]] bool has_closed() { return has_closed_; } + [[nodiscard]] bool has_closed() { return control_->has_closed_; } - /*! 
- * Reconnect server - * - * If connect hasn't been closed, it will be closed first then connect to - * server, else the client will connect to server directly - * - * @param host server address - * @param port server port - * @param timeout_duration RPC call timeout - * @return error code - */ - [[nodiscard]] async_simple::coro::Lazy reconnect( - std::string host, std::string port, - std::chrono::steady_clock::duration timeout_duration = - std::chrono::seconds(5)) { - config_.host = std::move(host); - config_.port = std::move(port); - config_.timeout_duration = - std::chrono::duration_cast(timeout_duration); - reset(); - return connect(is_reconnect_t{true}); - } - - [[nodiscard]] async_simple::coro::Lazy reconnect( - std::string endpoint, - std::chrono::steady_clock::duration timeout_duration = - std::chrono::seconds(5)) { - auto pos = endpoint.find(':'); - config_.host = endpoint.substr(0, pos); - config_.port = endpoint.substr(pos + 1); - config_.timeout_duration = - std::chrono::duration_cast(timeout_duration); - reset(); - return connect(is_reconnect_t{true}); - } /*! * Connect server * @@ -218,22 +230,50 @@ class coro_rpc_client { std::string host, std::string port, std::chrono::steady_clock::duration timeout_duration = std::chrono::seconds(5)) { + auto lock_ok = connect_mutex_.tryLock(); + if (!lock_ok) { + co_await connect_mutex_.coScopedLock(); + co_return err_code{}; + // do nothing, someone has reconnect the client + } config_.host = std::move(host); config_.port = std::move(port); config_.timeout_duration = std::chrono::duration_cast(timeout_duration); - return connect(); + auto ret = co_await connect_impl(); + connect_mutex_.unlock(); + co_return std::move(ret); } [[nodiscard]] async_simple::coro::Lazy connect( std::string_view endpoint, std::chrono::steady_clock::duration timeout_duration = std::chrono::seconds(5)) { auto pos = endpoint.find(':'); + auto lock_ok = connect_mutex_.tryLock(); + if (!lock_ok) { + co_await connect_mutex_.coScopedLock(); + co_return err_code{}; + // do nothing, someone has reconnect the client + } config_.host = endpoint.substr(0, pos); config_.port = endpoint.substr(pos + 1); config_.timeout_duration = std::chrono::duration_cast(timeout_duration); - return connect(); + auto ret = co_await connect_impl(); + connect_mutex_.unlock(); + co_return std::move(ret); + } + + [[nodiscard]] async_simple::coro::Lazy connect() { + auto lock_ok = connect_mutex_.tryLock(); + if (!lock_ok) { + co_await connect_mutex_.coScopedLock(); + co_return err_code{}; + // do nothing, someone has reconnect the client + } + auto ret = co_await connect_impl(); + connect_mutex_.unlock(); + co_return std::move(ret); } #ifdef YLT_ENABLE_SSL @@ -259,10 +299,9 @@ class coro_rpc_client { * @return RPC call result */ template - async_simple::coro::Lazy< - rpc_result()), coro_rpc_protocol>> - call(Args... args) { - return call_for(std::chrono::seconds(5), std::move(args)...); + async_simple::coro::Lazy())>> call( + Args &&...args) { + return call_for(std::chrono::seconds(5), std::forward(args)...); } /*! @@ -277,62 +316,27 @@ class coro_rpc_client { * @return RPC call result */ template - async_simple::coro::Lazy< - rpc_result()), coro_rpc_protocol>> - call_for(auto duration, Args... 
args) { - using R = decltype(get_return_type()); - - if (has_closed_) - AS_UNLIKELY { - ELOGV(ERROR, "client has been closed, please re-connect"); - auto ret = rpc_result{ - unexpect_t{}, - rpc_error{errc::io_error, - "client has been closed, please re-connect"}}; - co_return ret; + async_simple::coro::Lazy())>> + call_for(auto duration, Args &&...args) { + using return_type = decltype(get_return_type()); + auto async_result = + co_await co_await send_request_for_with_attachment( + duration, req_attachment_, std::forward(args)...); + req_attachment_ = {}; + if (async_result) { + control_->resp_buffer_ = async_result->release_buffer(); + if constexpr (std::is_same_v) { + co_return expected{}; + } + else { + co_return expected{ + std::move(async_result->result())}; } - - rpc_result ret; -#ifdef YLT_ENABLE_SSL - if (!ssl_init_ret_) { - ret = rpc_result{ - unexpect_t{}, - rpc_error{errc::not_connected, - std::string{make_error_message(errc::not_connected)}}}; - co_return ret; - } -#endif - - static_check(); - - timeout(duration, "rpc call timer canceled").start([](auto &&) { - }); - -#ifdef YLT_ENABLE_SSL - if (!config_.ssl_cert_path.empty()) { - assert(ssl_stream_); - ret = co_await call_impl(*ssl_stream_, std::move(args)...); } else { -#endif - ret = co_await call_impl(control_->socket_, std::move(args)...); -#ifdef YLT_ENABLE_SSL + co_return expected{ + unexpect_t{}, std::move(async_result.error())}; } -#endif - - std::error_code err_code; - timer_.cancel(err_code); - - if (control_->is_timeout_) { - ret = rpc_result{ - unexpect_t{}, rpc_error{errc::timed_out, "rpc call timed out"}}; - } - -#ifdef UNIT_TEST_INJECT - ELOGV(INFO, "client_id %d call %s %s", config_.client_id, - get_func_name().data(), ret ? "ok" : "failed"); -#endif - co_return ret; } /*! 
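Context for the hunk above: with this patch, call and call_for become thin wrappers over the new two-phase send path (send_request serializes and writes the request, and hands back a Lazy that is awaited separately for the response, matched by the seq_num written into the request header). A minimal caller-side sketch of that usage, assuming an echo handler registered on the server as in the examples further down this patch; the function name and endpoint are illustrative only:

#include <cassert>
#include <ylt/coro_rpc/coro_rpc_client.hpp>

std::string echo(std::string_view sv);  // rpc handler registered on the server

async_simple::coro::Lazy<void> pipelined_echo() {
  coro_rpc::coro_rpc_client client;
  if (auto ec = co_await client.connect("127.0.0.1:8801"); ec) {
    co_return;  // connect failed
  }
  // phase 1: each co_await only serializes and writes the request;
  // it returns a Lazy handle instead of waiting for the response.
  auto handle1 = co_await client.send_request<echo>("hello");
  auto handle2 = co_await client.send_request<echo>("world");
  // phase 2: await the handles to receive the matching responses,
  // which the client pairs with requests via the header seq_num.
  auto result1 = co_await std::move(handle1);
  auto result2 = co_await std::move(handle2);
  if (result1 && result2) {
    // result() is the deserialized return value, attachment() is a
    // zero-copy view into the response buffer.
    assert(result1->result() == "hello");
    assert(result2->result() == "world");
  }
  else {
    // on failure the async_rpc_result carries an rpc_error
    // (e.g. errc::timed_out or errc::io_error).
  }
}
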
@@ -343,11 +347,7 @@ class coro_rpc_client { uint32_t get_client_id() const { return config_.client_id; } void close() { - if (has_closed_) { - return; - } - has_closed_ = true; - ELOGV(INFO, "client_id %d close", config_.client_id); + // ELOGV(INFO, "client_id %d close", config_.client_id); close_socket(control_); } @@ -360,10 +360,12 @@ class coro_rpc_client { return true; } - std::string_view get_resp_attachment() const { return resp_attachment_buf_; } + std::string_view get_resp_attachment() const { + return control_->resp_buffer_.resp_attachment_buf_; + } std::string release_resp_attachment() { - return std::move(resp_attachment_buf_); + return std::move(control_->resp_buffer_.resp_attachment_buf_); } template @@ -381,37 +383,35 @@ class coro_rpc_client { control_->socket_ = asio::ip::tcp::socket(control_->executor_.get_asio_executor()); control_->is_timeout_ = false; - has_closed_ = false; + control_->has_closed_ = false; } static bool is_ok(coro_rpc::err_code ec) noexcept { return !ec; } - [[nodiscard]] async_simple::coro::Lazy connect( - is_reconnect_t is_reconnect = is_reconnect_t{false}) { + + [[nodiscard]] async_simple::coro::Lazy connect_impl() { + if (should_reset_) { + reset(); + } + else { + should_reset_ = true; + } #ifdef YLT_ENABLE_SSL if (!ssl_init_ret_) { std::cout << "ssl_init_ret_: " << ssl_init_ret_ << std::endl; co_return errc::not_connected; } #endif - if (!is_reconnect.value && has_closed_) - AS_UNLIKELY { - ELOGV(ERROR, - "a closed client is not allowed connect again, please use " - "reconnect function or create a new " - "client"); - co_return errc::io_error; - } - has_closed_ = false; + control_->has_closed_ = false; ELOGV(INFO, "client_id %d begin to connect %s", config_.client_id, config_.port.data()); - timeout(config_.timeout_duration, "connect timer canceled") + timeout(*this->timer_, config_.timeout_duration, "connect timer canceled") .start([](auto &&) { }); std::error_code ec = co_await coro_io::async_connect( &control_->executor_, control_->socket_, config_.host, config_.port); std::error_code err_code; - timer_.cancel(err_code); + timer_->cancel(err_code); if (ec) { if (control_->is_timeout_) { @@ -424,14 +424,15 @@ class coro_rpc_client { ELOGV(WARN, "client_id %d connect timeout", config_.client_id); co_return errc::timed_out; } - - control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec); + if (config_.enable_tcp_no_delay_ == true) { + control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec); + } #ifdef YLT_ENABLE_SSL if (!config_.ssl_cert_path.empty()) { - assert(ssl_stream_); + assert(control_->ssl_stream_); auto shake_ec = co_await coro_io::async_handshake( - ssl_stream_, asio::ssl::stream_base::client); + control_->ssl_stream_, asio::ssl::stream_base::client); if (shake_ec) { ELOGV(WARN, "client_id %d handshake failed: %s", config_.client_id, shake_ec.message().data()); @@ -461,7 +462,7 @@ class coro_rpc_client { ssl_ctx_.set_verify_mode(asio::ssl::verify_peer); ssl_ctx_.set_verify_callback( asio::ssl::host_name_verification(config_.ssl_domain)); - ssl_stream_ = + control_->ssl_stream_ = std::make_unique>( control_->socket_, ssl_ctx_); ssl_init_ret_ = true; @@ -471,10 +472,12 @@ class coro_rpc_client { return ssl_init_ret_; } #endif - async_simple::coro::Lazy timeout(auto duration, std::string err_msg) { - timer_.expires_after(duration); + + async_simple::coro::Lazy timeout(coro_io::period_timer &timer, + auto duration, std::string err_msg) { + timer.expires_after(duration); std::weak_ptr socket_watcher = control_; - bool 
is_timeout = co_await timer_.async_await(); + bool is_timeout = co_await timer.async_await(); if (!is_timeout) { co_return false; } @@ -530,148 +533,6 @@ class coro_rpc_client { } } - template - async_simple::coro::Lazy< - rpc_result()), coro_rpc_protocol>> - call_impl(Socket &socket, Args... args) { - using R = decltype(get_return_type()); - - auto buffer = prepare_buffer(std::move(args)...); - - rpc_result r{}; - if (buffer.empty()) { - r = rpc_result{ - unexpect_t{}, rpc_error{errc::message_too_large, - "rpc body serialize size too big"}}; - co_return r; - } -#ifdef GENERATE_BENCHMARK_DATA - std::ofstream file( - benchmark_file_path + std::string{get_func_name()} + ".in", - std::ofstream::binary | std::ofstream::out); - file << std::string_view{(char *)buffer.data(), buffer.size()}; - file.close(); -#endif - std::pair ret; -#ifdef UNIT_TEST_INJECT - if (g_action == inject_action::client_send_bad_header) { - buffer[0] = (std::byte)(uint8_t(buffer[0]) + 1); - } - if (g_action == inject_action::client_close_socket_after_send_header) { - ret = co_await coro_io::async_write( - socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN)); - ELOGV(INFO, "client_id %d close socket", config_.client_id); - close(); - r = rpc_result{ - unexpect_t{}, rpc_error{errc::io_error, ret.first.message()}}; - co_return r; - } - else if (g_action == - inject_action::client_close_socket_after_send_partial_header) { - ret = co_await coro_io::async_write( - socket, - asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN - 1)); - ELOGV(INFO, "client_id %d close socket", config_.client_id); - close(); - r = rpc_result{ - unexpect_t{}, rpc_error{errc::io_error, ret.first.message()}}; - co_return r; - } - else if (g_action == - inject_action::client_shutdown_socket_after_send_header) { - ret = co_await coro_io::async_write( - socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN)); - ELOGV(INFO, "client_id %d shutdown", config_.client_id); - control_->socket_.shutdown(asio::ip::tcp::socket::shutdown_send); - r = rpc_result{ - unexpect_t{}, rpc_error{errc::io_error, ret.first.message()}}; - co_return r; - } - else { -#endif - if (req_attachment_.empty()) { - ret = co_await coro_io::async_write( - socket, asio::buffer(buffer.data(), buffer.size())); - } - else { - std::array iov{ - asio::const_buffer{buffer.data(), buffer.size()}, - asio::const_buffer{req_attachment_.data(), req_attachment_.size()}}; - ret = co_await coro_io::async_write(socket, iov); - req_attachment_ = {}; - } -#ifdef UNIT_TEST_INJECT - } -#endif - if (!ret.first) { -#ifdef UNIT_TEST_INJECT - if (g_action == inject_action::client_close_socket_after_send_payload) { - ELOGV(INFO, "client_id %d client_close_socket_after_send_payload", - config_.client_id); - r = rpc_result{ - unexpect_t{}, rpc_error{errc::io_error, ret.first.message()}}; - close(); - co_return r; - } -#endif - coro_rpc_protocol::resp_header header; - ret = co_await coro_io::async_read( - socket, - asio::buffer((char *)&header, coro_rpc_protocol::RESP_HEAD_LEN)); - if (!ret.first) { - uint32_t body_len = header.length; - struct_pack::detail::resize(read_buf_, body_len); - if (header.attach_length == 0) { - ret = co_await coro_io::async_read( - socket, asio::buffer(read_buf_.data(), body_len)); - resp_attachment_buf_.clear(); - } - else { - struct_pack::detail::resize(resp_attachment_buf_, - header.attach_length); - std::array iov{ - asio::mutable_buffer{read_buf_.data(), body_len}, - asio::mutable_buffer{resp_attachment_buf_.data(), - 
resp_attachment_buf_.size()}}; - ret = co_await coro_io::async_read(socket, iov); - } - if (!ret.first) { -#ifdef GENERATE_BENCHMARK_DATA - std::ofstream file( - benchmark_file_path + std::string{get_func_name()} + ".out", - std::ofstream::binary | std::ofstream::out); - file << std::string_view{(char *)&header, - coro_rpc_protocol::RESP_HEAD_LEN}; - file << read_buf_; - file << resp_attachment_buf_; - file.close(); -#endif - bool ec = false; - r = handle_response_buffer(read_buf_, header.err_code, ec); - if (ec) { - close(); - } - co_return r; - } - } - } -#ifdef UNIT_TEST_INJECT - if (g_action == inject_action::force_inject_client_write_data_timeout) { - control_->is_timeout_ = true; - } -#endif - if (control_->is_timeout_) { - r = rpc_result{ - unexpect_t{}, rpc_error{.code = errc::timed_out, .msg = {}}}; - } - else { - r = rpc_result{ - unexpect_t{}, - rpc_error{.code = errc::io_error, .msg = ret.first.message()}}; - } - close(); - co_return r; - } /* * buffer layout * ┌────────────────┬────────────────┐ @@ -681,7 +542,7 @@ class coro_rpc_client { * └────────────────┴────────────────┘ */ template - std::vector prepare_buffer(Args &&...args) { + std::vector prepare_buffer(uint32_t &id, Args &&...args) { std::vector buffer; std::size_t offset = coro_rpc_protocol::REQ_HEAD_LEN; if constexpr (sizeof...(Args) > 0) { @@ -697,8 +558,11 @@ class coro_rpc_client { header.magic = coro_rpc_protocol::magic_number; header.function_id = func_id(); header.attach_length = req_attachment_.size(); + id = request_id_++; + ELOG_TRACE << "send request ID:" << id << "."; + header.seq_num = id; + #ifdef UNIT_TEST_INJECT - header.seq_num = config_.client_id; if (g_action == inject_action::client_send_bad_magic_num) { header.magic = coro_rpc_protocol::magic_number + 1; } @@ -720,9 +584,9 @@ class coro_rpc_client { } template - rpc_result handle_response_buffer(std::string &buffer, - uint8_t rpc_errc, - bool &error_happen) { + static rpc_result handle_response_buffer(std::string_view buffer, + uint8_t rpc_errc, + bool &has_error) { rpc_return_type_t ret; struct_pack::err_code ec; rpc_error err; @@ -743,23 +607,22 @@ class coro_rpc_client { err.val() = rpc_errc; ec = struct_pack::deserialize_to(err.msg, buffer); if SP_LIKELY (!ec) { - ELOGV(WARNING, "deserilaize rpc result failed"); - error_happen = true; - return rpc_result{unexpect_t{}, std::move(err)}; + has_error = true; + return rpc_result{unexpect_t{}, std::move(err)}; } } else { ec = struct_pack::deserialize_to(err, buffer); if SP_LIKELY (!ec) { - ELOGV(WARNING, "deserilaize rpc result failed"); - return rpc_result{unexpect_t{}, std::move(err)}; + return rpc_result{unexpect_t{}, std::move(err)}; } } } - error_happen = true; + has_error = true; // deserialize failed. 
+ ELOGV(WARNING, "deserilaize rpc result failed"); err = {errc::invalid_rpc_result, "failed to deserialize rpc return value"}; - return rpc_result{unexpect_t{}, std::move(err)}; + return rpc_result{unexpect_t{}, std::move(err)}; } template @@ -768,11 +631,22 @@ class coro_rpc_client { constexpr bool has_conn_v = requires { typename First::return_type; }; return util::get_args(); } + template + static decltype(auto) add_const(U &&u) { + if constexpr (std::is_const_v>) { + return struct_pack::detail::declval(); + } + else { + return struct_pack::detail::declval(); + } + } template void pack_to_impl(Buffer &buffer, std::size_t offset, Args &&...args) { struct_pack::serialize_to_with_offset( - buffer, offset, std::forward(std::forward(args))...); + buffer, offset, + std::forward(args))>( + (std::forward(args)))...); } template @@ -790,16 +664,59 @@ class coro_rpc_client { offset, std::forward(args)...); } + struct async_rpc_raw_result_value_type { + resp_body buffer_; + uint8_t errc_; + }; + + using async_rpc_raw_result = + std::variant; + + struct control_t; + + struct handler_t { + std::unique_ptr timer_; + async_simple::Promise promise_; + handler_t(std::unique_ptr &&timer, + async_simple::Promise &&promise) + : timer_(std::move(timer)), promise_(std::move(promise)) {} + void operator()(resp_body &&buffer, uint8_t rpc_errc) { + timer_->cancel(); + promise_.setValue(async_rpc_raw_result{ + async_rpc_raw_result_value_type{std::move(buffer), rpc_errc}}); + } + void local_error(std::error_code &ec) { + timer_->cancel(); + promise_.setValue(async_rpc_raw_result{ec}); + } + }; struct control_t { - asio::ip::tcp::socket socket_; +#ifdef YLT_ENABLE_SSL + std::unique_ptr> ssl_stream_; +#endif +#ifdef GENERATE_BENCHMARK_DATA + std::string func_name_; +#endif bool is_timeout_; + std::atomic has_closed_ = false; coro_io::ExecutorWrapper<> executor_; + std::unordered_map response_handler_table_; + resp_body resp_buffer_; + asio::ip::tcp::socket socket_; + std::atomic recving_cnt_ = 0; control_t(asio::io_context::executor_type executor, bool is_timeout) - : socket_(executor), is_timeout_(is_timeout), executor_(executor) {} + : socket_(executor), + is_timeout_(is_timeout), + has_closed_(false), + executor_(executor) {} }; static void close_socket( std::shared_ptr control) { + bool expected = false; + if (!control->has_closed_.compare_exchange_strong(expected, true)) { + return; + } control->executor_.schedule([control = std::move(control)]() { asio::error_code ignored_ec; control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both, @@ -816,25 +733,397 @@ class coro_rpc_client { } template - rpc_result()), coro_rpc_protocol> sync_call( - Args &&...args) { + rpc_result())> sync_call(Args &&...args) { return async_simple::coro::syncAwait( call(std::forward(args)...)); } #endif + template + async_simple::coro::Lazy send_request_for_impl( + auto duration, uint32_t &id, coro_io::period_timer &timer, + std::string_view attachment, Args &&...args) { + using R = decltype(get_return_type()); + + if (control_->has_closed_) + AS_UNLIKELY { + ELOGV(ERROR, "client has been closed, please re-connect"); + co_return rpc_error{errc::io_error, + "client has been closed, please re-connect"}; + } + +#ifdef YLT_ENABLE_SSL + if (!ssl_init_ret_) { + co_return rpc_error{errc::not_connected}; + } +#endif + + static_check(); + + if (duration.count() > 0) { + timeout(timer, duration, "rpc call timer canceled").start([](auto &&) { + }); + } + +#ifdef YLT_ENABLE_SSL + if (!config_.ssl_cert_path.empty()) { + 
assert(control_->ssl_stream_); + co_return co_await send_impl(*control_->ssl_stream_, id, attachment, + std::forward(args)...); + } + else { +#endif + co_return co_await send_impl(control_->socket_, id, attachment, + std::forward(args)...); +#ifdef YLT_ENABLE_SSL + } +#endif + } + + static void send_err_response(control_t *controller, std::error_code &errc) { + if (controller->is_timeout_) { + errc = std::make_error_code(std::errc::timed_out); + } + for (auto &e : controller->response_handler_table_) { + e.second.local_error(errc); + } + controller->response_handler_table_.clear(); + } + template + static async_simple::coro::Lazy recv( + std::shared_ptr controller, Socket &socket) { + std::pair ret; + do { + coro_rpc_protocol::resp_header header; + ret = co_await coro_io::async_read( + socket, + asio::buffer((char *)&header, coro_rpc_protocol::RESP_HEAD_LEN)); + if (ret.first) { + ELOG_ERROR << "read rpc head failed, error msg:" << ret.first.message() + << ". close the socket.value=" << ret.first.value(); + break; + } + uint32_t body_len = header.length; + struct_pack::detail::resize(controller->resp_buffer_.read_buf_, body_len); + if (header.attach_length == 0) { + ret = co_await coro_io::async_read( + socket, + asio::buffer(controller->resp_buffer_.read_buf_.data(), body_len)); + controller->resp_buffer_.resp_attachment_buf_.clear(); + } + else { + struct_pack::detail::resize( + controller->resp_buffer_.resp_attachment_buf_, + header.attach_length); + std::array iov{ + asio::mutable_buffer{controller->resp_buffer_.read_buf_.data(), + body_len}, + asio::mutable_buffer{ + controller->resp_buffer_.resp_attachment_buf_.data(), + controller->resp_buffer_.resp_attachment_buf_.size()}}; + ret = co_await coro_io::async_read(socket, iov); + } + if (ret.first) { + ELOG_ERROR << "read rpc body failed, error msg:" << ret.first.message() + << ". close the socket."; + break; + } +#ifdef GENERATE_BENCHMARK_DATA + std::ofstream file(benchmark_file_path + controller->func_name_ + ".out", + std::ofstream::binary | std::ofstream::out); + file << std::string_view{(char *)&header, + coro_rpc_protocol::RESP_HEAD_LEN}; + file << controller->resp_buffer_.read_buf_; + file << controller->resp_buffer_.resp_attachment_buf_; + file.close(); +#endif + --controller->recving_cnt_; + if (auto iter = controller->response_handler_table_.find(header.seq_num); + iter != controller->response_handler_table_.end()) { + ELOG_TRACE << "find request ID:" << header.seq_num + << ". start notify response handler"; + iter->second(std::move(controller->resp_buffer_), header.err_code); + controller->response_handler_table_.erase(iter); + if (controller->response_handler_table_.empty()) { + co_return; + } + } + else { + ELOG_ERROR << "unexists request ID:" << header.seq_num + << ". 
close the socket."; + break; + } + } while (true); + close_socket(controller); + send_err_response(controller.get(), ret.first); + co_return; + } + + template + static async_simple::coro::Lazy> deserialize_rpc_result( + async_simple::Future future, + std::weak_ptr watcher) { + auto ret_ = co_await std::move(future); + + if (ret_.index() == 1) [[unlikely]] { // local error + auto &ret = std::get<1>(ret_); + if (ret.value() == static_cast(std::errc::operation_canceled) || + ret.value() == static_cast(std::errc::timed_out)) { + co_return coro_rpc::unexpected{ + rpc_error{errc::timed_out, ret.message()}}; + } + else { + co_return coro_rpc::unexpected{ + rpc_error{errc::io_error, ret.message()}}; + } + } + + bool has_error = false; + auto &ret = std::get<0>(ret_); + auto result = + handle_response_buffer(ret.buffer_.read_buf_, ret.errc_, has_error); + if (has_error) { + if (auto w = watcher.lock(); w) { + close_socket(std::move(w)); + } + } + if (result) { + if constexpr (std::is_same_v) { + co_return async_rpc_result{ + async_rpc_result_value_t{std::move(ret.buffer_)}}; + } + else { + co_return async_rpc_result{async_rpc_result_value_t{ + std::move(result.value()), std::move(ret.buffer_)}}; + } + } + else { + co_return coro_rpc::unexpected{result.error()}; + } + } + + public: + template + async_simple::coro::Lazy())>>> + send_request(Args &&...args) { + return send_request_for_with_attachment(std::chrono::seconds{5}, {}, + std::forward(args)...); + } + + template + async_simple::coro::Lazy())>>> + send_request_with_attachment(std::string_view request_attachment, + Args &&...args) { + return send_request_for_with_attachment(std::chrono::seconds{5}, + request_attachment, + std::forward(args)...); + } + + template + async_simple::coro::Lazy())>>> + send_request_for(Args &&...args) { + return send_request_for_with_attachment(std::chrono::seconds{5}, + std::string_view{}, + std::forward(args)...); + } + + struct recving_guard { + recving_guard(control_t *ctrl) : ctrl_(ctrl) { ctrl_->recving_cnt_++; } + void release() { ctrl_ = nullptr; } + ~recving_guard() { + if (ctrl_) { + --ctrl_->recving_cnt_; + } + } + control_t *ctrl_; + }; + + private: + template + static async_simple::coro::Lazy> build_failed_rpc_result( + rpc_error err) { + co_return unexpected{err}; + } + + public: + template + async_simple::coro::Lazy())>>> + send_request_for_with_attachment(auto time_out_duration, + std::string_view request_attachment, + Args &&...args) { + using rpc_return_t = decltype(get_return_type()); + recving_guard guard(control_.get()); + uint32_t id; + auto timer = std::make_unique( + control_->executor_.get_asio_executor()); + auto result = co_await send_request_for_impl( + time_out_duration, id, *timer, request_attachment, + std::forward(args)...); + auto &control = *control_; + if (!result) { + async_simple::Promise promise; + auto future = promise.getFuture(); + bool is_empty = control.response_handler_table_.empty(); + auto &&[_, is_ok] = control.response_handler_table_.try_emplace( + id, std::move(timer), std::move(promise)); + if (!is_ok) [[unlikely]] { + close(); + co_return build_failed_rpc_result( + rpc_error{coro_rpc::errc::serial_number_conflict}); + } + else { + if (is_empty) { +#ifdef YLT_ENABLE_SSL + if (!config_.ssl_cert_path.empty()) { + assert(control.ssl_stream_); + recv(control_, *control.ssl_stream_).start([](auto &&) { + }); + } + else { +#endif + recv(control_, control.socket_).start([](auto &&) { + }); +#ifdef YLT_ENABLE_SSL + } +#endif + } + guard.release(); + co_return 
deserialize_rpc_result( + std::move(future), std::weak_ptr{control_}); + } + } + else { + auto i = build_failed_rpc_result(std::move(result)); + co_return build_failed_rpc_result(std::move(result)); + } + } + + uint32_t get_pipeline_size() { return control_->recving_cnt_; } + + private: + template + async_simple::coro::Lazy send_impl(Socket &socket, uint32_t &id, + std::string_view req_attachment, + Args &&...args) { + auto buffer = prepare_buffer(id, std::forward(args)...); + if (buffer.empty()) { + co_return rpc_error{errc::message_too_large}; + } +#ifdef GENERATE_BENCHMARK_DATA + control_->func_name_ = get_func_name(); + std::ofstream file(benchmark_file_path + control_->func_name_ + ".in", + std::ofstream::binary | std::ofstream::out); + file << std::string_view{(char *)buffer.data(), buffer.size()}; + file.close(); +#endif + std::pair ret; +#ifdef UNIT_TEST_INJECT + if (g_action == inject_action::client_send_bad_header) { + buffer[0] = (std::byte)(uint8_t(buffer[0]) + 1); + } + if (g_action == inject_action::client_close_socket_after_send_header) { + ret = co_await coro_io::async_write( + socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN)); + ELOGV(INFO, "client_id %d close socket", config_.client_id); + close(); + co_return rpc_error{errc::io_error, ret.first.message()}; + } + else if (g_action == + inject_action::client_close_socket_after_send_partial_header) { + ret = co_await coro_io::async_write( + socket, + asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN - 1)); + ELOGV(INFO, "client_id %d close socket", config_.client_id); + close(); + co_return rpc_error{errc::io_error, ret.first.message()}; + } + else if (g_action == + inject_action::client_shutdown_socket_after_send_header) { + ret = co_await coro_io::async_write( + socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN)); + ELOGV(INFO, "client_id %d shutdown", config_.client_id); + control_->socket_.shutdown(asio::ip::tcp::socket::shutdown_send); + co_return rpc_error{errc::io_error, ret.first.message()}; + } + else { +#endif + if (req_attachment.empty()) { + while (true) { + bool expected = false; + if (write_mutex_.compare_exchange_weak(expected, true)) { + break; + } + co_await coro_io::post( + []() { + }, + &control_->executor_); + } + ret = co_await coro_io::async_write( + socket, asio::buffer(buffer.data(), buffer.size())); + write_mutex_ = false; + } + else { + std::array iov{ + asio::const_buffer{buffer.data(), buffer.size()}, + asio::const_buffer{req_attachment.data(), req_attachment.size()}}; + while (true) { + bool expected = false; + if (write_mutex_.compare_exchange_weak(expected, true)) { + break; + } + co_await coro_io::post( + []() { + }, + &control_->executor_); + } + ret = co_await coro_io::async_write(socket, iov); + write_mutex_ = false; + } +#ifdef UNIT_TEST_INJECT + } +#endif +#ifdef UNIT_TEST_INJECT + if (g_action == inject_action::force_inject_client_write_data_timeout) { + control_->is_timeout_ = true; + } +#endif +#ifdef UNIT_TEST_INJECT + if (g_action == inject_action::client_close_socket_after_send_payload) { + ELOGV(INFO, "client_id %d client_close_socket_after_send_payload", + config_.client_id); + close(); + co_return rpc_error{errc::io_error, ret.first.message()}; + } +#endif + if (ret.first) { + close(); + if (control_->is_timeout_) { + co_return rpc_error{errc::timed_out}; + } + else { + co_return rpc_error{errc::io_error, ret.first.message()}; + } + } + co_return rpc_error{}; + } + private: - coro_io::period_timer timer_; + bool should_reset_ = 
false; + async_simple::coro::Mutex connect_mutex_; + std::atomic write_mutex_ = false; + std::atomic request_id_{0}; + std::unique_ptr timer_; std::shared_ptr control_; - std::string read_buf_, resp_attachment_buf_; std::string_view req_attachment_; config config_; constexpr static std::size_t default_read_buf_size_ = 256; #ifdef YLT_ENABLE_SSL asio::ssl::context ssl_ctx_{asio::ssl::context::sslv23}; - std::unique_ptr> ssl_stream_; bool ssl_init_ret_ = true; #endif - std::atomic has_closed_ = false; }; } // namespace coro_rpc diff --git a/include/ylt/coro_rpc/impl/errno.h b/include/ylt/coro_rpc/impl/errno.h index 0d7c1b6bd..2fd4dabac 100644 --- a/include/ylt/coro_rpc/impl/errno.h +++ b/include/ylt/coro_rpc/impl/errno.h @@ -35,6 +35,7 @@ enum class errc : uint16_t { message_too_large, server_has_ran, invalid_rpc_result, + serial_number_conflict, }; inline constexpr std::string_view make_error_message(errc ec) noexcept { switch (ec) { @@ -70,6 +71,8 @@ inline constexpr std::string_view make_error_message(errc ec) noexcept { return "server has ran"; case errc::invalid_rpc_result: return "invalid rpc result"; + case errc::serial_number_conflict: + return "serial number conflict"; default: return "unknown user-defined error"; } @@ -103,8 +106,14 @@ inline bool operator!(errc ec) noexcept { return ec == errc::ok; } struct rpc_error { coro_rpc::err_code code; //!< error code std::string msg; //!< error message + rpc_error() {} + rpc_error(coro_rpc::err_code code, std::string_view msg) + : code(code), msg(std::string{msg}) {} + rpc_error(coro_rpc::err_code code) + : code(code), msg(std::string{make_error_message(code)}) {} uint16_t& val() { return *(uint16_t*)&(code.ec); } const uint16_t& val() const { return *(uint16_t*)&(code.ec); } + constexpr operator bool() const noexcept { return code; } }; STRUCT_PACK_REFL(rpc_error, val(), msg); diff --git a/include/ylt/coro_rpc/impl/expected.hpp b/include/ylt/coro_rpc/impl/expected.hpp index 4a0a7e91e..dc786f780 100644 --- a/include/ylt/coro_rpc/impl/expected.hpp +++ b/include/ylt/coro_rpc/impl/expected.hpp @@ -48,8 +48,7 @@ namespace protocol { struct coro_rpc_protocol; } -template -using rpc_result = expected; +template +using rpc_result = expected; } // namespace coro_rpc \ No newline at end of file diff --git a/include/ylt/standalone/cinatra/coro_http_client.hpp b/include/ylt/standalone/cinatra/coro_http_client.hpp index 590c9c7f4..acbb2b742 100644 --- a/include/ylt/standalone/cinatra/coro_http_client.hpp +++ b/include/ylt/standalone/cinatra/coro_http_client.hpp @@ -284,6 +284,12 @@ class coro_http_client : public std::enable_shared_from_this { // only make socket connet(or handshake) to the host async_simple::coro::Lazy connect(std::string uri) { + if (should_reset_) { + reset(); + } + else { + should_reset_ = false; + } resp_data data{}; bool no_schema = !has_schema(uri); std::string append_uri; @@ -896,11 +902,6 @@ class coro_http_client : public std::enable_shared_from_this { resp_chunk_str_.clear(); } - async_simple::coro::Lazy reconnect(std::string uri) { - reset(); - co_return co_await connect(std::move(uri)); - } - std::string_view get_host() { return host_; } std::string_view get_port() { return port_; } @@ -2119,6 +2120,7 @@ class coro_http_client : public std::enable_shared_from_this { std::string redirect_uri_; bool enable_follow_redirect_ = false; bool enable_timeout_ = false; + bool should_reset_ = false; std::chrono::steady_clock::duration conn_timeout_duration_ = std::chrono::seconds(8); std::chrono::steady_clock::duration 
req_timeout_duration_ = diff --git a/src/coro_io/tests/test_client_pool.cpp b/src/coro_io/tests/test_client_pool.cpp index 3a9eff4ac..18d00ee5d 100644 --- a/src/coro_io/tests/test_client_pool.cpp +++ b/src/coro_io/tests/test_client_pool.cpp @@ -156,9 +156,9 @@ TEST_CASE("test reconnect") { struct mock_client : public coro_rpc::coro_rpc_client { using coro_rpc::coro_rpc_client::coro_rpc_client; - async_simple::coro::Lazy reconnect( + async_simple::coro::Lazy connect( const std::string &hostname) { - auto ec = co_await this->coro_rpc::coro_rpc_client::reconnect(hostname); + auto ec = co_await this->coro_rpc::coro_rpc_client::connect(hostname); if (ec) { co_await coro_io::sleep_for(300ms); } @@ -184,7 +184,7 @@ TEST_CASE("test reconnect retry wait time exclude reconnect cost time") { CHECK(pool->free_client_count() == 100); auto dur = std::chrono::steady_clock::now() - tp; std::cout << dur.count() << std::endl; - CHECK((dur >= 400ms && dur <= 800ms)); + CHECK((dur >= 500ms && dur <= 799ms)); server.stop(); co_return; }()); diff --git a/src/coro_rpc/examples/base_examples/channel.cpp b/src/coro_rpc/examples/base_examples/channel.cpp index 99f23c833..292696c9f 100644 --- a/src/coro_rpc/examples/base_examples/channel.cpp +++ b/src/coro_rpc/examples/base_examples/channel.cpp @@ -25,11 +25,16 @@ #include #include #include +#include #include #include #include #include +#include "ylt/coro_io/io_context_pool.hpp" +#include "ylt/coro_rpc/impl/coro_rpc_client.hpp" +#include "ylt/coro_rpc/impl/errno.h" +#include "ylt/coro_rpc/impl/expected.hpp" #include "ylt/easylog.hpp" using namespace coro_rpc; using namespace async_simple::coro; @@ -39,37 +44,74 @@ std::string echo(std::string_view sv); std::atomic qps = 0; std::atomic working_echo = 0; -/*! - * \example helloworld/concurrency_clients.main.cpp - * \brief demo for run concurrency clients - */ -Lazy call_echo(std::shared_ptr> channel, - int cnt) { - while (true) { - ++working_echo; - for (int i = 0; i < cnt; ++i) { - auto res = co_await channel->send_request( - [](coro_rpc_client &client, std::string_view hostname) -> Lazy { - auto res = co_await client.call("Hello world!"); - if (!res.has_value()) { - ELOG_ERROR << "coro_rpc err: \n" << res.error().msg; - co_return; - } - if (res.value() != "Hello world!"sv) { - ELOG_ERROR << "err echo resp: \n" << res.value(); - co_return; - } - ++qps; - co_return; - }); - if (!res) { - ELOG_ERROR << "client pool err: connect failed.\n"; - } +int request_cnt = 10000; + +// Lazy> +// call_echo(std::shared_ptr> channel) { +// std::vector result; +// result.reserve(request_cnt); +// auto tp = std::chrono::steady_clock::now(); +// ++working_echo; +// for (int i = 0; i < request_cnt; ++i) { +// auto res = co_await channel->send_request( +// [](coro_rpc_client &client, std::string_view hostname) -> Lazy +// { +// auto res = co_await client.call("Hello world!"); +// if (!res.has_value()) { +// ELOG_ERROR << "coro_rpc err: \n" << res.error().msg; +// co_return; +// } +// if (res.value() != "Hello world!"sv) { +// ELOG_ERROR << "err echo resp: \n" << res.value(); +// co_return; +// } +// co_return; +// }); +// if (!res) { +// ELOG_ERROR << "client pool err: connect failed.\n"; +// break; +// } +// ++qps; +// auto old_tp=tp; +// tp= std::chrono::steady_clock::now(); +// result.push_back(std::chrono::duration_cast( +// tp - old_tp)); +// } +// co_return std::move(result); +// } + +Lazy> call_echo( + std::shared_ptr> channel) { + std::vector result; + result.reserve(request_cnt); + auto tp = std::chrono::steady_clock::now(); + 
++working_echo; + for (int i = 0; i < request_cnt; ++i) { + auto res = co_await channel->send_request( + [](coro_rpc_client &client, std::string_view hostname) { + return client.send_request("Hello world!"); + }); + if (!res) { + ELOG_ERROR << "client pool err: connect failed.\n" + << std::make_error_code(res.error()); + break; + } + auto rpc_result = co_await res.value(); + if (!rpc_result) { + ELOG_ERROR << "recv response failed\n" << rpc_result.error().msg; + break; } - --working_echo; - co_await coro_io::sleep_for(30s); + if (rpc_result->result() != "Hello world!") { + ELOG_ERROR << "error rpc reponse\n" << rpc_result->result(); + } + ++qps; + auto old_tp = tp; + tp = std::chrono::steady_clock::now(); + result.push_back( + std::chrono::duration_cast(tp - old_tp)); } + co_return std::move(result); } Lazy qps_watcher() { @@ -87,6 +129,19 @@ Lazy qps_watcher() { } } +std::vector result; +void latency_watcher() { + std::sort(result.begin(), result.end()); + auto arr = {0.1, 0.3, 0.5, 0.7, 0.9, 0.95, 0.99, 0.999, 0.9999, 0.99999, 1.0}; + for (auto e : arr) { + std::cout + << (e * 100) << "% request finished in:" + << result[std::max(0, result.size() * e - 1)].count() / + 1000.0 + << "ms" << std::endl; + } +} + int main() { auto hosts = std::vector{"127.0.0.1:8801", "localhost:8801"}; @@ -95,11 +150,17 @@ int main() { hosts, coro_io::channel::channel_config{ .pool_config{.max_connection = worker_cnt}}); auto chan_ptr = std::make_shared(std::move(chan)); + auto executor = coro_io::get_global_block_executor(); for (int i = 0; i < worker_cnt; ++i) { - call_echo(chan_ptr, 10000).start([](auto &&) { + call_echo(chan_ptr).start([=](auto &&res) { + executor->schedule([res = std::move(res.value())]() mutable { + result.insert(result.end(), res.begin(), res.end()); + --working_echo; + }); }); } syncAwait(qps_watcher()); + latency_watcher(); std::cout << "Done!" << std::endl; return 0; } \ No newline at end of file diff --git a/src/coro_rpc/examples/base_examples/client_pool.cpp b/src/coro_rpc/examples/base_examples/client_pool.cpp index 23ba2b628..165f43017 100644 --- a/src/coro_rpc/examples/base_examples/client_pool.cpp +++ b/src/coro_rpc/examples/base_examples/client_pool.cpp @@ -17,9 +17,13 @@ #include #include #include +#include #include #include #include +#include +#include +#include #include #include #include @@ -27,77 +31,72 @@ #include "async_simple/coro/Collect.h" #include "async_simple/coro/Lazy.h" +#include "async_simple/coro/Mutex.h" #include "async_simple/coro/SyncAwait.h" #include "ylt/coro_io/io_context_pool.hpp" +#include "ylt/coro_rpc/impl/coro_rpc_client.hpp" +#include "ylt/coro_rpc/impl/errno.h" +#include "ylt/coro_rpc/impl/expected.hpp" #include "ylt/easylog.hpp" +#include "ylt/easylog/record.hpp" std::string echo(std::string_view sv); +constexpr unsigned thread_cnt = 1920; +constexpr auto request_cnt = 1000; using namespace coro_rpc; using namespace async_simple::coro; using namespace std::string_view_literals; -std::atomic qps = 0; - +auto finish_executor = coro_io::get_global_block_executor(); std::atomic working_echo = 0; -std::atomic busy_echo = 0; -struct guard { - guard(std::atomic &ref) : ref(ref) { ++ref; } - ~guard() { --ref; } - std::atomic &ref; -}; /*! 
* \example helloworld/concurrency_clients.main.cpp * \brief demo for run concurrency clients */ -Lazy> call_echo( - coro_io::client_pool &client_pool, int cnt) { - std::vector result; - result.reserve(cnt); +auto client_pool = + coro_io::client_pool::create("127.0.0.1:8801"); + +std::atomic qps = 0; +Lazy> send() { + std::vector latencys; + latencys.reserve(request_cnt); + auto tp = std::chrono::steady_clock::now(); ++working_echo; - for (int i = 0; i < cnt; ++i) { - auto tp = std::chrono::steady_clock::now(); - auto res = co_await client_pool.send_request( - [=](coro_rpc_client &client) -> Lazy { - guard g{busy_echo}; - if (client.has_closed()) { - co_return; - } - auto res = co_await client.call("Hello world!"); - if (!res.has_value()) { - ELOG_ERROR << "coro_rpc err: \n" << res.error().msg; - client.close(); - co_return; - } - if (res.value() != "Hello world!"sv) { - ELOG_ERROR << "err echo resp: \n" << res.value(); - co_return; - } - ++qps; - co_return; + int id = 0; + for (int i = 0; i < request_cnt; ++i) { + auto result = co_await client_pool->send_request( + [](coro_rpc_client& client) -> Lazy> { + co_return co_await client.call("Hello world!"); }); - if (!res) { - ELOG_ERROR << "client pool err: connect failed.\n"; + if (!result) { + ELOG_ERROR << "get client form client pool failed: \n" + << std::make_error_code(result.error()).message(); + continue; } - else { - result.push_back(std::chrono::duration_cast( - std::chrono::steady_clock::now() - tp)); + auto& call_result = result.value(); + if (!call_result) { + ELOG_ERROR << "call err: \n" << call_result.error().msg; + break; } + qps.fetch_add(1, std::memory_order::release); + auto old_tp = tp; + tp = std::chrono::steady_clock::now(); + latencys.push_back( + std::chrono::duration_cast(tp - old_tp)); } - co_return std::move(result); + co_return std::move(latencys); } -Lazy qps_watcher(coro_io::client_pool &clients) { +Lazy qps_watcher() { using namespace std::chrono_literals; - while (working_echo > 0) { + do { co_await coro_io::sleep_for(1s); uint64_t cnt = qps.exchange(0); std::cout << "QPS:" << cnt - << " free connection: " << clients.free_client_count() - << " working echo:" << working_echo << " busy echo:" << busy_echo - << std::endl; - cnt = 0; - } + << " free connection: " << client_pool->free_client_count() + << " working echo:" << working_echo << std::endl; + } while (working_echo > 0); } std::vector result; void latency_watcher() { @@ -112,23 +111,18 @@ void latency_watcher() { } } int main() { - auto thread_cnt = std::thread::hardware_concurrency(); - auto client_pool = coro_io::client_pool::create( - "localhost:8801", - coro_io::client_pool::pool_config{ - .max_connection = thread_cnt * 20, - .client_config = {.timeout_duration = std::chrono::seconds{50}}}); - - auto finish_executor = coro_io::get_global_block_executor(); - for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) { - call_echo(*client_pool, 10000).start([finish_executor](auto &&res) { - finish_executor->schedule([res = std::move(res.value())] { - result.insert(result.end(), res.begin(), res.end()); - --working_echo; + auto executor = coro_io::get_global_block_executor(); + for (int i = 0, lim = thread_cnt * 10; i < lim; ++i) { + coro_io::get_global_executor()->schedule([=]() { + send().start([executor](auto&& res) { + executor->schedule([res = std::move(res.value())]() mutable { + result.insert(result.end(), res.begin(), res.end()); + --working_echo; + }); }); }); } - syncAwait(qps_watcher(*client_pool)); + syncAwait(qps_watcher()); latency_watcher(); 
std::cout << "Done!" << std::endl; return 0; diff --git a/src/coro_rpc/examples/base_examples/client_pools.cpp b/src/coro_rpc/examples/base_examples/client_pools.cpp index 8bf414d60..dbbe114c7 100644 --- a/src/coro_rpc/examples/base_examples/client_pools.cpp +++ b/src/coro_rpc/examples/base_examples/client_pools.cpp @@ -26,6 +26,8 @@ #include #include #include + +#include "ylt/coro_io/io_context_pool.hpp" std::string echo(std::string_view sv); using namespace coro_rpc; using namespace async_simple::coro; @@ -39,10 +41,15 @@ std::atomic working_echo = 0; * \brief demo for run concurrency clients */ -Lazy call_echo(coro_io::client_pools &client_pools, - int cnt) { +int request_cnt = 10000; + +Lazy> call_echo( + coro_io::client_pools &client_pools) { ++working_echo; - for (int i = 0; i < cnt; ++i) { + std::vector result; + result.reserve(request_cnt); + auto tp = std::chrono::steady_clock::now(); + for (int i = 0; i < request_cnt; ++i) { auto res = co_await client_pools.send_request( i % 2 ? "localhost:8801" : "127.0.0.1:8801", [=](coro_rpc_client &client) -> Lazy { @@ -60,9 +67,14 @@ Lazy call_echo(coro_io::client_pools &client_pools, }); if (!res) { ELOG_ERROR << "client pool err: connect failed.\n"; + break; } + auto old_tp = tp; + tp = std::chrono::steady_clock::now(); + result.push_back( + std::chrono::duration_cast(tp - old_tp)); } - --working_echo; + co_return std::move(result); } Lazy qps_watcher(coro_io::client_pools &clients) { @@ -78,15 +90,32 @@ Lazy qps_watcher(coro_io::client_pools &clients) { cnt = 0; } } - +std::vector result; +void latency_watcher() { + std::sort(result.begin(), result.end()); + auto arr = {0.1, 0.3, 0.5, 0.7, 0.9, 0.95, 0.99, 0.999, 0.9999, 0.99999, 1.0}; + for (auto e : arr) { + std::cout + << (e * 100) << "% request finished in:" + << result[std::max(0, result.size() * e - 1)].count() / + 1000.0 + << "ms" << std::endl; + } +} int main() { auto thread_cnt = std::thread::hardware_concurrency(); auto &clients = coro_io::g_clients_pool(); + auto executor = coro_io::get_global_block_executor(); for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) { - call_echo(clients, 10000).start([](auto &&) { + call_echo(clients).start([=](auto &&res) { + executor->schedule([res = std::move(res.value())]() mutable { + result.insert(result.end(), res.begin(), res.end()); + --working_echo; + }); }); } syncAwait(qps_watcher(clients)); + latency_watcher(); std::cout << "Done!" << std::endl; return 0; } \ No newline at end of file diff --git a/src/coro_rpc/examples/base_examples/concurrent_clients.cpp b/src/coro_rpc/examples/base_examples/concurrent_clients.cpp index c8e8e7a20..bf087a659 100644 --- a/src/coro_rpc/examples/base_examples/concurrent_clients.cpp +++ b/src/coro_rpc/examples/base_examples/concurrent_clients.cpp @@ -13,80 +13,114 @@ * See the License for the specific language governing permissions and * limitations under the License. 
 */
-#include
-#include
-#include
-#include
 #include
 #include
 #include
+#include
 #include
-#include
+#include
+#include
+#include
+#include
 #include
 #include
 #include
 #include
+#include "async_simple/coro/Collect.h"
+#include "async_simple/coro/Lazy.h"
+#include "async_simple/coro/Mutex.h"
+#include "async_simple/coro/SyncAwait.h"
+#include "ylt/coro_io/io_context_pool.hpp"
 #include "ylt/coro_rpc/impl/coro_rpc_client.hpp"
+#include "ylt/coro_rpc/impl/errno.h"
+#include "ylt/coro_rpc/impl/expected.hpp"
+#include "ylt/easylog.hpp"
 std::string echo(std::string_view sv);
+
+constexpr auto thread_cnt = 96 * 20;
 using namespace coro_rpc;
 using namespace async_simple::coro;
 using namespace std::string_view_literals;
-using namespace std::chrono_literals;
-std::atomic<uint64_t> qps = 0;
+auto finish_executor = coro_io::get_global_block_executor();
 std::atomic<uint64_t> working_echo = 0;
-/*!
- * \example helloworld/concurrency_clients.main.cpp
- * \brief demo for run concurrency clients
- */
-Lazy<void> call_echo(int cnt) {
+std::vector<std::unique_ptr<coro_rpc_client>> clients;
+
+std::atomic<uint64_t>& get_qps(int id) {
+  static std::atomic<uint64_t> ar[thread_cnt * 8];
+  return ar[id * 8];
+}
+
+Lazy<std::vector<std::chrono::microseconds>> send(int id, int cnt) {
+  std::vector<std::chrono::microseconds> result;
+  auto& cli = *clients[id];
+  auto& qps = get_qps(id);
+  result.reserve(cnt);
   ++working_echo;
-  coro_rpc_client client;
-  auto ec = co_await client.connect("localhost:8801");
-  for (int i = 0; i < 3 && !ec; ++i) {
-    co_await coro_io::sleep_for(rand() % 10000 * 1ms);
-    ec = co_await client.reconnect("localhost:8801");
-  }
-  if (!ec) {
-    for (int i = 0; i < cnt; ++i) {
-      auto res = co_await client.call<echo>("Hello world!");
-      if (!res.has_value()) {
-        ELOG_ERROR << "coro_rpc err: \n" << res.error().msg;
-        co_return;
-      }
-      if (res.value() != "Hello world!"sv) {
-        ELOG_ERROR << "err echo resp: \n" << res.value();
-        co_return;
-      }
-      ++qps;
+  auto tp = std::chrono::steady_clock::now();
+  for (int i = 0; i < cnt; ++i) {
+    auto res_ = co_await cli.call<echo>("Hello world!");
+    if (!res_.has_value()) {
+      ELOG_ERROR << "coro_rpc err: \n" << res_.error().msg;
+      continue;
    }
-  }
-  else {
-    std::cout << "connect failed \n";
+    ++qps;
+    auto old_tp = tp;
+    tp = std::chrono::steady_clock::now();
+    result.push_back(
+        std::chrono::duration_cast<std::chrono::microseconds>(tp - old_tp));
   }
   --working_echo;
+  co_return std::move(result);
 }
 
 Lazy<void> qps_watcher() {
   using namespace std::chrono_literals;
-  while (working_echo > 0) {
+  do {
     co_await coro_io::sleep_for(1s);
-    uint64_t cnt = qps.exchange(0);
-    std::cout << "QPS:" << cnt << " working echo:" << working_echo << std::endl;
+    uint64_t cnt = 0;
+    for (int i = 0; i < thread_cnt; ++i) {
+      auto& qps = get_qps(i);
+      uint64_t tmp = qps.exchange(0);
+      cnt += tmp;
+    }
+    std::cout << "QPS:"
+              << cnt
+              // << " free connection: " << clients.free_client_count()
+              << " working echo:" << working_echo << std::endl;
     cnt = 0;
-  }
+  } while (working_echo > 0);
+}
+std::vector<std::chrono::microseconds> result;
+void latency_watcher() {
+  std::sort(result.begin(), result.end());
+  auto arr = {0.1, 0.3, 0.5, 0.7, 0.9, 0.95, 0.99, 0.999, 0.9999, 0.99999, 1.0};
+  for (auto e : arr) {
+    std::cout
+        << (e * 100) << "% request finished in:"
+        << result[std::max<std::size_t>(0, result.size() * e - 1)].count() /
+               1000.0
+        << "ms" << std::endl;
   }
 }
-
 int main() {
-  auto thread_cnt = std::thread::hardware_concurrency();
-  for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) {
-    call_echo(100000).start([](auto &&) {
+  for (int i = 0; i < thread_cnt; ++i) {
+    clients.emplace_back(std::make_unique<coro_rpc_client>(
+        coro_io::get_global_executor()->get_asio_executor()));
+    syncAwait(clients.back()->connect("localhost:8801"));
+  }
+  for (int i = 0, lim = thread_cnt; i < lim; ++i) {
+    send(i, 20000).via(&clients[i]->get_executor()).start([](auto&& res) {
+      finish_executor->schedule([res = std::move(res.value())] {
+        result.insert(result.end(), res.begin(), res.end());
+      });
     });
   }
   syncAwait(qps_watcher());
+  latency_watcher();
   std::cout << "Done!" << std::endl;
   return 0;
 }
\ No newline at end of file
diff --git a/src/coro_rpc/examples/base_examples/rpc_service.cpp b/src/coro_rpc/examples/base_examples/rpc_service.cpp
index 27f04db59..da6eb6db3 100644
--- a/src/coro_rpc/examples/base_examples/rpc_service.cpp
+++ b/src/coro_rpc/examples/base_examples/rpc_service.cpp
@@ -102,9 +102,11 @@ Lazy<std::string> nested_echo(std::string_view sv) {
       coro_io::g_clients_pool<coro_rpc_client>().at("127.0.0.1:8802");
   assert(client != nullptr);
   ELOGV(INFO, "connect another server");
-  auto ret = co_await client->send_request([sv](coro_rpc_client &client) {
-    return client.call<echo>(sv);
-  });
+  auto ret = co_await client->send_request(
+      [sv](coro_rpc_client &client)
+          -> Lazy<coro_rpc::rpc_result<std::string>> {
+        co_return co_await client.call<echo>(sv);
+      });
   co_return ret.value().value();
 }
 
diff --git a/src/coro_rpc/tests/test_coro_rpc_client.cpp b/src/coro_rpc/tests/test_coro_rpc_client.cpp
index 71b69eab4..cd5314001 100644
--- a/src/coro_rpc/tests/test_coro_rpc_client.cpp
+++ b/src/coro_rpc/tests/test_coro_rpc_client.cpp
@@ -56,6 +56,8 @@ Lazy<std::shared_ptr<coro_rpc::coro_rpc_client>> create_client(
   co_return client;
 }
 
+void show(auto& s) { return; }
+
 TEST_CASE("testing client") {
   {
     coro_rpc::coro_rpc_client client;
@@ -146,6 +148,7 @@ TEST_CASE("testing client") {
     std::string arg;
     arg.resize(2048);
     auto ret = co_await client->template call<echo>(arg);
+    show(ret);
     CHECK(ret.value() == arg);
     co_return;
   };
@@ -205,6 +208,7 @@ TEST_CASE("testing client with inject server") {
     auto client = co_await create_client(io_context, port);
     g_action = inject_action::close_socket_after_send_length;
     auto ret = co_await client->template call();
+    show(ret);
     REQUIRE_MESSAGE(ret.error().code == coro_rpc::errc::io_error,
                     ret.error().msg);
   };
diff --git a/src/coro_rpc/tests/test_coro_rpc_server.cpp b/src/coro_rpc/tests/test_coro_rpc_server.cpp
index 6cf512043..23f522204 100644
--- a/src/coro_rpc/tests/test_coro_rpc_server.cpp
+++ b/src/coro_rpc/tests/test_coro_rpc_server.cpp
@@ -346,13 +346,6 @@ TEST_CASE("test server accept error") {
   REQUIRE_MESSAGE(ret.error().code == coro_rpc::errc::io_error,
                   ret.error().msg);
   REQUIRE(client.has_closed() == true);
-
-  ec = syncAwait(client.connect("127.0.0.1", "8810"));
-  REQUIRE_MESSAGE(ec == coro_rpc::errc::io_error,
-                  std::to_string(client.get_client_id()).append(ec.message()));
-  ret = syncAwait(client.call());
-  CHECK(!ret);
-  REQUIRE(client.has_closed() == true);
   g_action = {};
 }
 
diff --git a/src/coro_rpc/tests/test_router.cpp b/src/coro_rpc/tests/test_router.cpp
index 9c7aaac0d..c344bfe51 100644
--- a/src/coro_rpc/tests/test_router.cpp
+++ b/src/coro_rpc/tests/test_router.cpp
@@ -67,12 +67,11 @@ struct RPC_trait {
 };
 using coro_rpc_protocol = coro_rpc::protocol::coro_rpc_protocol;
 template <auto func>
-rpc_result<util::function_return_type_t<decltype(func)>, coro_rpc_protocol>
-get_result(const auto &pair) {
+rpc_result<util::function_return_type_t<decltype(func)>> get_result(
+    const auto &pair) {
   auto &&[rpc_errc, buffer] = pair;
   using T = util::function_return_type_t<decltype(func)>;
-  using return_type = rpc_result<util::function_return_type_t<decltype(func)>,
-                                 coro_rpc_protocol>;
+  using return_type = rpc_result<util::function_return_type_t<decltype(func)>>;
   rpc_return_type_t<T> ret;
   struct_pack::err_code ec;
   rpc_error err;