[coro_rpc] rpc client: support send_request without waiting for the response #672

Merged · 25 commits · May 17, 2024
75 changes: 37 additions & 38 deletions include/ylt/coro_io/client_pool.hpp
@@ -78,11 +78,11 @@ class client_pool : public std::enable_shared_from_this<
         break;
       }
       while (true) {
-        ELOG_DEBUG << "start collect timeout client of pool{"
+        ELOG_TRACE << "start collect timeout client of pool{"
                    << self->host_name_
                    << "}, now client count: " << clients.size();
         std::size_t is_all_cleared = clients.clear_old(clear_cnt);
-        ELOG_DEBUG << "finish collect timeout client of pool{"
+        ELOG_TRACE << "finish collect timeout client of pool{"
                    << self->host_name_
                    << "}, now client cnt: " << clients.size();
         if (is_all_cleared != 0) [[unlikely]] {
@@ -109,36 +109,42 @@

   static auto rand_time(std::chrono::milliseconds ms) {
     static thread_local std::default_random_engine r;
-    std::uniform_real_distribution e(0.7f, 1.3f);
+    std::uniform_real_distribution e(1.0f, 1.2f);
     return std::chrono::milliseconds{static_cast<long>(e(r) * ms.count())};
   }
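Note on this hunk: the jitter multiplier for the reconnect wait narrows from [0.7, 1.3) to [1.0, 1.2), so the randomized wait can no longer undershoot the configured base delay, only stretch it slightly. A minimal compilable sketch of the helper as it reads after the change (the seeding and the `main` driver are illustrative, not part of the pool):

```cpp
#include <chrono>
#include <iostream>
#include <random>

// Jittered wait as in the hunk above: the multiplier is drawn from [1.0f, 1.2f),
// so the result is never shorter than the requested duration.
static std::chrono::milliseconds rand_time(std::chrono::milliseconds ms) {
  static thread_local std::default_random_engine r{std::random_device{}()};
  std::uniform_real_distribution<float> e(1.0f, 1.2f);
  return std::chrono::milliseconds{static_cast<long>(e(r) * ms.count())};
}

int main() {
  using namespace std::chrono_literals;
  for (int i = 0; i < 3; ++i)
    std::cout << rand_time(1000ms).count() << "ms\n";  // prints values in [1000, 1199]
}
```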

-  async_simple::coro::Lazy<void> reconnect(std::unique_ptr<client_t>& client) {
+  static async_simple::coro::Lazy<void> reconnect(
+      std::unique_ptr<client_t>& client, std::weak_ptr<client_pool> watcher) {
     using namespace std::chrono_literals;
-    for (unsigned int i = 0; i < pool_config_.connect_retry_count; ++i) {
-      ELOG_DEBUG << "try to reconnect client{" << client.get() << "},host:{"
+    std::shared_ptr<client_pool> self = watcher.lock();
+    uint32_t i = UINT32_MAX;  // (at least connect once)
+    do {
+      ELOG_TRACE << "try to reconnect client{" << client.get() << "},host:{"
                  << client->get_host() << ":" << client->get_port()
-                 << "}, try count:" << i
-                 << "max retry limit:" << pool_config_.connect_retry_count;
+                 << "}, try count:" << i << "max retry limit:"
+                 << self->pool_config_.connect_retry_count;
       auto pre_time_point = std::chrono::steady_clock::now();
-      bool ok = client_t::is_ok(co_await client->reconnect(host_name_));
+      bool ok = client_t::is_ok(co_await client->connect(self->host_name_));
       auto post_time_point = std::chrono::steady_clock::now();
       auto cost_time = post_time_point - pre_time_point;
-      ELOG_DEBUG << "reconnect client{" << client.get()
+      ELOG_TRACE << "reconnect client{" << client.get()
                  << "} cost time: " << cost_time / std::chrono::milliseconds{1}
                  << "ms";
       if (ok) {
-        ELOG_DEBUG << "reconnect client{" << client.get() << "} success";
+        ELOG_TRACE << "reconnect client{" << client.get() << "} success";
         co_return;
       }
-      ELOG_DEBUG << "reconnect client{" << client.get()
+      ELOG_TRACE << "reconnect client{" << client.get()
                  << "} failed. If client close:{" << client->has_closed()
                  << "}";
       auto wait_time = rand_time(
-          (pool_config_.reconnect_wait_time * (i + 1) - cost_time) / 1ms * 1ms);
+          (self->pool_config_.reconnect_wait_time - cost_time) / 1ms * 1ms);
+      self = nullptr;
       if (wait_time.count() > 0)
         co_await coro_io::sleep_for(wait_time, &client->get_executor());
-    }
+      self = watcher.lock();
+      ++i;
+    } while (i < self->pool_config_.connect_retry_count);
     ELOG_WARN << "reconnect client{" << client.get() << "},host:{"
               << client->get_host() << ":" << client->get_port()
               << "} out of max limit, stop retry. connect failed";
@@ -150,30 +156,23 @@ class client_pool : public std::enable_shared_from_this<
     async_simple::Promise<std::unique_ptr<client_t>> promise_;
   };
 
-  async_simple::coro::Lazy<void> connect_client(
+  static async_simple::coro::Lazy<void> connect_client(
       std::unique_ptr<client_t> client, std::weak_ptr<client_pool> watcher,
       std::shared_ptr<promise_handler> handler) {
-    ELOG_DEBUG << "try to connect client{" << client.get()
-               << "} to host:" << host_name_;
-    auto result = co_await client->connect(host_name_);
-    std::shared_ptr<client_pool> self = watcher.lock();
-    if (!client_t::is_ok(result)) {
-      ELOG_DEBUG << "connect client{" << client.get() << "} to failed. ";
-      if (self) {
-        co_await reconnect(client);
-      }
-    }
-    if (client) {
-      ELOG_DEBUG << "connect client{" << client.get() << "} successful!";
-    }
+    co_await reconnect(client, watcher);
     auto has_get_connect = handler->flag_.exchange(true);
     if (!has_get_connect) {
       handler->promise_.setValue(std::move(client));
     }
     else {
-      auto conn_lim = std::min<unsigned>(10u, pool_config_.max_connection);
-      if (self && free_clients_.size() < conn_lim && client) {
-        enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
+      if (client) {
+        auto self = watcher.lock();
+        auto conn_lim =
+            std::min<unsigned>(10u, self->pool_config_.max_connection);
+        if (self && self->free_clients_.size() < conn_lim) {
+          self->enqueue(self->free_clients_, std::move(client),
+                        self->pool_config_.idle_timeout);
+        }
       }
     }
   }
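`connect_client` now just awaits the retry loop and then performs the same hand-off as before: an atomic `exchange` on `handler->flag_` decides whether this task fulfils the waiter's promise or, if the waiter was already served, parks the spare connection back into the free queue (capped at `min(10, max_connection)`). A reduced sketch of that first-one-wins hand-off, with placeholder types:

```cpp
#include <atomic>
#include <memory>
#include <utility>

#include <async_simple/Promise.h>

struct Client {};  // placeholder for client_t

struct promise_handler {
  std::atomic<bool> flag_{false};
  async_simple::Promise<std::unique_ptr<Client>> promise_;
};

// Called by the connect task once its connect/retry loop has finished.
void deliver(const std::shared_ptr<promise_handler>& handler,
             std::unique_ptr<Client> client) {
  // exchange() returns the previous value: false means no one has been served
  // yet, so this task hands its client to the waiter; true means the waiter
  // already got a client elsewhere, so the spare connection is kept for reuse.
  bool already_served = handler->flag_.exchange(true);
  if (!already_served) {
    handler->promise_.setValue(std::move(client));
  }
  else if (client) {
    // return the spare connection to the pool's free queue (bounded in the real code)
  }
}
```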
@@ -226,7 +225,7 @@ }
         }
       }
     });
-    ELOG_DEBUG << "wait client by promise {" << &handler->promise_ << "}";
+    ELOG_TRACE << "wait client by promise {" << &handler->promise_ << "}";
     client = co_await handler->promise_.getFuture();
     if (client) {
       executor->schedule([timer] {
@@ -236,7 +235,7 @@ class client_pool : public std::enable_shared_from_this<
       }
     }
     else {
-      ELOG_DEBUG << "get free client{" << client.get() << "}. from queue";
+      ELOG_TRACE << "get free client{" << client.get() << "}. from queue";
     }
     co_return std::move(client);
   }
@@ -248,7 +247,7 @@ class client_pool : public std::enable_shared_from_this<
     if (clients.enqueue(std::move(client)) == 1) {
       std::size_t expected = 0;
       if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) {
-        ELOG_DEBUG << "start timeout client collecter of client_pool{"
+        ELOG_TRACE << "start timeout client collecter of client_pool{"
                    << host_name_ << "}";
         collect_idle_timeout_client(
             this->weak_from_this(), clients,
@@ -272,7 +271,7 @@ class client_pool : public std::enable_shared_from_this<
       if (!has_get_connect) {
         handler->promise_.setValue(std::move(client));
         promise_cnt_ -= cnt;
-        ELOG_DEBUG << "collect free client{" << client.get()
+        ELOG_TRACE << "collect free client{" << client.get()
                    << "} and wake up promise{" << &handler->promise_ << "}";
         return;
       }
@@ -282,20 +281,20 @@ class client_pool : public std::enable_shared_from_this<

     if (free_clients_.size() < pool_config_.max_connection) {
       if (client) {
-        ELOG_DEBUG << "collect free client{" << client.get() << "} enqueue";
+        ELOG_TRACE << "collect free client{" << client.get() << "} enqueue";
         enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
       }
     }
     else {
-      ELOG_DEBUG << "out of max connection limit <<"
+      ELOG_TRACE << "out of max connection limit <<"
                  << pool_config_.max_connection << ", collect free client{"
                  << client.get() << "} enqueue short connect queue";
       enqueue(short_connect_clients_, std::move(client),
               pool_config_.short_connect_idle_timeout);
     }
   }
   else {
-    ELOG_DEBUG << "client{" << client.get()
+    ELOG_TRACE << "client{" << client.get()
                << "} is closed. we won't collect it";
   }

2 changes: 1 addition & 1 deletion include/ylt/coro_io/coro_io.hpp
@@ -317,7 +317,7 @@ inline async_simple::coro::Lazy<void> sleep_for(Duration d) {
 template <typename R, typename Func, typename Executor>
 struct post_helper {
   void operator()(auto handler) {
-    asio::dispatch(e, [this, handler]() {
+    asio::post(e, [this, handler]() {
       try {
         if constexpr (std::is_same_v<R, async_simple::Try<void>>) {
           func();
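On the `coro_io.hpp` change: `asio::dispatch` may invoke the handler immediately, inline on the calling thread, whenever that thread is already running the target executor, while `asio::post` always queues the handler and returns first; for resuming coroutines this avoids inline, re-entrant resumption. A small self-contained demo of the ordering difference (plain asio, nothing from ylt):

```cpp
#include <asio.hpp>
#include <iostream>

int main() {
  asio::io_context ctx;

  asio::post(ctx, [&] {
    std::cout << "handler start\n";
    // Runs inline: this thread is already running ctx, so dispatch executes now.
    asio::dispatch(ctx, [] { std::cout << "dispatch: inline, before handler end\n"; });
    // Always deferred: queued and executed after the current handler returns.
    asio::post(ctx, [] { std::cout << "post: deferred, after handler end\n"; });
    std::cout << "handler end\n";
  });

  ctx.run();
}
```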
2 changes: 1 addition & 1 deletion include/ylt/coro_io/io_context_pool.hpp
@@ -75,7 +75,7 @@ class ExecutorWrapper : public async_simple::Executor {

   context_t &context() { return executor_.context(); }
 
-  auto get_asio_executor() { return executor_; }
+  auto get_asio_executor() const { return executor_; }
 
   operator ExecutorImpl() { return executor_; }

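On the `io_context_pool.hpp` change: const-qualifying `get_asio_executor()` lets callers that only hold a `const ExecutorWrapper&` still obtain the underlying asio executor. A stripped-down illustration (this `ExecutorWrapper` is a stand-in, not the real class, which also derives from `async_simple::Executor`):

```cpp
#include <asio.hpp>

template <typename ExecutorImpl = asio::io_context::executor_type>
class ExecutorWrapper {
  ExecutorImpl executor_;

 public:
  explicit ExecutorWrapper(ExecutorImpl ex) : executor_(ex) {}
  auto get_asio_executor() const { return executor_; }  // const-qualified getter
};

// Compiles only because get_asio_executor() is const: `w` is a const reference.
void schedule_on(const ExecutorWrapper<>& w) {
  asio::post(w.get_asio_executor(), [] { /* work */ });
}
```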