From 21030e40c0452b4a1c91d069c200c5ced6de6c0f Mon Sep 17 00:00:00 2001 From: "Zezheng.Li" Date: Tue, 11 Jul 2023 17:11:17 +0800 Subject: [PATCH] [coro_io] add test for coro_io --- include/ylt/coro_io/channel.hpp | 10 +- include/ylt/coro_io/client_pool.hpp | 45 +++-- include/ylt/coro_rpc/impl/coro_rpc_client.hpp | 29 +-- src/coro_io/tests/CMakeLists.txt | 2 + src/coro_io/tests/test_channel.cpp | 121 ++++++++++++ src/coro_io/tests/test_client_pool.cpp | 177 ++++++++++++++++++ 6 files changed, 346 insertions(+), 38 deletions(-) create mode 100644 src/coro_io/tests/test_channel.cpp create mode 100644 src/coro_io/tests/test_client_pool.cpp diff --git a/include/ylt/coro_io/channel.hpp b/include/ylt/coro_io/channel.hpp index 793e33a0a..28a0dec85 100644 --- a/include/ylt/coro_io/channel.hpp +++ b/include/ylt/coro_io/channel.hpp @@ -75,8 +75,8 @@ class channel { channel(const channel& o) = delete; channel& operator=(const channel& o) = delete; - auto send_request(auto&& op, const typename client_t::config& config) - -> decltype(std::declval().send_request(op, + auto send_request(auto op, const typename client_t::config& config) + -> decltype(std::declval().send_request(std::move(op), std::string_view{}, config)) { std::shared_ptr client_pool; @@ -91,10 +91,10 @@ class channel { client_pool = client_pools_[0]; } co_return co_await client_pool->send_request( - op, client_pool->get_host_name(), config); + std::move(op), client_pool->get_host_name(), config); } - auto send_request(auto&& op) { - return send_request(op, config_.pool_config.client_config); + auto send_request(auto op) { + return send_request(std::move(op), config_.pool_config.client_config); } static channel create(const std::vector& hosts, diff --git a/include/ylt/coro_io/client_pool.hpp b/include/ylt/coro_io/client_pool.hpp index 2398b2872..7daf96165 100644 --- a/include/ylt/coro_io/client_pool.hpp +++ b/include/ylt/coro_io/client_pool.hpp @@ -61,7 +61,8 @@ class client_pool : public std::enable_shared_from_this< std::weak_ptr self_weak) { std::shared_ptr self = self_weak.lock(); while (true) { - auto sleep_time = self->pool_config_.idle_timeout_; + auto sleep_time = self->pool_config_.idle_timeout; + auto clear_cnt = self->pool_config_.idle_queue_max_clear_count; self->free_clients_.reselect(); self = nullptr; co_await coro_io::sleep_for(sleep_time); @@ -70,9 +71,13 @@ class client_pool : public std::enable_shared_from_this< } std::unique_ptr client; while (true) { - std::size_t is_all_cleared = self->free_clients_.clear_old(10000); + std::size_t is_all_cleared = self->free_clients_.clear_old(clear_cnt); if (is_all_cleared != 0) [[unlikely]] { - co_await async_simple::coro::Yield{}; + try { + co_await async_simple::coro::Yield{}; + } catch (std::exception& e) { + std::cout << e.what() << std::endl; + } } else { break; @@ -143,12 +148,13 @@ class client_pool : public std::enable_shared_from_this< } void collect_free_client(std::unique_ptr client) { - if (client && free_clients_.size() < pool_config_.max_connection_) { + if (client && free_clients_.size() < pool_config_.max_connection) { if (!client->has_closed()) { if (free_clients_.enqueue(std::move(client)) == 1) { std::size_t expected = 0; if (collecter_cnt_.compare_exchange_strong(expected, 1)) { collect_idle_timeout_client(this->shared_from_this()) + .via(coro_io::get_global_executor()) .start([](auto&&) { }); } @@ -180,10 +186,11 @@ class client_pool : public std::enable_shared_from_this< public: struct pool_config { - uint32_t max_connection_ = 100; - uint32_t connect_retry_count = 5; + uint32_t max_connection = 100; + uint32_t connect_retry_count = 3; + uint32_t idle_queue_max_clear_count = 1000; std::chrono::milliseconds reconnect_wait_time{1000}; - std::chrono::milliseconds idle_timeout_{3000}; + std::chrono::milliseconds idle_timeout{10000}; typename client_t::config client_config; }; @@ -204,7 +211,7 @@ class client_pool : public std::enable_shared_from_this< : host_name_(host_name), pool_config_(pool_config), io_context_pool_(io_context_pool), - free_clients_(pool_config.max_connection_) { + free_clients_(pool_config.max_connection) { if (pool_config_.connect_retry_count == 0) { pool_config_.connect_retry_count = 1; } @@ -217,7 +224,7 @@ class client_pool : public std::enable_shared_from_this< host_name_(host_name), pool_config_(pool_config), io_context_pool_(io_context_pool), - free_clients_(pool_config.max_connection_) { + free_clients_(pool_config.max_connection) { if (pool_config_.connect_retry_count == 0) { pool_config_.connect_retry_count = 1; } @@ -225,7 +232,7 @@ class client_pool : public std::enable_shared_from_this< template async_simple::coro::Lazy> send_request( - T&& op, const typename client_t::config& client_config) { + T op, const typename client_t::config& client_config) { // return type: Lazy> auto client = co_await get_client(client_config); if (!client) { @@ -244,7 +251,7 @@ class client_pool : public std::enable_shared_from_this< } template - decltype(auto) send_request(T&& op) { + decltype(auto) send_request(T op) { return send_request(op, pool_config_.client_config); } @@ -263,7 +270,7 @@ class client_pool : public std::enable_shared_from_this< template async_simple::coro::Lazy> send_request( - T&& op, std::string_view endpoint, + T op, std::string_view endpoint, const typename client_t::config& client_config) { // return type: Lazy> auto client = co_await get_client(client_config); @@ -285,7 +292,7 @@ class client_pool : public std::enable_shared_from_this< } template - decltype(auto) send_request(T&& op, std::string_view sv) { + decltype(auto) send_request(T op, std::string_view sv) { return send_request(op, sv, pool_config_.client_config); } @@ -308,18 +315,18 @@ class client_pools { const typename client_pool_t::pool_config& pool_config = {}, io_context_pool_t& io_context_pool = coro_io::g_io_context_pool()) : io_context_pool_(io_context_pool), default_pool_config_(pool_config) {} - auto send_request(std::string_view host_name, auto&& op) - -> decltype(std::declval().send_request(op)) { + auto send_request(std::string_view host_name, auto op) + -> decltype(std::declval().send_request(std::move(op))) { auto pool = get_client_pool(host_name, default_pool_config_); - auto ret = co_await pool->send_request(op); + auto ret = co_await pool->send_request(std::move(op)); co_return ret; } auto send_request(std::string_view host_name, const typename client_pool_t::pool_config& pool_config, - auto&& op) - -> decltype(std::declval().send_request(op)) { + auto op) + -> decltype(std::declval().send_request(std::move(op))) { auto pool = get_client_pool(host_name, pool_config); - auto ret = co_await pool.send_request(op); + auto ret = co_await pool.send_request(std::move(op)); co_return ret; } auto at(std::string_view host_name) { diff --git a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp index 51dcf7053..27fa271b0 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp @@ -144,10 +144,11 @@ class coro_rpc_client { [[nodiscard]] bool init_config(const config &conf) { config_ = conf; #ifdef YLT_ENABLE_SSL - return init_ssl_impl(); -#else - return true; + if (!config_.ssl_cert_path.empty()) + return init_ssl_impl(); + else #endif + return true; }; /*! @@ -335,6 +336,17 @@ class coro_rpc_client { uint32_t get_client_id() const { return config_.client_id; } + void close() { + if (has_closed_) { + return; + } + + ELOGV(INFO, "client_id %d close", config_.client_id); + close_socket(); + + has_closed_ = true; + } + template friend class coro_io::client_pool; @@ -731,17 +743,6 @@ class coro_rpc_client { socket_.close(ignored_ec); } - void close() { - if (has_closed_) { - return; - } - - ELOGV(INFO, "client_id %d close", config_.client_id); - close_socket(); - - has_closed_ = true; - } - #ifdef UNIT_TEST_INJECT public: std::errc sync_connect(const std::string &host, const std::string &port) { diff --git a/src/coro_io/tests/CMakeLists.txt b/src/coro_io/tests/CMakeLists.txt index 4d50197cc..714e8e69b 100644 --- a/src/coro_io/tests/CMakeLists.txt +++ b/src/coro_io/tests/CMakeLists.txt @@ -1,6 +1,8 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/output/tests) add_executable(coro_io_test test_corofile.cpp + test_channel.cpp + test_client_pool.cpp main.cpp ) add_test(NAME coro_io_test COMMAND coro_io_test) diff --git a/src/coro_io/tests/test_channel.cpp b/src/coro_io/tests/test_channel.cpp new file mode 100644 index 000000000..7d8fb2840 --- /dev/null +++ b/src/coro_io/tests/test_channel.cpp @@ -0,0 +1,121 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "async_simple/coro/Lazy.h" +#include "ylt/coro_rpc/impl/coro_rpc_client.hpp" +#include "ylt/coro_rpc/impl/default_config/coro_rpc_config.hpp" + +TEST_CASE("test RR") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8801); + server.async_start().start([](auto &&) { + }); + auto is_started = server.wait_for_start(std::chrono::seconds{1}); + CHECK(is_started); + auto hosts = + std::vector{"127.0.0.1:8801", "localhost:8801"}; + auto channel = coro_io::channel::create(hosts); + for (int i = 0; i < 100; ++i) { + auto res = co_await channel.send_request( + [&i, &hosts]( + coro_rpc::coro_rpc_client &client, + std::string_view host) -> async_simple::coro::Lazy { + CHECK(host == hosts[i % 2]); + co_return; + }); + CHECK(res.has_value()); + } + server.stop(); + }()); +} + +TEST_CASE("test Random") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8801); + server.async_start().start([](auto &&) { + }); + auto is_started = server.wait_for_start(std::chrono::seconds{1}); + CHECK(is_started); + auto hosts = + std::vector{"127.0.0.1:8801", "localhost:8801"}; + auto channel = coro_io::channel::create( + hosts, {.lba = coro_io::load_blance_algorithm::random}); + int host0_cnt = 0, hostRR_cnt = 0; + for (int i = 0; i < 100; ++i) { + auto res = co_await channel.send_request( + [&i, &hosts, &host0_cnt, &hostRR_cnt]( + coro_rpc::coro_rpc_client &client, + std::string_view host) -> async_simple::coro::Lazy { + if (host == hosts[i % 2]) + ++hostRR_cnt; + if (host == hosts[0]) + ++host0_cnt; + co_return; + }); + CHECK(res.has_value()); + } + CHECK(host0_cnt < 100); + CHECK(hostRR_cnt < 100); + server.stop(); + }()); +} + +TEST_CASE("test single host") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8801); + server.async_start().start([](auto &&) { + }); + auto is_started = server.wait_for_start(std::chrono::seconds{1}); + CHECK(is_started); + auto hosts = std::vector{"127.0.0.1:8801"}; + auto channel = coro_io::channel::create(hosts); + for (int i = 0; i < 100; ++i) { + auto res = co_await channel.send_request( + [&i, &hosts]( + coro_rpc::coro_rpc_client &client, + std::string_view host) -> async_simple::coro::Lazy { + CHECK(host == hosts[0]); + co_return; + }); + CHECK(res.has_value()); + } + server.stop(); + }()); +} + +TEST_CASE("test send_request config") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8802); + server.async_start().start([](auto &&) { + }); + auto is_started = server.wait_for_start(std::chrono::seconds{1}); + CHECK(is_started); + auto hosts = std::vector{"127.0.0.1:8802"}; + auto channel = coro_io::channel::create(hosts); + for (int i = 0; i < 100; ++i) { + auto res = co_await channel.send_request( + [&i, &hosts](coro_rpc::coro_rpc_client &client, std::string_view host) + -> async_simple::coro::Lazy { + CHECK(client.get_client_id() == 114514); + co_return; + }, + coro_rpc::coro_rpc_client::config{.client_id = 114514}); + CHECK(res.has_value()); + } + server.stop(); + }()); +} \ No newline at end of file diff --git a/src/coro_io/tests/test_client_pool.cpp b/src/coro_io/tests/test_client_pool.cpp new file mode 100644 index 000000000..23824679e --- /dev/null +++ b/src/coro_io/tests/test_client_pool.cpp @@ -0,0 +1,177 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "async_simple/Executor.h" +#include "async_simple/Promise.h" +#include "async_simple/Unit.h" +#include "async_simple/coro/ConditionVariable.h" +#include "async_simple/coro/Lazy.h" +#include "async_simple/coro/Semaphore.h" +#include "async_simple/coro/Sleep.h" +#include "async_simple/coro/SpinLock.h" +#include "ylt/coro_rpc/impl/coro_rpc_client.hpp" +#include "ylt/coro_rpc/impl/default_config/coro_rpc_config.hpp" +#include "ylt/coro_rpc/impl/expected.hpp" +using namespace std::chrono_literals; +using namespace async_simple::coro; +TEST_CASE("test client pool") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8801); + server.async_start().start([](auto &&) { + }); + auto is_started = server.wait_for_start(1s); + REQUIRE(is_started); + auto pool = coro_io::client_pool::create( + "127.0.0.1:8801", {.max_connection = 100, .idle_timeout = 300ms}); + std::vector>> res; + auto event = [&res, &pool]() { + auto op = [](coro_rpc::coro_rpc_client &client) + -> async_simple::coro::Lazy { + co_await coro_io::sleep_for(100ms); + }; + res.emplace_back(pool->send_request(op)); + }; + for (int i = 0; i < 50; ++i) { + event(); + } + co_await collectAll(std::move(res)); + CHECK(pool->free_client_count() == 50); + res.clear(); + for (int i = 0; i < 110; ++i) { + event(); + } + co_await collectAll(std::move(res)); + CHECK(pool->free_client_count() == 100); + co_await coro_io::sleep_for(700ms); + CHECK(pool->free_client_count() == 0); + server.stop(); + }()); +} +TEST_CASE("test idle timeout yield") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8801); + server.async_start().start([](auto &&) { + }); + auto is_started = server.wait_for_start(1s); + REQUIRE(is_started); + auto pool = coro_io::client_pool::create( + "127.0.0.1:8801", {.max_connection = 100, + .idle_queue_max_clear_count = 1, + .idle_timeout = 300ms}); + std::vector>> res; + auto event = [&res, &pool]() { + auto op = [](coro_rpc::coro_rpc_client &client) + -> async_simple::coro::Lazy { + co_await coro_io::sleep_for(100ms); + }; + res.emplace_back(pool->send_request(op)); + }; + for (int i = 0; i < 100; ++i) { + event(); + } + co_await collectAll(std::move(res)); + CHECK(pool->free_client_count() == 100); + res.clear(); + co_await coro_io::sleep_for(700ms); + CHECK(pool->free_client_count() == 0); + server.stop(); + }()); +} + +TEST_CASE("test reconnect") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + auto pool = coro_io::client_pool::create( + "127.0.0.1:8801", + {.connect_retry_count = 3, .reconnect_wait_time = 500ms}); + std::vector>> res; + auto event = [&res, &pool]() { + auto op = [](coro_rpc::coro_rpc_client &client) + -> async_simple::coro::Lazy { + co_await coro_io::sleep_for(100ms); + }; + res.emplace_back(pool->send_request(op)); + }; + for (int i = 0; i < 100; ++i) { + event(); + } + coro_rpc::coro_rpc_server server(1, 8801); + res.push_back([&server]() -> Lazy> { + co_await coro_io::sleep_for(700ms); + server.async_start().start([](auto &&) { + }); + co_return coro_rpc::expected{}; + }()); + co_await collectAll(std::move(res)); + CHECK(pool->free_client_count() == 100); + server.stop(); + }()); +} + +TEST_CASE("test collect_free_client") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8801); + server.async_start().start([](auto &&) { + }); + auto is_started = server.wait_for_start(1s); + REQUIRE(is_started); + auto pool = coro_io::client_pool::create( + "127.0.0.1:8801", {.max_connection = 100, .idle_timeout = 300ms}); + std::vector>> res; + auto event = [&res, &pool]() { + auto op = [](coro_rpc::coro_rpc_client &client) + -> async_simple::coro::Lazy { + co_await coro_io::sleep_for(100ms); + client.close(); + }; + res.emplace_back(pool->send_request(op)); + }; + for (int i = 0; i < 50; ++i) { + event(); + } + co_await collectAll(std::move(res)); + CHECK(pool->free_client_count() == 0); + server.stop(); + }()); +} + +TEST_CASE("test client pools parallel r/w") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + auto pool = coro_io::client_pools{}; + for (int i = 0; i < 10000; ++i) { + auto cli = pool[std::to_string(i)]; + CHECK(cli->get_host_name() == std::to_string(i)); + } + auto rw = [&pool](int i) -> Lazy { + auto cli = pool[std::to_string(i)]; + CHECK(cli->get_host_name() == std::to_string(i)); + co_return; + }; + + std::vector> works; + for (int i = 0; i < 20000; ++i) { + works.emplace_back(rw(i).via(coro_io::get_global_executor())); + } + for (int i = 0; i < 10000; ++i) { + works.emplace_back(rw(i).via(coro_io::get_global_executor())); + } + co_await collectAll(std::move(works)); + }()); +} \ No newline at end of file