Skip to content

Commit

Permalink
[coro_io] prevent promise no response in corner bug case. (#413)
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle authored Aug 17, 2023
1 parent 70aa812 commit a8ead76
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 10 deletions.
47 changes: 38 additions & 9 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,48 @@ class client_pool : public std::enable_shared_from_this<
co_return std::move(cli);
}
else {
async_simple::Promise<std::unique_ptr<client_t>> promise;
promise_queue.enqueue(&promise);
auto promise = std::make_unique<
async_simple::Promise<std::unique_ptr<client_t>>>();
auto* promise_address = promise.get();
promise_queue.enqueue(promise_address);
spinlock = nullptr;
if (short_connect_clients_.try_dequeue(cli) ||
free_clients_.try_dequeue(cli)) {
collect_free_client(std::move(cli));
}
ELOG_DEBUG << "wait for free client waiter promise{" << &promise
<< "} response because slow client{" << client_ptr << "}";
auto cli = co_await promise.getFuture();
ELOG_DEBUG << "get free client{" << cli.get() << "} from promise{"
<< &promise << "}. skip wait client{" << client_ptr
<< "} connect";
co_return std::move(cli);
ELOG_DEBUG << "wait for free client waiter promise{"
<< promise_address << "} response because slow client{"
<< client_ptr << "}";

auto res = co_await collectAny(
[](auto promise)
-> async_simple::coro::Lazy<std::unique_ptr<client_t>> {
co_return co_await promise->getFuture();
}(std::move(promise)),
coro_io::sleep_for(this->pool_config_.max_connection_time));
if (res.index() == 0) {
auto& res0 = std::get<0>(res);
if (!res0.hasError()) {
auto& cli = res0.value();
ELOG_DEBUG << "get free client{" << cli.get() << "} from promise{"
<< promise_address << "}. skip wait client{"
<< client_ptr << "} connect";
co_return std::move(cli);
}
else {
ELOG_ERROR << "Unexcepted branch";
co_return nullptr;
}
}
else {
ELOG_ERROR << "Unexcepted branch. Out of max limitation of connect "
"time, connect "
"failed. skip wait client{"
<< client_ptr << "} connect. "
<< "skip wait promise {" << promise_address
<< "} response";
co_return nullptr;
}
}
}
else {
Expand Down Expand Up @@ -341,6 +369,7 @@ class client_pool : public std::enable_shared_from_this<
std::chrono::milliseconds reconnect_wait_time{1000};
std::chrono::milliseconds idle_timeout{30000};
std::chrono::milliseconds short_connect_idle_timeout{1000};
std::chrono::milliseconds max_connection_time{60000};
typename client_t::config client_config;
};

Expand Down
2 changes: 1 addition & 1 deletion src/coro_io/tests/test_client_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ TEST_CASE("test reconnect retry wait time exinclude reconnect cost time") {
CHECK(pool->free_client_count() == 100);
auto dur = std::chrono::steady_clock::now() - tp;
std::cout << dur.count() << std::endl;
CHECK((dur >= 500ms && dur <= 700ms));
CHECK((dur >= 500ms && dur <= 800ms));
server.stop();
co_return;
}());
Expand Down

0 comments on commit a8ead76

Please sign in to comment.