From 6154528e41943732f7b739c1085247162923e758 Mon Sep 17 00:00:00 2001 From: qicosmos Date: Wed, 20 Mar 2024 14:54:32 +0800 Subject: [PATCH] avoid create timer everytime when call rpc --- include/ylt/coro_io/coro_io.hpp | 28 +++++++++------- include/ylt/coro_rpc/impl/coro_rpc_client.hpp | 33 ++++++++----------- 2 files changed, 29 insertions(+), 32 deletions(-) diff --git a/include/ylt/coro_io/coro_io.hpp b/include/ylt/coro_io/coro_io.hpp index 7c0047368..81cd605a4 100644 --- a/include/ylt/coro_io/coro_io.hpp +++ b/include/ylt/coro_io/coro_io.hpp @@ -275,13 +275,11 @@ inline async_simple::coro::Lazy async_handshake( #endif class period_timer : public asio::steady_timer { public: + using asio::steady_timer::steady_timer; template period_timer(coro_io::ExecutorWrapper *executor) : asio::steady_timer(executor->get_asio_executor()) {} - template - period_timer(const executor_t &executor, - const std::chrono::duration &timeout_duration) - : asio::steady_timer(executor, timeout_duration) {} + async_simple::coro::Lazy async_await() noexcept { callback_awaitor awaitor; @@ -312,10 +310,10 @@ inline async_simple::coro::Lazy sleep_for(Duration d) { } } -template +template struct post_helper { void operator()(auto handler) const { - asio::dispatch(e->get_asio_executor(), [this, handler]() { + asio::dispatch(e, [this, handler]() { try { if constexpr (std::is_same_v>) { func(); @@ -332,24 +330,30 @@ struct post_helper { } }); } - coro_io::ExecutorWrapper<> *e; + Executor e; Func func; }; -template +template inline async_simple::coro::Lazy< async_simple::Try::return_type>> -post(Func func, - coro_io::ExecutorWrapper<> *e = coro_io::get_global_block_executor()) { +post(Func func, Executor executor) { using R = async_simple::Try::return_type>; callback_awaitor awaitor; - - post_helper helper{e, std::move(func)}; + post_helper helper{executor, std::move(func)}; co_return co_await awaitor.await_resume(helper); } +template +inline async_simple::coro::Lazy< + async_simple::Try::return_type>> +post(Func func, + coro_io::ExecutorWrapper<> *e = coro_io::get_global_block_executor()) { + co_return co_await post(std::move(func), e->get_asio_executor()); +} + template struct coro_channel : public asio::experimental::channel { diff --git a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp index 9038fc391..bc2f8c3eb 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp @@ -130,6 +130,7 @@ class coro_rpc_client { coro_rpc_client(asio::io_context::executor_type executor, uint32_t client_id = 0) : executor(executor), + timer_(executor), socket_(std::make_shared(executor)) { config_.client_id = client_id; } @@ -142,6 +143,7 @@ class coro_rpc_client { coro_io::ExecutorWrapper<> &executor = *coro_io::get_global_executor(), uint32_t client_id = 0) : executor(executor.get_asio_executor()), + timer_(executor.get_asio_executor()), socket_(std::make_shared( executor.get_asio_executor())) { config_.client_id = client_id; @@ -306,11 +308,8 @@ class coro_rpc_client { static_check(); - async_simple::Promise promise; - coro_io::period_timer timer(&executor); - timeout(timer, duration, promise, "rpc call timer canceled") - .via(&executor) - .detach(); + timeout(duration, "rpc call timer canceled").start([](auto &&) { + }); #ifdef YLT_ENABLE_SSL if (!config_.ssl_cert_path.empty()) { @@ -325,7 +324,7 @@ class coro_rpc_client { #endif std::error_code err_code; - timer.cancel(err_code); + timer_.cancel(err_code); if (is_timeout_) { ret = rpc_result{ @@ -333,7 +332,6 @@ class coro_rpc_client { coro_rpc_protocol::rpc_error{errc::timed_out, "rpc call timed out"}}; } - co_await promise.getFuture(); #ifdef UNIT_TEST_INJECT ELOGV(INFO, "client_id %d call %s %s", config_.client_id, get_func_name().data(), ret ? "ok" : "failed"); @@ -410,18 +408,15 @@ class coro_rpc_client { ELOGV(INFO, "client_id %d begin to connect %s", config_.client_id, config_.port.data()); - async_simple::Promise promise; - coro_io::period_timer timer(&executor); - timeout(timer, config_.timeout_duration, promise, "connect timer canceled") - .via(&executor) - .detach(); + timeout(config_.timeout_duration, "connect timer canceled") + .start([](auto &&) { + }); std::error_code ec = co_await coro_io::async_connect( &executor, *socket_, config_.host, config_.port); std::error_code err_code; - timer.cancel(err_code); + timer_.cancel(err_code); - co_await promise.getFuture(); if (ec) { if (is_timeout_) { co_return errc::timed_out; @@ -478,10 +473,9 @@ class coro_rpc_client { return ssl_init_ret_; } #endif - async_simple::coro::Lazy timeout(auto &timer, auto duration, - auto &promise, std::string err_msg) { - timer.expires_after(duration); - bool is_timeout = co_await timer.async_await(); + async_simple::coro::Lazy timeout(auto duration, std::string err_msg) { + timer_.expires_after(duration); + bool is_timeout = co_await timer_.async_await(); #ifdef UNIT_TEST_INJECT ELOGV(INFO, "client_id %d %s, is_timeout_ %d, %d , duration %d ms", config_.client_id, err_msg.data(), is_timeout_, is_timeout, @@ -489,13 +483,11 @@ class coro_rpc_client { .count()); #endif if (!is_timeout) { - promise.setValue(async_simple::Unit()); co_return false; } is_timeout_ = is_timeout; close_socket(socket_); - promise.setValue(async_simple::Unit()); co_return true; } @@ -833,6 +825,7 @@ class coro_rpc_client { #endif private: coro_io::ExecutorWrapper<> executor; + coro_io::period_timer timer_; std::shared_ptr socket_; std::string read_buf_, resp_attachment_buf_; std::string_view req_attachment_;