Skip to content

Commit

Permalink
[coro_rpc_client][improve]avoid create timer everytime when call rpc (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Mar 20, 2024
1 parent a414eea commit 3b73dfa
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 38 deletions.
28 changes: 16 additions & 12 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,11 @@ inline async_simple::coro::Lazy<std::error_code> async_handshake(
#endif
class period_timer : public asio::steady_timer {
public:
using asio::steady_timer::steady_timer;
template <typename T>
period_timer(coro_io::ExecutorWrapper<T> *executor)
: asio::steady_timer(executor->get_asio_executor()) {}
template <typename executor_t, typename Rep, typename Period>
period_timer(const executor_t &executor,
const std::chrono::duration<Rep, Period> &timeout_duration)
: asio::steady_timer(executor, timeout_duration) {}

async_simple::coro::Lazy<bool> async_await() noexcept {
callback_awaitor<bool> awaitor;

Expand Down Expand Up @@ -312,10 +310,10 @@ inline async_simple::coro::Lazy<void> sleep_for(Duration d) {
}
}

template <typename R, typename Func>
template <typename R, typename Func, typename Executor>
struct post_helper {
void operator()(auto handler) const {
asio::dispatch(e->get_asio_executor(), [this, handler]() {
asio::dispatch(e, [this, handler]() {
try {
if constexpr (std::is_same_v<R, async_simple::Try<void>>) {
func();
Expand All @@ -332,24 +330,30 @@ struct post_helper {
}
});
}
coro_io::ExecutorWrapper<> *e;
Executor e;
Func func;
};

template <typename Func>
template <typename Func, typename Executor>
inline async_simple::coro::Lazy<
async_simple::Try<typename util::function_traits<Func>::return_type>>
post(Func func,
coro_io::ExecutorWrapper<> *e = coro_io::get_global_block_executor()) {
post(Func func, Executor executor) {
using R =
async_simple::Try<typename util::function_traits<Func>::return_type>;

callback_awaitor<R> awaitor;

post_helper<R, Func> helper{e, std::move(func)};
post_helper<R, Func, Executor> helper{executor, std::move(func)};
co_return co_await awaitor.await_resume(helper);
}

template <typename Func>
inline async_simple::coro::Lazy<
async_simple::Try<typename util::function_traits<Func>::return_type>>
post(Func func,
coro_io::ExecutorWrapper<> *e = coro_io::get_global_block_executor()) {
co_return co_await post(std::move(func), e->get_asio_executor());
}

template <typename R>
struct coro_channel
: public asio::experimental::channel<void(std::error_code, R)> {
Expand Down
39 changes: 13 additions & 26 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class coro_rpc_client {
coro_rpc_client(asio::io_context::executor_type executor,
uint32_t client_id = 0)
: executor(executor),
timer_(executor),
socket_(std::make_shared<asio::ip::tcp::socket>(executor)) {
config_.client_id = client_id;
}
Expand All @@ -142,6 +143,7 @@ class coro_rpc_client {
coro_io::ExecutorWrapper<> &executor = *coro_io::get_global_executor(),
uint32_t client_id = 0)
: executor(executor.get_asio_executor()),
timer_(executor.get_asio_executor()),
socket_(std::make_shared<asio::ip::tcp::socket>(
executor.get_asio_executor())) {
config_.client_id = client_id;
Expand Down Expand Up @@ -306,11 +308,8 @@ class coro_rpc_client {

static_check<func, Args...>();

async_simple::Promise<async_simple::Unit> promise;
coro_io::period_timer timer(&executor);
timeout(timer, duration, promise, "rpc call timer canceled")
.via(&executor)
.detach();
timeout(duration, "rpc call timer canceled").start([](auto &&) {
});

#ifdef YLT_ENABLE_SSL
if (!config_.ssl_cert_path.empty()) {
Expand All @@ -325,15 +324,14 @@ class coro_rpc_client {
#endif

std::error_code err_code;
timer.cancel(err_code);
timer_.cancel(err_code);

if (is_timeout_) {
ret = rpc_result<R, coro_rpc_protocol>{
unexpect_t{},
coro_rpc_protocol::rpc_error{errc::timed_out, "rpc call timed out"}};
}

co_await promise.getFuture();
#ifdef UNIT_TEST_INJECT
ELOGV(INFO, "client_id %d call %s %s", config_.client_id,
get_func_name<func>().data(), ret ? "ok" : "failed");
Expand Down Expand Up @@ -410,18 +408,15 @@ class coro_rpc_client {

ELOGV(INFO, "client_id %d begin to connect %s", config_.client_id,
config_.port.data());
async_simple::Promise<async_simple::Unit> promise;
coro_io::period_timer timer(&executor);
timeout(timer, config_.timeout_duration, promise, "connect timer canceled")
.via(&executor)
.detach();
timeout(config_.timeout_duration, "connect timer canceled")
.start([](auto &&) {
});

std::error_code ec = co_await coro_io::async_connect(
&executor, *socket_, config_.host, config_.port);
std::error_code err_code;
timer.cancel(err_code);
timer_.cancel(err_code);

co_await promise.getFuture();
if (ec) {
if (is_timeout_) {
co_return errc::timed_out;
Expand Down Expand Up @@ -478,24 +473,15 @@ class coro_rpc_client {
return ssl_init_ret_;
}
#endif
async_simple::coro::Lazy<bool> timeout(auto &timer, auto duration,
auto &promise, std::string err_msg) {
timer.expires_after(duration);
bool is_timeout = co_await timer.async_await();
#ifdef UNIT_TEST_INJECT
ELOGV(INFO, "client_id %d %s, is_timeout_ %d, %d , duration %d ms",
config_.client_id, err_msg.data(), is_timeout_, is_timeout,
std::chrono::duration_cast<std::chrono::milliseconds>(duration)
.count());
#endif
async_simple::coro::Lazy<bool> timeout(auto duration, std::string err_msg) {
timer_.expires_after(duration);
bool is_timeout = co_await timer_.async_await();
if (!is_timeout) {
promise.setValue(async_simple::Unit());
co_return false;
}

is_timeout_ = is_timeout;
close_socket(socket_);
promise.setValue(async_simple::Unit());
co_return true;
}

Expand Down Expand Up @@ -833,6 +819,7 @@ class coro_rpc_client {
#endif
private:
coro_io::ExecutorWrapper<> executor;
coro_io::period_timer timer_;
std::shared_ptr<asio::ip::tcp::socket> socket_;
std::string read_buf_, resp_attachment_buf_;
std::string_view req_attachment_;
Expand Down

0 comments on commit 3b73dfa

Please sign in to comment.