Skip to content

Commit

Permalink
[coro_io] add test for coro_io
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle committed Jul 13, 2023
1 parent 81f2f24 commit 21030e4
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 38 deletions.
10 changes: 5 additions & 5 deletions include/ylt/coro_io/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ class channel {
channel(const channel& o) = delete;
channel& operator=(const channel& o) = delete;

auto send_request(auto&& op, const typename client_t::config& config)
-> decltype(std::declval<client_pool_t>().send_request(op,
auto send_request(auto op, const typename client_t::config& config)
-> decltype(std::declval<client_pool_t>().send_request(std::move(op),
std::string_view{},
config)) {
std::shared_ptr<client_pool_t> client_pool;
Expand All @@ -91,10 +91,10 @@ class channel {
client_pool = client_pools_[0];
}
co_return co_await client_pool->send_request(
op, client_pool->get_host_name(), config);
std::move(op), client_pool->get_host_name(), config);
}
auto send_request(auto&& op) {
return send_request(op, config_.pool_config.client_config);
auto send_request(auto op) {
return send_request(std::move(op), config_.pool_config.client_config);
}

static channel create(const std::vector<std::string_view>& hosts,
Expand Down
45 changes: 26 additions & 19 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class client_pool : public std::enable_shared_from_this<
std::weak_ptr<client_pool> self_weak) {
std::shared_ptr<client_pool> self = self_weak.lock();
while (true) {
auto sleep_time = self->pool_config_.idle_timeout_;
auto sleep_time = self->pool_config_.idle_timeout;
auto clear_cnt = self->pool_config_.idle_queue_max_clear_count;
self->free_clients_.reselect();
self = nullptr;
co_await coro_io::sleep_for(sleep_time);
Expand All @@ -70,9 +71,13 @@ class client_pool : public std::enable_shared_from_this<
}
std::unique_ptr<client_t> client;
while (true) {
std::size_t is_all_cleared = self->free_clients_.clear_old(10000);
std::size_t is_all_cleared = self->free_clients_.clear_old(clear_cnt);
if (is_all_cleared != 0) [[unlikely]] {
co_await async_simple::coro::Yield{};
try {
co_await async_simple::coro::Yield{};
} catch (std::exception& e) {
std::cout << e.what() << std::endl;
}
}
else {
break;
Expand Down Expand Up @@ -143,12 +148,13 @@ class client_pool : public std::enable_shared_from_this<
}

void collect_free_client(std::unique_ptr<client_t> client) {
if (client && free_clients_.size() < pool_config_.max_connection_) {
if (client && free_clients_.size() < pool_config_.max_connection) {
if (!client->has_closed()) {
if (free_clients_.enqueue(std::move(client)) == 1) {
std::size_t expected = 0;
if (collecter_cnt_.compare_exchange_strong(expected, 1)) {
collect_idle_timeout_client(this->shared_from_this())
.via(coro_io::get_global_executor())
.start([](auto&&) {
});
}
Expand Down Expand Up @@ -180,10 +186,11 @@ class client_pool : public std::enable_shared_from_this<

public:
struct pool_config {
uint32_t max_connection_ = 100;
uint32_t connect_retry_count = 5;
uint32_t max_connection = 100;
uint32_t connect_retry_count = 3;
uint32_t idle_queue_max_clear_count = 1000;
std::chrono::milliseconds reconnect_wait_time{1000};
std::chrono::milliseconds idle_timeout_{3000};
std::chrono::milliseconds idle_timeout{10000};
typename client_t::config client_config;
};

Expand All @@ -204,7 +211,7 @@ class client_pool : public std::enable_shared_from_this<
: host_name_(host_name),
pool_config_(pool_config),
io_context_pool_(io_context_pool),
free_clients_(pool_config.max_connection_) {
free_clients_(pool_config.max_connection) {
if (pool_config_.connect_retry_count == 0) {
pool_config_.connect_retry_count = 1;
}
Expand All @@ -217,15 +224,15 @@ class client_pool : public std::enable_shared_from_this<
host_name_(host_name),
pool_config_(pool_config),
io_context_pool_(io_context_pool),
free_clients_(pool_config.max_connection_) {
free_clients_(pool_config.max_connection) {
if (pool_config_.connect_retry_count == 0) {
pool_config_.connect_retry_count = 1;
}
};

template <typename T>
async_simple::coro::Lazy<return_type<T>> send_request(
T&& op, const typename client_t::config& client_config) {
T op, const typename client_t::config& client_config) {
// return type: Lazy<expected<T::returnType,std::errc>>
auto client = co_await get_client(client_config);
if (!client) {
Expand All @@ -244,7 +251,7 @@ class client_pool : public std::enable_shared_from_this<
}

template <typename T>
decltype(auto) send_request(T&& op) {
decltype(auto) send_request(T op) {
return send_request(op, pool_config_.client_config);
}

Expand All @@ -263,7 +270,7 @@ class client_pool : public std::enable_shared_from_this<

template <typename T>
async_simple::coro::Lazy<return_type_with_host<T>> send_request(
T&& op, std::string_view endpoint,
T op, std::string_view endpoint,
const typename client_t::config& client_config) {
// return type: Lazy<expected<T::returnType,std::errc>>
auto client = co_await get_client(client_config);
Expand All @@ -285,7 +292,7 @@ class client_pool : public std::enable_shared_from_this<
}

template <typename T>
decltype(auto) send_request(T&& op, std::string_view sv) {
decltype(auto) send_request(T op, std::string_view sv) {
return send_request(op, sv, pool_config_.client_config);
}

Expand All @@ -308,18 +315,18 @@ class client_pools {
const typename client_pool_t::pool_config& pool_config = {},
io_context_pool_t& io_context_pool = coro_io::g_io_context_pool())
: io_context_pool_(io_context_pool), default_pool_config_(pool_config) {}
auto send_request(std::string_view host_name, auto&& op)
-> decltype(std::declval<client_pool_t>().send_request(op)) {
auto send_request(std::string_view host_name, auto op)
-> decltype(std::declval<client_pool_t>().send_request(std::move(op))) {
auto pool = get_client_pool(host_name, default_pool_config_);
auto ret = co_await pool->send_request(op);
auto ret = co_await pool->send_request(std::move(op));
co_return ret;
}
auto send_request(std::string_view host_name,
const typename client_pool_t::pool_config& pool_config,
auto&& op)
-> decltype(std::declval<client_pool_t>().send_request(op)) {
auto op)
-> decltype(std::declval<client_pool_t>().send_request(std::move(op))) {
auto pool = get_client_pool(host_name, pool_config);
auto ret = co_await pool.send_request(op);
auto ret = co_await pool.send_request(std::move(op));
co_return ret;
}
auto at(std::string_view host_name) {
Expand Down
29 changes: 15 additions & 14 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,11 @@ class coro_rpc_client {
[[nodiscard]] bool init_config(const config &conf) {
config_ = conf;
#ifdef YLT_ENABLE_SSL
return init_ssl_impl();
#else
return true;
if (!config_.ssl_cert_path.empty())
return init_ssl_impl();
else
#endif
return true;
};

/*!
Expand Down Expand Up @@ -335,6 +336,17 @@ class coro_rpc_client {

uint32_t get_client_id() const { return config_.client_id; }

void close() {
if (has_closed_) {
return;
}

ELOGV(INFO, "client_id %d close", config_.client_id);
close_socket();

has_closed_ = true;
}

template <typename T, typename U>
friend class coro_io::client_pool;

Expand Down Expand Up @@ -731,17 +743,6 @@ class coro_rpc_client {
socket_.close(ignored_ec);
}

void close() {
if (has_closed_) {
return;
}

ELOGV(INFO, "client_id %d close", config_.client_id);
close_socket();

has_closed_ = true;
}

#ifdef UNIT_TEST_INJECT
public:
std::errc sync_connect(const std::string &host, const std::string &port) {
Expand Down
2 changes: 2 additions & 0 deletions src/coro_io/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/output/tests)
add_executable(coro_io_test
test_corofile.cpp
test_channel.cpp
test_client_pool.cpp
main.cpp
)
add_test(NAME coro_io_test COMMAND coro_io_test)
Expand Down
121 changes: 121 additions & 0 deletions src/coro_io/tests/test_channel.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#include <async_simple/coro/Collect.h>
#include <async_simple/coro/SyncAwait.h>
#include <doctest.h>

#include <asio/io_context.hpp>
#include <cassert>
#include <filesystem>
#include <fstream>
#include <memory>
#include <string_view>
#include <system_error>
#include <thread>
#include <ylt/coro_io/channel.hpp>
#include <ylt/coro_io/coro_file.hpp>
#include <ylt/coro_io/coro_io.hpp>
#include <ylt/coro_io/io_context_pool.hpp>

#include "async_simple/coro/Lazy.h"
#include "ylt/coro_rpc/impl/coro_rpc_client.hpp"
#include "ylt/coro_rpc/impl/default_config/coro_rpc_config.hpp"

TEST_CASE("test RR") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
server.async_start().start([](auto &&) {
});
auto is_started = server.wait_for_start(std::chrono::seconds{1});
CHECK(is_started);
auto hosts =
std::vector<std::string_view>{"127.0.0.1:8801", "localhost:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
for (int i = 0; i < 100; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
CHECK(host == hosts[i % 2]);
co_return;
});
CHECK(res.has_value());
}
server.stop();
}());
}

TEST_CASE("test Random") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
server.async_start().start([](auto &&) {
});
auto is_started = server.wait_for_start(std::chrono::seconds{1});
CHECK(is_started);
auto hosts =
std::vector<std::string_view>{"127.0.0.1:8801", "localhost:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(
hosts, {.lba = coro_io::load_blance_algorithm::random});
int host0_cnt = 0, hostRR_cnt = 0;
for (int i = 0; i < 100; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts, &host0_cnt, &hostRR_cnt](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
if (host == hosts[i % 2])
++hostRR_cnt;
if (host == hosts[0])
++host0_cnt;
co_return;
});
CHECK(res.has_value());
}
CHECK(host0_cnt < 100);
CHECK(hostRR_cnt < 100);
server.stop();
}());
}

TEST_CASE("test single host") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
server.async_start().start([](auto &&) {
});
auto is_started = server.wait_for_start(std::chrono::seconds{1});
CHECK(is_started);
auto hosts = std::vector<std::string_view>{"127.0.0.1:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
for (int i = 0; i < 100; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
CHECK(host == hosts[0]);
co_return;
});
CHECK(res.has_value());
}
server.stop();
}());
}

TEST_CASE("test send_request config") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8802);
server.async_start().start([](auto &&) {
});
auto is_started = server.wait_for_start(std::chrono::seconds{1});
CHECK(is_started);
auto hosts = std::vector<std::string_view>{"127.0.0.1:8802"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
for (int i = 0; i < 100; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts](coro_rpc::coro_rpc_client &client, std::string_view host)
-> async_simple::coro::Lazy<void> {
CHECK(client.get_client_id() == 114514);
co_return;
},
coro_rpc::coro_rpc_client::config{.client_id = 114514});
CHECK(res.has_value());
}
server.stop();
}());
}
Loading

0 comments on commit 21030e4

Please sign in to comment.