Skip to content

Commit

Permalink
[coro_rpc]rpc client support send_request without wait for response (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle authored May 17, 2024
1 parent 6e684c0 commit a41f755
Show file tree
Hide file tree
Showing 16 changed files with 903 additions and 489 deletions.
75 changes: 37 additions & 38 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ class client_pool : public std::enable_shared_from_this<
break;
}
while (true) {
ELOG_DEBUG << "start collect timeout client of pool{"
ELOG_TRACE << "start collect timeout client of pool{"
<< self->host_name_
<< "}, now client count: " << clients.size();
std::size_t is_all_cleared = clients.clear_old(clear_cnt);
ELOG_DEBUG << "finish collect timeout client of pool{"
ELOG_TRACE << "finish collect timeout client of pool{"
<< self->host_name_
<< "}, now client cnt: " << clients.size();
if (is_all_cleared != 0) [[unlikely]] {
Expand All @@ -109,36 +109,42 @@ class client_pool : public std::enable_shared_from_this<

static auto rand_time(std::chrono::milliseconds ms) {
static thread_local std::default_random_engine r;
std::uniform_real_distribution e(0.7f, 1.3f);
std::uniform_real_distribution e(1.0f, 1.2f);
return std::chrono::milliseconds{static_cast<long>(e(r) * ms.count())};
}

async_simple::coro::Lazy<void> reconnect(std::unique_ptr<client_t>& client) {
static async_simple::coro::Lazy<void> reconnect(
std::unique_ptr<client_t>& client, std::weak_ptr<client_pool> watcher) {
using namespace std::chrono_literals;
for (unsigned int i = 0; i < pool_config_.connect_retry_count; ++i) {
ELOG_DEBUG << "try to reconnect client{" << client.get() << "},host:{"
std::shared_ptr<client_pool> self = watcher.lock();
uint32_t i = UINT32_MAX; // (at least connect once)
do {
ELOG_TRACE << "try to reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
<< "}, try count:" << i
<< "max retry limit:" << pool_config_.connect_retry_count;
<< "}, try count:" << i << "max retry limit:"
<< self->pool_config_.connect_retry_count;
auto pre_time_point = std::chrono::steady_clock::now();
bool ok = client_t::is_ok(co_await client->reconnect(host_name_));
bool ok = client_t::is_ok(co_await client->connect(self->host_name_));
auto post_time_point = std::chrono::steady_clock::now();
auto cost_time = post_time_point - pre_time_point;
ELOG_DEBUG << "reconnect client{" << client.get()
ELOG_TRACE << "reconnect client{" << client.get()
<< "} cost time: " << cost_time / std::chrono::milliseconds{1}
<< "ms";
if (ok) {
ELOG_DEBUG << "reconnect client{" << client.get() << "} success";
ELOG_TRACE << "reconnect client{" << client.get() << "} success";
co_return;
}
ELOG_DEBUG << "reconnect client{" << client.get()
ELOG_TRACE << "reconnect client{" << client.get()
<< "} failed. If client close:{" << client->has_closed()
<< "}";
auto wait_time = rand_time(
(pool_config_.reconnect_wait_time * (i + 1) - cost_time) / 1ms * 1ms);
(self->pool_config_.reconnect_wait_time - cost_time) / 1ms * 1ms);
self = nullptr;
if (wait_time.count() > 0)
co_await coro_io::sleep_for(wait_time, &client->get_executor());
}
self = watcher.lock();
++i;
} while (i < self->pool_config_.connect_retry_count);
ELOG_WARN << "reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
<< "} out of max limit, stop retry. connect failed";
Expand All @@ -150,30 +156,23 @@ class client_pool : public std::enable_shared_from_this<
async_simple::Promise<std::unique_ptr<client_t>> promise_;
};

async_simple::coro::Lazy<void> connect_client(
static async_simple::coro::Lazy<void> connect_client(
std::unique_ptr<client_t> client, std::weak_ptr<client_pool> watcher,
std::shared_ptr<promise_handler> handler) {
ELOG_DEBUG << "try to connect client{" << client.get()
<< "} to host:" << host_name_;
auto result = co_await client->connect(host_name_);
std::shared_ptr<client_pool> self = watcher.lock();
if (!client_t::is_ok(result)) {
ELOG_DEBUG << "connect client{" << client.get() << "} to failed. ";
if (self) {
co_await reconnect(client);
}
}
if (client) {
ELOG_DEBUG << "connect client{" << client.get() << "} successful!";
}
co_await reconnect(client, watcher);
auto has_get_connect = handler->flag_.exchange(true);
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
}
else {
auto conn_lim = std::min<unsigned>(10u, pool_config_.max_connection);
if (self && free_clients_.size() < conn_lim && client) {
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
if (client) {
auto self = watcher.lock();
auto conn_lim =
std::min<unsigned>(10u, self->pool_config_.max_connection);
if (self && self->free_clients_.size() < conn_lim) {
self->enqueue(self->free_clients_, std::move(client),
self->pool_config_.idle_timeout);
}
}
}
}
Expand Down Expand Up @@ -226,7 +225,7 @@ class client_pool : public std::enable_shared_from_this<
}
}
});
ELOG_DEBUG << "wait client by promise {" << &handler->promise_ << "}";
ELOG_TRACE << "wait client by promise {" << &handler->promise_ << "}";
client = co_await handler->promise_.getFuture();
if (client) {
executor->schedule([timer] {
Expand All @@ -236,7 +235,7 @@ class client_pool : public std::enable_shared_from_this<
}
}
else {
ELOG_DEBUG << "get free client{" << client.get() << "}. from queue";
ELOG_TRACE << "get free client{" << client.get() << "}. from queue";
}
co_return std::move(client);
}
Expand All @@ -248,7 +247,7 @@ class client_pool : public std::enable_shared_from_this<
if (clients.enqueue(std::move(client)) == 1) {
std::size_t expected = 0;
if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) {
ELOG_DEBUG << "start timeout client collecter of client_pool{"
ELOG_TRACE << "start timeout client collecter of client_pool{"
<< host_name_ << "}";
collect_idle_timeout_client(
this->weak_from_this(), clients,
Expand All @@ -272,7 +271,7 @@ class client_pool : public std::enable_shared_from_this<
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
promise_cnt_ -= cnt;
ELOG_DEBUG << "collect free client{" << client.get()
ELOG_TRACE << "collect free client{" << client.get()
<< "} and wake up promise{" << &handler->promise_ << "}";
return;
}
Expand All @@ -282,20 +281,20 @@ class client_pool : public std::enable_shared_from_this<

if (free_clients_.size() < pool_config_.max_connection) {
if (client) {
ELOG_DEBUG << "collect free client{" << client.get() << "} enqueue";
ELOG_TRACE << "collect free client{" << client.get() << "} enqueue";
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
}
}
else {
ELOG_DEBUG << "out of max connection limit <<"
ELOG_TRACE << "out of max connection limit <<"
<< pool_config_.max_connection << ", collect free client{"
<< client.get() << "} enqueue short connect queue";
enqueue(short_connect_clients_, std::move(client),
pool_config_.short_connect_idle_timeout);
}
}
else {
ELOG_DEBUG << "client{" << client.get()
ELOG_TRACE << "client{" << client.get()
<< "} is closed. we won't collect it";
}

Expand Down
2 changes: 1 addition & 1 deletion include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ inline async_simple::coro::Lazy<void> sleep_for(Duration d) {
template <typename R, typename Func, typename Executor>
struct post_helper {
void operator()(auto handler) {
asio::dispatch(e, [this, handler]() {
asio::post(e, [this, handler]() {
try {
if constexpr (std::is_same_v<R, async_simple::Try<void>>) {
func();
Expand Down
2 changes: 1 addition & 1 deletion include/ylt/coro_io/io_context_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class ExecutorWrapper : public async_simple::Executor {

context_t &context() { return executor_.context(); }

auto get_asio_executor() { return executor_; }
auto get_asio_executor() const { return executor_; }

operator ExecutorImpl() { return executor_; }

Expand Down
Loading

0 comments on commit a41f755

Please sign in to comment.