Skip to content

Commit

Permalink
Merge branch 'main' into update_time_util
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Jul 13, 2023
2 parents 2ce8238 + 639e1d0 commit 6b5514a
Show file tree
Hide file tree
Showing 21 changed files with 521 additions and 280 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/ubuntu_clang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ jobs:

- name: Configure
run: |
CXX=clang++ CC=clang
cmake -B ${{github.workspace}}/build -G Ninja \
-DCMAKE_BUILD_TYPE=${{matrix.mode}} -DBUILD_WITH_LIBCXX=${{matrix.libcxx}} -DENABLE_SSL=${{matrix.ssl}} \
-DUSE_CCACHE=${{env.ccache}}
-DUSE_CCACHE=${{env.ccache}} -DCMAKE_C_COMPILER=clang -DCMAKE_CXX_COMPILER=clang++
- name: Build
run: cmake --build ${{github.workspace}}/build --config ${{matrix.mode}}

Expand Down
10 changes: 5 additions & 5 deletions include/ylt/coro_io/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ class channel {
channel(const channel& o) = delete;
channel& operator=(const channel& o) = delete;

auto send_request(auto&& op, const typename client_t::config& config)
-> decltype(std::declval<client_pool_t>().send_request(op,
auto send_request(auto op, typename client_t::config& config)
-> decltype(std::declval<client_pool_t>().send_request(std::move(op),
std::string_view{},
config)) {
std::shared_ptr<client_pool_t> client_pool;
Expand All @@ -91,10 +91,10 @@ class channel {
client_pool = client_pools_[0];
}
co_return co_await client_pool->send_request(
op, client_pool->get_host_name(), config);
std::move(op), client_pool->get_host_name(), config);
}
auto send_request(auto&& op) {
return send_request(op, config_.pool_config.client_config);
auto send_request(auto op) {
return send_request(std::move(op), config_.pool_config.client_config);
}

static channel create(const std::vector<std::string_view>& hosts,
Expand Down
71 changes: 34 additions & 37 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ class client_pool : public std::enable_shared_from_this<
static async_simple::coro::Lazy<void> collect_idle_timeout_client(
std::weak_ptr<client_pool> self_weak) {
std::shared_ptr<client_pool> self = self_weak.lock();
if (self == nullptr) {
co_return;
}
while (true) {
auto sleep_time = self->pool_config_.idle_timeout_;
auto sleep_time = self->pool_config_.idle_timeout;
auto clear_cnt = self->pool_config_.idle_queue_per_max_clear_count;
self->free_clients_.reselect();
self = nullptr;
co_await coro_io::sleep_for(sleep_time);
Expand All @@ -70,9 +74,13 @@ class client_pool : public std::enable_shared_from_this<
}
std::unique_ptr<client_t> client;
while (true) {
std::size_t is_all_cleared = self->free_clients_.clear_old(10000);
std::size_t is_all_cleared = self->free_clients_.clear_old(clear_cnt);
if (is_all_cleared != 0) [[unlikely]] {
co_await async_simple::coro::Yield{};
try {
co_await async_simple::coro::Yield{};
} catch (std::exception& e) {
std::cout << e.what() << std::endl;
}
}
else {
break;
Expand All @@ -94,10 +102,8 @@ class client_pool : public std::enable_shared_from_this<
bool ok = false;

for (int i = 0; !ok && i < pool_config_.connect_retry_count; ++i) {
co_await coro_io::sleep_for(pool_config_.reconnect_wait_time);
ok = (client_t::is_ok(co_await client->reconnect(host_name_)));
if (!ok) {
co_await coro_io::sleep_for(pool_config_.reconnect_wait_time);
}
}
co_return ok ? std::move(client) : nullptr;
}
Expand All @@ -110,17 +116,7 @@ class client_pool : public std::enable_shared_from_this<
if (!free_clients_.try_dequeue(client)) {
break;
}
if (client->has_closed()) {
[self = this->shared_from_this()](std::unique_ptr<client_t> client)
-> async_simple::coro::Lazy<void> {
self->collect_free_client(
co_await self->reconnect(std::move(client)));
co_return;
}(std::move(client))
.start([](auto&& v) {
});
}
else {
if (!client->has_closed()) {
break;
}
}
Expand All @@ -143,12 +139,13 @@ class client_pool : public std::enable_shared_from_this<
}

void collect_free_client(std::unique_ptr<client_t> client) {
if (client && free_clients_.size() < pool_config_.max_connection_) {
if (client && free_clients_.size() < pool_config_.max_connection) {
if (!client->has_closed()) {
if (free_clients_.enqueue(std::move(client)) == 1) {
std::size_t expected = 0;
if (collecter_cnt_.compare_exchange_strong(expected, 1)) {
collect_idle_timeout_client(this->shared_from_this())
.via(coro_io::get_global_executor())
.start([](auto&&) {
});
}
Expand Down Expand Up @@ -180,10 +177,11 @@ class client_pool : public std::enable_shared_from_this<

public:
struct pool_config {
uint32_t max_connection_ = 100;
uint32_t connect_retry_count = 5;
uint32_t max_connection = 100;
uint32_t connect_retry_count = 3;
uint32_t idle_queue_per_max_clear_count = 1000;
std::chrono::milliseconds reconnect_wait_time{1000};
std::chrono::milliseconds idle_timeout_{3000};
std::chrono::milliseconds idle_timeout{30000};
typename client_t::config client_config;
};

Expand All @@ -204,7 +202,7 @@ class client_pool : public std::enable_shared_from_this<
: host_name_(host_name),
pool_config_(pool_config),
io_context_pool_(io_context_pool),
free_clients_(pool_config.max_connection_) {
free_clients_(pool_config.max_connection) {
if (pool_config_.connect_retry_count == 0) {
pool_config_.connect_retry_count = 1;
}
Expand All @@ -217,15 +215,15 @@ class client_pool : public std::enable_shared_from_this<
host_name_(host_name),
pool_config_(pool_config),
io_context_pool_(io_context_pool),
free_clients_(pool_config.max_connection_) {
free_clients_(pool_config.max_connection) {
if (pool_config_.connect_retry_count == 0) {
pool_config_.connect_retry_count = 1;
}
};

template <typename T>
async_simple::coro::Lazy<return_type<T>> send_request(
T&& op, const typename client_t::config& client_config) {
T op, typename client_t::config& client_config) {
// return type: Lazy<expected<T::returnType,std::errc>>
auto client = co_await get_client(client_config);
if (!client) {
Expand All @@ -244,8 +242,8 @@ class client_pool : public std::enable_shared_from_this<
}

template <typename T>
decltype(auto) send_request(T&& op) {
return send_request(op, pool_config_.client_config);
decltype(auto) send_request(T op) {
return send_request(std::move(op), pool_config_.client_config);
}

std::size_t free_client_count() const noexcept {
Expand All @@ -263,8 +261,8 @@ class client_pool : public std::enable_shared_from_this<

template <typename T>
async_simple::coro::Lazy<return_type_with_host<T>> send_request(
T&& op, std::string_view endpoint,
const typename client_t::config& client_config) {
T op, std::string_view endpoint,
typename client_t::config& client_config) {
// return type: Lazy<expected<T::returnType,std::errc>>
auto client = co_await get_client(client_config);
if (!client) {
Expand All @@ -285,8 +283,8 @@ class client_pool : public std::enable_shared_from_this<
}

template <typename T>
decltype(auto) send_request(T&& op, std::string_view sv) {
return send_request(op, sv, pool_config_.client_config);
decltype(auto) send_request(T op, std::string_view sv) {
return send_request(std::move(op), sv, pool_config_.client_config);
}

coro_io::detail::client_queue<std::unique_ptr<client_t>> free_clients_;
Expand All @@ -308,18 +306,17 @@ class client_pools {
const typename client_pool_t::pool_config& pool_config = {},
io_context_pool_t& io_context_pool = coro_io::g_io_context_pool())
: io_context_pool_(io_context_pool), default_pool_config_(pool_config) {}
auto send_request(std::string_view host_name, auto&& op)
-> decltype(std::declval<client_pool_t>().send_request(op)) {
auto send_request(std::string_view host_name, auto op)
-> decltype(std::declval<client_pool_t>().send_request(std::move(op))) {
auto pool = get_client_pool(host_name, default_pool_config_);
auto ret = co_await pool->send_request(op);
auto ret = co_await pool->send_request(std::move(op));
co_return ret;
}
auto send_request(std::string_view host_name,
const typename client_pool_t::pool_config& pool_config,
auto&& op)
-> decltype(std::declval<client_pool_t>().send_request(op)) {
typename client_pool_t::pool_config& pool_config, auto op)
-> decltype(std::declval<client_pool_t>().send_request(std::move(op))) {
auto pool = get_client_pool(host_name, pool_config);
auto ret = co_await pool.send_request(op);
auto ret = co_await pool->send_request(std::move(op));
co_return ret;
}
auto at(std::string_view host_name) {
Expand Down
8 changes: 6 additions & 2 deletions include/ylt/coro_io/detail/client_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ class client_queue {
}
std::size_t enqueue(client_t&& c) {
const int_fast16_t index = selected_index_;
auto cnt = ++size_[index];
if (queue_[index].enqueue(std::move(c))) {
return ++size_[index];
return cnt;
}
else {
--size_[index];
return 0;
}
return 0;
}
bool try_dequeue(client_t& c) {
const int_fast16_t index = selected_index_;
Expand Down
11 changes: 8 additions & 3 deletions include/ylt/coro_io/io_context_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,17 @@ class ExecutorWrapper : public async_simple::Executor {

private:
void schedule(Func func, Duration dur) override {
auto timer = std::make_shared<asio::steady_timer>(executor_, dur);
timer->async_wait([fn = std::move(func), timer](auto ec) {
auto timer = std::make_unique<asio::steady_timer>(executor_, dur);
auto tm = timer.get();
tm->async_wait([fn = std::move(func), timer = std::move(timer)](auto ec) {
fn();
});
}
};

template <typename ExecutorImpl = asio::io_context>
inline async_simple::coro::Lazy<typename ExecutorImpl::executor_type>
get_executor() {
get_current_executor() {
auto executor = co_await async_simple::CurrentExecutor{};
assert(executor != nullptr);
co_return static_cast<ExecutorImpl *>(executor->checkout())->get_executor();
Expand Down Expand Up @@ -136,6 +137,10 @@ class io_context_pool {
work_.clear();

if (ok) {
// clear all unfinished work
for (auto &e : io_contexts_) {
e->run();
}
return;
}

Expand Down
Loading

0 comments on commit 6b5514a

Please sign in to comment.