Skip to content

Commit 01b3425

Browse files
authored
[coro_io] client pool support limit max live time of client (#1018)
* [coro_io] client pool support limit max live time of client * fix
1 parent a4f27df commit 01b3425

File tree

6 files changed

+43
-8
lines changed

6 files changed

+43
-8
lines changed

include/ylt/coro_io/client_pool.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,15 @@ class client_pool : public std::enable_shared_from_this<
307307
}
308308

309309
void collect_free_client(std::unique_ptr<client_t> client) {
310+
if (pool_config_.max_connection_life_time < std::chrono::seconds::max()) {
311+
auto tp = client->get_create_time_point();
312+
if (std::chrono::steady_clock::now() - tp >
313+
pool_config_.max_connection_life_time) [[unlikely]] {
314+
ELOG_INFO << "client{" << client.get()
315+
<< "} live too long, we won't collect it";
316+
return;
317+
}
318+
}
310319
if (!client->has_closed()) {
311320
if (free_clients_.size() < pool_config_.max_connection) {
312321
ELOG_TRACE << "collect free client{" << client.get() << "} enqueue";
@@ -361,6 +370,7 @@ class client_pool : public std::enable_shared_from_this<
361370
30000}; /* zero means wont detect */
362371
typename client_t::config client_config;
363372
std::chrono::seconds dns_cache_update_duration{5 * 60}; // 5mins
373+
std::chrono::seconds max_connection_life_time = std::chrono::seconds::max();
364374
};
365375

366376
private:

include/ylt/coro_io/ibverbs/ib_buffer.hpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,12 @@ class ib_buffer_pool_t : public std::enable_shared_from_this<ib_buffer_pool_t> {
164164
std::size_t clear_cnt = self->free_buffers_.clear_old(1000);
165165
self->modify_memory_usage(-1 * (ssize_t)clear_cnt *
166166
(ssize_t)self->buffer_size());
167-
ELOG_WARN << "finish ib_buffer timeout free of pool{" << self.get()
168-
<< "}, now ib_buffer cnt: " << self->free_buffers_.size()
169-
<< " mem usage:"
170-
<< (int64_t)(std::round(self->memory_usage() /
171-
(1.0 * 1024 * 1024)))
172-
<< " MB";
167+
ELOG_TRACE << "finish ib_buffer timeout free of pool{" << self.get()
168+
<< "}, now ib_buffer cnt: " << self->free_buffers_.size()
169+
<< " mem usage:"
170+
<< (int64_t)(std::round(self->memory_usage() /
171+
(1.0 * 1024 * 1024)))
172+
<< " MB";
173173
if (clear_cnt != 0) {
174174
try {
175175
co_await async_simple::coro::Yield{};

include/ylt/coro_io/ibverbs/ib_socket.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ struct ib_socket_shared_state_t
293293
}
294294
ec = make_error_code(wc.status);
295295
if (wc.status != IBV_WC_SUCCESS) {
296-
ELOG_WARN << "rdma failed with error code: " << wc.status;
296+
ELOG_WARN << "rdma failed with error: " << ec.message();
297297
}
298298
else {
299299
ELOG_TRACE << "rdma op success, id:" << (callback_t*)wc.wr_id

include/ylt/coro_rpc/impl/coro_rpc_client.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ class coro_rpc_client {
288288
}
289289
#endif
290290
[[nodiscard]] bool init_config(const config &conf) {
291+
create_tp_ = std::chrono::steady_clock::now();
291292
config_ = conf;
292293
control_->socket_wrapper_.set_local_ip(config_.local_ip);
293294
return std::visit(
@@ -297,6 +298,7 @@ class coro_rpc_client {
297298
conf.socket_config);
298299
};
299300

301+
auto get_create_time_point() const noexcept { return create_tp_; }
300302
/*!
301303
* Check the client closed or not
302304
*
@@ -1291,5 +1293,6 @@ class coro_rpc_client {
12911293
asio::ssl::context ssl_ctx_{asio::ssl::context::sslv23};
12921294
bool ssl_init_ret_ = true;
12931295
#endif
1296+
std::chrono::time_point<std::chrono::steady_clock> create_tp_;
12941297
};
12951298
} // namespace coro_rpc

include/ylt/standalone/cinatra/coro_http_client.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <atomic>
33
#include <cassert>
44
#include <charconv>
5+
#include <chrono>
56
#include <cstddef>
67
#include <filesystem>
78
#include <fstream>
@@ -142,7 +143,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
142143
timer_(&executor_wrapper_),
143144
socket_(std::make_shared<socket_t>(executor)),
144145
head_buf_(socket_->head_buf_),
145-
chunked_buf_(socket_->chunked_buf_) {}
146+
chunked_buf_(socket_->chunked_buf_),
147+
create_tp_(std::chrono::steady_clock::now()) {}
146148

147149
coro_http_client(
148150
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor())
@@ -182,6 +184,10 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
182184

183185
~coro_http_client() { close(); }
184186

187+
auto get_create_time_point() const noexcept {
188+
return std::chrono::steady_clock::now();
189+
}
190+
185191
void close() {
186192
if (socket_ == nullptr || socket_->has_closed_)
187193
return;
@@ -2463,6 +2469,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
24632469
std::chrono::seconds(30);
24642470
std::chrono::steady_clock::duration req_timeout_duration_ =
24652471
std::chrono::seconds(60);
2472+
std::chrono::steady_clock::time_point create_tp_;
24662473
bool enable_tcp_no_delay_ = true;
24672474
std::string resp_chunk_str_;
24682475
std::span<char> out_buf_;

src/coro_io/tests/test_client_pool.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,4 +344,19 @@ TEST_CASE("test client pools dns don't refresh") {
344344
CHECK(eps.get() == eps_init.get());
345345
CHECK(eps->empty());
346346
}());
347+
}
348+
349+
TEST_CASE("test client pools client pool") {
350+
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
351+
auto pool = coro_io::client_pool<coro_http::coro_http_client>::create(
352+
"http://www.baidu.com",
353+
coro_io::client_pool<coro_http::coro_http_client>::pool_config{
354+
.max_connection_life_time = 0s});
355+
co_await pool->send_request(
356+
[](coro_http::coro_http_client &cli) -> Lazy<void> {
357+
cli.close();
358+
co_return;
359+
});
360+
CHECK(pool->free_client_count() == 0);
361+
}());
347362
}

0 commit comments

Comments
 (0)