Skip to content

Commit

Permalink
enhance connect timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle committed Aug 8, 2023
1 parent 0acb5e3 commit 051d654
Showing 1 changed file with 60 additions and 6 deletions.
66 changes: 60 additions & 6 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ class client_pool : public std::enable_shared_from_this<
co_return ok ? std::move(client) : nullptr;
}

async_simple::coro::Lazy<std::unique_ptr<client_t>> connect_client(
std::unique_ptr<client_t> client) {
if (client_t::is_ok(co_await client->connect(host_name_))) {
co_return std::move(client);
}
else {
co_return co_await reconnect(std::move(client));
}
}

async_simple::coro::Lazy<std::unique_ptr<client_t>> get_client(
const typename client_t::config& client_config) {
std::unique_ptr<client_t> client;
Expand All @@ -128,12 +138,47 @@ class client_pool : public std::enable_shared_from_this<
if (!client->init_config(client_config)) {
co_return nullptr;
}
if (client_t::is_ok(co_await client->connect(host_name_))) {
co_return std::move(client);
}
else {
co_return co_await reconnect(std::move(client));
}
struct state_t {
std::atomic<bool> has_got_client = false, has_promise_enqueue = false;
};
auto state = std::make_shared<state_t>();
async_simple::Promise<std::unique_ptr<client_t>> promise;
connect_client(std::move(client))
.start([&promise, state,
self = this->shared_from_this()](auto&& result) {
bool expected = false;
if (state->has_got_client.compare_exchange_strong(expected, true)) {
promise.setValue(result.hasError() ? nullptr
: std::move(result.value()));
}
else {
while (state->has_promise_enqueue == false)
; // simple spinlock
self->collect_free_client(
result.hasError() ? nullptr : std::move(result.value()));
}
});
coro_io::sleep_for(std::chrono::milliseconds{100})
.start([&promise, state, self = this->shared_from_this()](auto&&) {
bool expected = false;
if (state->has_got_client.compare_exchange_strong(expected, true)) {
std::unique_ptr<client_t> cli;
if (self->short_connect_clients_.try_dequeue(cli) ||
self->free_clients_.try_dequeue(cli)) {
state->has_promise_enqueue = true;
promise.setValue(std::move(cli));
}
else {
self->promise_queue.enqueue(&promise);
state->has_promise_enqueue = true;
if (self->short_connect_clients_.try_dequeue(cli) ||
self->free_clients_.try_dequeue(cli)) {
self->collect_free_client(std::move(cli));
}
}
}
});
co_return std::move(co_await promise.getFuture());
}
else {
co_return std::move(client);
Expand Down Expand Up @@ -161,6 +206,13 @@ class client_pool : public std::enable_shared_from_this<
}

void collect_free_client(std::unique_ptr<client_t> client) {
if (client == nullptr || !client->has_closed()) {
async_simple::Promise<std::unique_ptr<client_t>>* promise = nullptr;
if (promise_queue.try_dequeue(promise)) {
promise->setValue(std::move(client));
return;
}
}
if (client && !client->has_closed()) {
if (free_clients_.size() < pool_config_.max_connection) {
enqueue(free_clients_, std::move(client), false);
Expand Down Expand Up @@ -309,6 +361,8 @@ class client_pool : public std::enable_shared_from_this<
coro_io::detail::client_queue<std::unique_ptr<client_t>>
short_connect_clients_;
client_pools_t* pools_manager_ = nullptr;
moodycamel::ConcurrentQueue<async_simple::Promise<std::unique_ptr<client_t>>*>
promise_queue;
async_simple::Promise<async_simple::Unit> idle_timeout_waiter;
std::string host_name_;
pool_config pool_config_;
Expand Down

0 comments on commit 051d654

Please sign in to comment.