diff --git a/.github/workflows/ubuntu_clang.yml b/.github/workflows/ubuntu_clang.yml index 4ff4a45fc..b06bf0ad4 100644 --- a/.github/workflows/ubuntu_clang.yml +++ b/.github/workflows/ubuntu_clang.yml @@ -38,10 +38,9 @@ jobs: - name: Configure run: | - CXX=clang++ CC=clang cmake -B ${{github.workspace}}/build -G Ninja \ -DCMAKE_BUILD_TYPE=${{matrix.mode}} -DBUILD_WITH_LIBCXX=${{matrix.libcxx}} -DENABLE_SSL=${{matrix.ssl}} \ - -DUSE_CCACHE=${{env.ccache}} + -DUSE_CCACHE=${{env.ccache}} -DCMAKE_C_COMPILER=clang -DCMAKE_CXX_COMPILER=clang++ - name: Build run: cmake --build ${{github.workspace}}/build --config ${{matrix.mode}} diff --git a/include/ylt/coro_io/channel.hpp b/include/ylt/coro_io/channel.hpp index 793e33a0a..0769d3bc9 100644 --- a/include/ylt/coro_io/channel.hpp +++ b/include/ylt/coro_io/channel.hpp @@ -75,8 +75,8 @@ class channel { channel(const channel& o) = delete; channel& operator=(const channel& o) = delete; - auto send_request(auto&& op, const typename client_t::config& config) - -> decltype(std::declval().send_request(op, + auto send_request(auto op, typename client_t::config& config) + -> decltype(std::declval().send_request(std::move(op), std::string_view{}, config)) { std::shared_ptr client_pool; @@ -91,10 +91,10 @@ class channel { client_pool = client_pools_[0]; } co_return co_await client_pool->send_request( - op, client_pool->get_host_name(), config); + std::move(op), client_pool->get_host_name(), config); } - auto send_request(auto&& op) { - return send_request(op, config_.pool_config.client_config); + auto send_request(auto op) { + return send_request(std::move(op), config_.pool_config.client_config); } static channel create(const std::vector& hosts, diff --git a/include/ylt/coro_io/client_pool.hpp b/include/ylt/coro_io/client_pool.hpp index 2398b2872..878c139f5 100644 --- a/include/ylt/coro_io/client_pool.hpp +++ b/include/ylt/coro_io/client_pool.hpp @@ -60,8 +60,12 @@ class client_pool : public std::enable_shared_from_this< static async_simple::coro::Lazy collect_idle_timeout_client( std::weak_ptr self_weak) { std::shared_ptr self = self_weak.lock(); + if (self == nullptr) { + co_return; + } while (true) { - auto sleep_time = self->pool_config_.idle_timeout_; + auto sleep_time = self->pool_config_.idle_timeout; + auto clear_cnt = self->pool_config_.idle_queue_per_max_clear_count; self->free_clients_.reselect(); self = nullptr; co_await coro_io::sleep_for(sleep_time); @@ -70,9 +74,13 @@ class client_pool : public std::enable_shared_from_this< } std::unique_ptr client; while (true) { - std::size_t is_all_cleared = self->free_clients_.clear_old(10000); + std::size_t is_all_cleared = self->free_clients_.clear_old(clear_cnt); if (is_all_cleared != 0) [[unlikely]] { - co_await async_simple::coro::Yield{}; + try { + co_await async_simple::coro::Yield{}; + } catch (std::exception& e) { + std::cout << e.what() << std::endl; + } } else { break; @@ -94,10 +102,8 @@ class client_pool : public std::enable_shared_from_this< bool ok = false; for (int i = 0; !ok && i < pool_config_.connect_retry_count; ++i) { + co_await coro_io::sleep_for(pool_config_.reconnect_wait_time); ok = (client_t::is_ok(co_await client->reconnect(host_name_))); - if (!ok) { - co_await coro_io::sleep_for(pool_config_.reconnect_wait_time); - } } co_return ok ? 
std::move(client) : nullptr; } @@ -110,17 +116,7 @@ class client_pool : public std::enable_shared_from_this< if (!free_clients_.try_dequeue(client)) { break; } - if (client->has_closed()) { - [self = this->shared_from_this()](std::unique_ptr client) - -> async_simple::coro::Lazy { - self->collect_free_client( - co_await self->reconnect(std::move(client))); - co_return; - }(std::move(client)) - .start([](auto&& v) { - }); - } - else { + if (!client->has_closed()) { break; } } @@ -143,12 +139,13 @@ class client_pool : public std::enable_shared_from_this< } void collect_free_client(std::unique_ptr client) { - if (client && free_clients_.size() < pool_config_.max_connection_) { + if (client && free_clients_.size() < pool_config_.max_connection) { if (!client->has_closed()) { if (free_clients_.enqueue(std::move(client)) == 1) { std::size_t expected = 0; if (collecter_cnt_.compare_exchange_strong(expected, 1)) { collect_idle_timeout_client(this->shared_from_this()) + .via(coro_io::get_global_executor()) .start([](auto&&) { }); } @@ -180,10 +177,11 @@ class client_pool : public std::enable_shared_from_this< public: struct pool_config { - uint32_t max_connection_ = 100; - uint32_t connect_retry_count = 5; + uint32_t max_connection = 100; + uint32_t connect_retry_count = 3; + uint32_t idle_queue_per_max_clear_count = 1000; std::chrono::milliseconds reconnect_wait_time{1000}; - std::chrono::milliseconds idle_timeout_{3000}; + std::chrono::milliseconds idle_timeout{30000}; typename client_t::config client_config; }; @@ -204,7 +202,7 @@ class client_pool : public std::enable_shared_from_this< : host_name_(host_name), pool_config_(pool_config), io_context_pool_(io_context_pool), - free_clients_(pool_config.max_connection_) { + free_clients_(pool_config.max_connection) { if (pool_config_.connect_retry_count == 0) { pool_config_.connect_retry_count = 1; } @@ -217,7 +215,7 @@ class client_pool : public std::enable_shared_from_this< host_name_(host_name), pool_config_(pool_config), io_context_pool_(io_context_pool), - free_clients_(pool_config.max_connection_) { + free_clients_(pool_config.max_connection) { if (pool_config_.connect_retry_count == 0) { pool_config_.connect_retry_count = 1; } @@ -225,7 +223,7 @@ class client_pool : public std::enable_shared_from_this< template async_simple::coro::Lazy> send_request( - T&& op, const typename client_t::config& client_config) { + T op, typename client_t::config& client_config) { // return type: Lazy> auto client = co_await get_client(client_config); if (!client) { @@ -244,8 +242,8 @@ class client_pool : public std::enable_shared_from_this< } template - decltype(auto) send_request(T&& op) { - return send_request(op, pool_config_.client_config); + decltype(auto) send_request(T op) { + return send_request(std::move(op), pool_config_.client_config); } std::size_t free_client_count() const noexcept { @@ -263,8 +261,8 @@ class client_pool : public std::enable_shared_from_this< template async_simple::coro::Lazy> send_request( - T&& op, std::string_view endpoint, - const typename client_t::config& client_config) { + T op, std::string_view endpoint, + typename client_t::config& client_config) { // return type: Lazy> auto client = co_await get_client(client_config); if (!client) { @@ -285,8 +283,8 @@ class client_pool : public std::enable_shared_from_this< } template - decltype(auto) send_request(T&& op, std::string_view sv) { - return send_request(op, sv, pool_config_.client_config); + decltype(auto) send_request(T op, std::string_view sv) { + return 
send_request(std::move(op), sv, pool_config_.client_config); } coro_io::detail::client_queue> free_clients_; @@ -308,18 +306,17 @@ class client_pools { const typename client_pool_t::pool_config& pool_config = {}, io_context_pool_t& io_context_pool = coro_io::g_io_context_pool()) : io_context_pool_(io_context_pool), default_pool_config_(pool_config) {} - auto send_request(std::string_view host_name, auto&& op) - -> decltype(std::declval().send_request(op)) { + auto send_request(std::string_view host_name, auto op) + -> decltype(std::declval().send_request(std::move(op))) { auto pool = get_client_pool(host_name, default_pool_config_); - auto ret = co_await pool->send_request(op); + auto ret = co_await pool->send_request(std::move(op)); co_return ret; } auto send_request(std::string_view host_name, - const typename client_pool_t::pool_config& pool_config, - auto&& op) - -> decltype(std::declval().send_request(op)) { + typename client_pool_t::pool_config& pool_config, auto op) + -> decltype(std::declval().send_request(std::move(op))) { auto pool = get_client_pool(host_name, pool_config); - auto ret = co_await pool.send_request(op); + auto ret = co_await pool->send_request(std::move(op)); co_return ret; } auto at(std::string_view host_name) { diff --git a/include/ylt/coro_io/detail/client_queue.hpp b/include/ylt/coro_io/detail/client_queue.hpp index 04a1d435b..8646a5074 100644 --- a/include/ylt/coro_io/detail/client_queue.hpp +++ b/include/ylt/coro_io/detail/client_queue.hpp @@ -49,10 +49,14 @@ class client_queue { } std::size_t enqueue(client_t&& c) { const int_fast16_t index = selected_index_; + auto cnt = ++size_[index]; if (queue_[index].enqueue(std::move(c))) { - return ++size_[index]; + return cnt; + } + else { + --size_[index]; + return 0; } - return 0; } bool try_dequeue(client_t& c) { const int_fast16_t index = selected_index_; diff --git a/include/ylt/coro_io/io_context_pool.hpp b/include/ylt/coro_io/io_context_pool.hpp index 8431dc0ab..f01c17d96 100644 --- a/include/ylt/coro_io/io_context_pool.hpp +++ b/include/ylt/coro_io/io_context_pool.hpp @@ -72,8 +72,9 @@ class ExecutorWrapper : public async_simple::Executor { private: void schedule(Func func, Duration dur) override { - auto timer = std::make_shared(executor_, dur); - timer->async_wait([fn = std::move(func), timer](auto ec) { + auto timer = std::make_unique(executor_, dur); + auto tm = timer.get(); + tm->async_wait([fn = std::move(func), timer = std::move(timer)](auto ec) { fn(); }); } @@ -81,7 +82,7 @@ class ExecutorWrapper : public async_simple::Executor { template inline async_simple::coro::Lazy -get_executor() { +get_current_executor() { auto executor = co_await async_simple::CurrentExecutor{}; assert(executor != nullptr); co_return static_cast(executor->checkout())->get_executor(); @@ -136,6 +137,10 @@ class io_context_pool { work_.clear(); if (ok) { + // clear all unfinished work + for (auto &e : io_contexts_) { + e->run(); + } return; } diff --git a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp index 51dcf7053..cf0f45d14 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp @@ -36,6 +36,7 @@ #include #include +#include "asio/dispatch.hpp" #include "common_service.hpp" #include "context.hpp" #include "expected.hpp" @@ -123,7 +124,8 @@ class coro_rpc_client { */ coro_rpc_client(asio::io_context::executor_type executor, uint32_t client_id = 0) - : executor(executor), socket_(executor) { + : executor(executor), + 
socket_(std::make_shared(executor)) { config_.client_id = client_id; read_buf_.resize(default_read_buf_size_); } @@ -136,7 +138,8 @@ class coro_rpc_client { coro_io::ExecutorWrapper<> &executor = *coro_io::get_global_executor(), uint32_t client_id = 0) : executor(executor.get_asio_executor()), - socket_(executor.get_asio_executor()) { + socket_(std::make_shared( + executor.get_asio_executor())) { config_.client_id = client_id; read_buf_.resize(default_read_buf_size_); } @@ -144,10 +147,11 @@ class coro_rpc_client { [[nodiscard]] bool init_config(const config &conf) { config_ = conf; #ifdef YLT_ENABLE_SSL - return init_ssl_impl(); -#else - return true; + if (!config_.ssl_cert_path.empty()) + return init_ssl_impl(); + else #endif + return true; }; /*! @@ -306,7 +310,7 @@ class coro_rpc_client { } else { #endif - ret = co_await call_impl(socket_, std::move(args)...); + ret = co_await call_impl(*socket_, std::move(args)...); #ifdef YLT_ENABLE_SSL } #endif @@ -335,6 +339,18 @@ class coro_rpc_client { uint32_t get_client_id() const { return config_.client_id; } + void close() { + if (has_closed_) { + return; + } + + ELOGV(INFO, "client_id %d close", config_.client_id); + + close_socket(socket_); + + has_closed_ = true; + } + template friend class coro_io::client_pool; @@ -346,8 +362,9 @@ class coro_rpc_client { }; void reset() { - close_socket(); - socket_ = decltype(socket_)(executor.get_asio_executor()); + close_socket(socket_); + socket_ = + std::make_shared(executor.get_asio_executor()); is_timeout_ = false; has_closed_ = false; } @@ -380,7 +397,7 @@ class coro_rpc_client { .detach(); std::error_code ec = co_await coro_io::async_connect( - &executor, socket_, config_.host, config_.port); + &executor, *socket_, config_.host, config_.port); std::error_code err_code; timer.cancel(err_code); @@ -433,7 +450,7 @@ class coro_rpc_client { asio::ssl::host_name_verification(config_.ssl_domain)); ssl_stream_ = std::make_unique>( - socket_, ssl_ctx_); + *socket_, ssl_ctx_); ssl_init_ret_ = true; } catch (std::exception &e) { ELOGV(ERROR, "init ssl failed: %s", e.what()); @@ -457,7 +474,7 @@ class coro_rpc_client { } is_timeout_ = is_timeout; - close_socket(); + close_socket(socket_); promise.setValue(async_simple::Unit()); co_return true; } @@ -552,7 +569,7 @@ class coro_rpc_client { ret = co_await coro_io::async_write( socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN)); ELOGV(INFO, "client_id %d shutdown", config_.client_id); - socket_.shutdown(asio::ip::tcp::socket::shutdown_send); + socket_->shutdown(asio::ip::tcp::socket::shutdown_send); r = rpc_result{ unexpect_t{}, coro_rpc_protocol::rpc_error{std::errc::io_error, ret.first.message()}}; @@ -725,21 +742,13 @@ class coro_rpc_client { offset, std::forward(args)...); } - void close_socket() { - asio::error_code ignored_ec; - socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec); - socket_.close(ignored_ec); - } - - void close() { - if (has_closed_) { - return; - } - - ELOGV(INFO, "client_id %d close", config_.client_id); - close_socket(); - - has_closed_ = true; + void close_socket(std::shared_ptr socket) { + asio::dispatch( + executor.get_asio_executor(), [socket = std::move(socket)]() { + asio::error_code ignored_ec; + socket->shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec); + socket->close(ignored_ec); + }); } #ifdef UNIT_TEST_INJECT @@ -757,7 +766,7 @@ class coro_rpc_client { #endif private: coro_io::ExecutorWrapper<> executor; - asio::ip::tcp::socket socket_; + std::shared_ptr socket_; std::vector 
read_buf_; config config_; constexpr static std::size_t default_read_buf_size_ = 256; diff --git a/include/ylt/coro_rpc/impl/coro_rpc_server.hpp b/include/ylt/coro_rpc/impl/coro_rpc_server.hpp index 9c6831d6f..ccd6d0b29 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_server.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_server.hpp @@ -32,10 +32,12 @@ #include #include +#include "async_simple/Promise.h" #include "common_service.hpp" #include "coro_connection.hpp" #include "ylt/coro_io/coro_io.hpp" #include "ylt/coro_io/io_context_pool.hpp" +#include "ylt/coro_rpc/impl/expected.hpp" namespace coro_rpc { /*! * ```cpp @@ -102,48 +104,18 @@ class coro_rpc_server_base { * @return error code if start failed, otherwise block until server stop. */ [[nodiscard]] std::errc start() noexcept { - std::errc ec{}; - { - std::unique_lock lock(start_mtx_); - if (flag_ != stat::init) { - if (flag_ == stat::started) { - ELOGV(INFO, "start again"); - } - else if (flag_ == stat::stop) { - ELOGV(INFO, "has stoped"); - } - return std::errc::io_error; - } - - ec = listen(); - if (ec == std::errc{}) { - thd_ = std::thread([this] { - pool_.run(); - }); - - flag_ = stat::started; - } - else { - flag_ = stat::stop; - } + auto ret = async_start(); + if (ret) { + ret.value().wait(); + return ret.value().value(); } - - cond_.notify_all(); - - if (ec == std::errc{}) { - async_simple::coro::syncAwait(accept()); + else { + return ret.error(); } - return ec; - } - - [[nodiscard]] bool wait_for_start(auto duration) { - std::unique_lock lock(start_mtx_); - return cond_.wait_for(lock, duration, [this] { - return flag_ == stat::started || flag_ == stat::stop; - }); } - [[nodiscard]] async_simple::coro::Lazy async_start() noexcept { + [[nodiscard]] coro_rpc::expected, std::errc> + async_start() noexcept { std::errc ec{}; { std::unique_lock lock(start_mtx_); @@ -154,9 +126,9 @@ class coro_rpc_server_base { else if (flag_ == stat::stop) { ELOGV(INFO, "has stoped"); } - co_return std::errc::io_error; + return coro_rpc::unexpected{ + std::errc::resource_unavailable_try_again}; } - ec = listen(); if (ec == std::errc{}) { if constexpr (requires(typename server_config::executor_pool_t & pool) { @@ -172,12 +144,22 @@ class coro_rpc_server_base { flag_ = stat::stop; } } - - cond_.notify_all(); if (ec == std::errc{}) { - co_await accept(); + async_simple::Promise promise; + auto future = promise.getFuture(); + accept().start([p = std::move(promise)](auto &&res) mutable { + if (res.hasError()) { + p.setValue(std::errc::io_error); + } + else { + p.setValue(res.value()); + } + }); + return std::move(future); + } + else { + return coro_rpc::unexpected{ec}; } - co_return ec; } /*! 
@@ -193,8 +175,8 @@ class coro_rpc_server_base { ELOGV(INFO, "begin to stop coro_rpc_server, conn size %d", conns_.size()); - close_acceptor(); if (flag_ == stat::started) { + close_acceptor(); { std::unique_lock lock(conns_mtx_); for (auto &conn : conns_) { @@ -350,9 +332,11 @@ class coro_rpc_server_base { } #endif if (error) { - ELOGV(ERROR, "accept failed, error: %s", error.message().data()); - if (error == asio::error::operation_aborted) { - co_return std::errc::io_error; + ELOGV(INFO, "accept failed, error: %s", error.message().data()); + if (error == asio::error::operation_aborted || + error == asio::error::bad_descriptor) { + acceptor_close_waiter_.set_value(); + co_return std::errc::operation_canceled; } continue; } @@ -392,17 +376,17 @@ class coro_rpc_server_base { acceptor_.cancel(ec); acceptor_.close(ec); }); + acceptor_close_waiter_.get_future().wait(); } typename server_config::executor_pool_t pool_; asio::ip::tcp::acceptor acceptor_; + std::promise acceptor_close_waiter_; std::thread thd_; stat flag_; std::mutex start_mtx_; - std::condition_variable cond_; - uint64_t conn_id_ = 0; std::unordered_map> conns_; std::mutex conns_mtx_; diff --git a/include/ylt/easylog.hpp b/include/ylt/easylog.hpp index 84e854540..5d1f819f8 100644 --- a/include/ylt/easylog.hpp +++ b/include/ylt/easylog.hpp @@ -75,6 +75,9 @@ class logger { private: logger() { static appender appender{}; + appender.start_thread(); + appender.enable_console(true); + async_ = true; appender_ = &appender; } diff --git a/include/ylt/easylog/appender.hpp b/include/ylt/easylog/appender.hpp index 90902ccbb..b3843f1a9 100644 --- a/include/ylt/easylog/appender.hpp +++ b/include/ylt/easylog/appender.hpp @@ -84,36 +84,42 @@ class appender { max_files_ = (std::min)(max_files, static_cast(1000)); open_log_file(); if (async) { - write_thd_ = std::thread([this] { - while (!stop_) { - if (max_files_ > 0 && file_size_ > max_file_size_ && - static_cast(-1) != file_size_) { - roll_log_files(); - } + start_thread(); + } + } - record_t record; - if (queue_.try_dequeue(record)) { - enable_console_ ? write_record(record) - : write_record(record); - } + void enable_console(bool b) { enable_console_ = b; } - if (queue_.size_approx() == 0) { - std::unique_lock lock(que_mtx_); - cnd_.wait(lock, [&]() { - return queue_.size_approx() > 0 || stop_; - }); - } + void start_thread() { + write_thd_ = std::thread([this] { + while (!stop_) { + if (max_files_ > 0 && file_size_ > max_file_size_ && + static_cast(-1) != file_size_) { + roll_log_files(); + } - if (stop_) { - if (queue_.size_approx() > 0) { - while (queue_.try_dequeue(record)) { - write_record(record); - } + record_t record; + if (queue_.try_dequeue(record)) { + enable_console_ ? 
write_record(record) + : write_record(record); + } + + if (queue_.size_approx() == 0) { + std::unique_lock lock(que_mtx_); + cnd_.wait(lock, [&]() { + return queue_.size_approx() > 0 || stop_; + }); + } + + if (stop_) { + if (queue_.size_approx() > 0) { + while (queue_.try_dequeue(record)) { + write_record(record); } } } - }); - } + } + }); } std::string_view get_tid_buf(unsigned int tid) { diff --git a/src/coro_http/examples/channel.cpp b/src/coro_http/examples/channel.cpp index 5723ea096..64f4e7ac2 100644 --- a/src/coro_http/examples/channel.cpp +++ b/src/coro_http/examples/channel.cpp @@ -68,7 +68,7 @@ int main() { "http://www.baidu.com"}; auto chan = coro_io::channel::create( hosts, coro_io::channel::channel_config{ - .pool_config{.max_connection_ = 1000}}); + .pool_config{.max_connection = 1000}}); for (int i = 0, lim = std::thread::hardware_concurrency() * 10; i < lim; ++i) test_async_channel(chan).start([](auto &&) { diff --git a/src/coro_io/tests/CMakeLists.txt b/src/coro_io/tests/CMakeLists.txt index 4d50197cc..714e8e69b 100644 --- a/src/coro_io/tests/CMakeLists.txt +++ b/src/coro_io/tests/CMakeLists.txt @@ -1,6 +1,8 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/output/tests) add_executable(coro_io_test test_corofile.cpp + test_channel.cpp + test_client_pool.cpp main.cpp ) add_test(NAME coro_io_test COMMAND coro_io_test) diff --git a/src/coro_io/tests/test_channel.cpp b/src/coro_io/tests/test_channel.cpp new file mode 100644 index 000000000..964256266 --- /dev/null +++ b/src/coro_io/tests/test_channel.cpp @@ -0,0 +1,114 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "async_simple/coro/Lazy.h" +#include "ylt/coro_rpc/impl/coro_rpc_client.hpp" +#include "ylt/coro_rpc/impl/default_config/coro_rpc_config.hpp" + +TEST_CASE("test RR") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8801); + auto res = server.async_start(); + REQUIRE_MESSAGE(res, "server start failed"); + auto hosts = + std::vector{"127.0.0.1:8801", "localhost:8801"}; + auto channel = coro_io::channel::create(hosts); + for (int i = 0; i < 100; ++i) { + auto res = co_await channel.send_request( + [&i, &hosts]( + coro_rpc::coro_rpc_client &client, + std::string_view host) -> async_simple::coro::Lazy { + CHECK(host == hosts[i % 2]); + co_return; + }); + CHECK(res.has_value()); + } + server.stop(); + }()); +} + +TEST_CASE("test Random") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8801); + auto res = server.async_start(); + REQUIRE_MESSAGE(res, "server start failed"); + auto hosts = + std::vector{"127.0.0.1:8801", "localhost:8801"}; + auto channel = coro_io::channel::create( + hosts, {.lba = coro_io::load_blance_algorithm::random}); + int host0_cnt = 0, hostRR_cnt = 0; + for (int i = 0; i < 100; ++i) { + auto res = co_await channel.send_request( + [&i, &hosts, &host0_cnt, &hostRR_cnt]( + coro_rpc::coro_rpc_client &client, + std::string_view host) -> async_simple::coro::Lazy { + if (host == hosts[i % 2]) + ++hostRR_cnt; + if (host == hosts[0]) + ++host0_cnt; + co_return; + }); + CHECK(res.has_value()); + } + CHECK(host0_cnt < 100); + CHECK(hostRR_cnt < 100); + server.stop(); + }()); +} + +TEST_CASE("test single host") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8801); + auto res = server.async_start(); + 
REQUIRE_MESSAGE(res, "server start failed"); + auto hosts = std::vector{"127.0.0.1:8801"}; + auto channel = coro_io::channel::create(hosts); + for (int i = 0; i < 100; ++i) { + auto res = co_await channel.send_request( + [&i, &hosts]( + coro_rpc::coro_rpc_client &client, + std::string_view host) -> async_simple::coro::Lazy { + CHECK(host == hosts[0]); + co_return; + }); + CHECK(res.has_value()); + } + server.stop(); + }()); +} + +TEST_CASE("test send_request config") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8802); + auto res = server.async_start(); + REQUIRE_MESSAGE(res, "server start failed"); + auto hosts = std::vector{"127.0.0.1:8802"}; + auto channel = coro_io::channel::create(hosts); + for (int i = 0; i < 100; ++i) { + auto config = coro_rpc::coro_rpc_client::config{.client_id = 114514}; + auto res = co_await channel.send_request( + [&i, &hosts](coro_rpc::coro_rpc_client &client, std::string_view host) + -> async_simple::coro::Lazy { + CHECK(client.get_client_id() == 114514); + co_return; + }, + config); + CHECK(res.has_value()); + } + server.stop(); + }()); +} \ No newline at end of file diff --git a/src/coro_io/tests/test_client_pool.cpp b/src/coro_io/tests/test_client_pool.cpp new file mode 100644 index 000000000..483d649b8 --- /dev/null +++ b/src/coro_io/tests/test_client_pool.cpp @@ -0,0 +1,190 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "async_simple/Executor.h" +#include "async_simple/Promise.h" +#include "async_simple/Unit.h" +#include "async_simple/coro/ConditionVariable.h" +#include "async_simple/coro/Lazy.h" +#include "async_simple/coro/Semaphore.h" +#include "async_simple/coro/Sleep.h" +#include "async_simple/coro/SpinLock.h" +#include "ylt/coro_rpc/impl/coro_rpc_client.hpp" +#include "ylt/coro_rpc/impl/default_config/coro_rpc_config.hpp" +#include "ylt/coro_rpc/impl/expected.hpp" +using namespace std::chrono_literals; +using namespace async_simple::coro; +auto event = + []( + int lim, coro_io::client_pool &pool, + ConditionVariable &cv, SpinLock &lock, + std::function user_op = + [](auto &client) { + }) -> async_simple::coro::Lazy { + std::vector> works; + int64_t cnt = 0; + for (int i = 0; i < lim; ++i) { + auto op = [&cnt, &lock, &cv, &lim, + &user_op](coro_rpc::coro_rpc_client &client) + -> async_simple::coro::Lazy { + user_op(client); + auto l = co_await lock.coScopedLock(); + if (++cnt < lim) { + co_await cv.wait(lock, [&cnt, &lim] { + return cnt >= lim; + }); + } + else { + l.unlock(); + cv.notifyAll(); + } + co_return; + }; + auto backer = [&cv, &lock, &cnt, &lim]( + auto &pool, auto op) -> async_simple::coro::Lazy { + async_simple::Promise p; + auto res = co_await pool.send_request(op); + if (!res.has_value()) { + { + co_await lock.coScopedLock(); + cnt = lim; + } + cv.notifyAll(); + co_return false; + } + co_return true; + }; + works.emplace_back(backer(pool, op).via(coro_io::get_global_executor())); + } + auto res = co_await collectAll(std::move(works)); + for (auto &e : res) { + if (!e.value()) { + co_return false; + } + } + co_return true; +}; +TEST_CASE("test client pool") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8801); + auto is_started = server.async_start(); + REQUIRE(is_started); + auto pool = coro_io::client_pool::create( + "127.0.0.1:8801", {.max_connection = 100, 
.idle_timeout = 300ms}); + SpinLock lock; + ConditionVariable cv; + auto res = co_await event(20, *pool, cv, lock); + CHECK(res); + CHECK(pool->free_client_count() == 20); + res = co_await event(200, *pool, cv, lock); + CHECK(res); + auto sz = pool->free_client_count(); + CHECK((sz >= 100 && sz <= 105)); + co_await coro_io::sleep_for(700ms); + CHECK(pool->free_client_count() == 0); + server.stop(); + }()); +} +TEST_CASE("test idle timeout yield") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8801); + auto is_started = server.async_start(); + REQUIRE(is_started); + auto pool = coro_io::client_pool::create( + "127.0.0.1:8801", {.max_connection = 100, + .idle_queue_per_max_clear_count = 1, + .idle_timeout = 300ms}); + SpinLock lock; + ConditionVariable cv; + auto res = co_await event(100, *pool, cv, lock); + CHECK(res); + CHECK(pool->free_client_count() == 100); + co_await coro_io::sleep_for(700ms); + CHECK(pool->free_client_count() == 0); + server.stop(); + }()); +} + +TEST_CASE("test reconnect") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + auto pool = coro_io::client_pool::create( + "127.0.0.1:8801", + {.connect_retry_count = 3, .reconnect_wait_time = 500ms}); + SpinLock lock; + ConditionVariable cv; + coro_rpc::coro_rpc_server server(2, 8801); + async_simple::Promise p; + coro_io::sleep_for(700ms).start([&server, &p](auto &&) { + auto server_is_started = server.async_start(); + REQUIRE(server_is_started); + }); + + auto res = co_await event(100, *pool, cv, lock); + CHECK(res); + CHECK(pool->free_client_count() == 100); + server.stop(); + co_return; + }()); +} + +TEST_CASE("test collect_free_client") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + coro_rpc::coro_rpc_server server(1, 8801); + auto is_started = server.async_start(); + REQUIRE(is_started); + auto pool = coro_io::client_pool::create( + "127.0.0.1:8801", {.max_connection = 100, .idle_timeout = 300ms}); + + SpinLock lock; + ConditionVariable cv; + auto res = co_await event(50, *pool, cv, lock, [](auto &client) { + client.close(); + }); + CHECK(res); + CHECK(pool->free_client_count() == 0); + server.stop(); + co_return; + }()); +} + +TEST_CASE("test client pools parallel r/w") { + async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { + auto pool = coro_io::client_pools{}; + for (int i = 0; i < 10000; ++i) { + auto cli = pool[std::to_string(i)]; + CHECK(cli->get_host_name() == std::to_string(i)); + } + auto rw = [&pool](int i) -> Lazy { + auto cli = pool[std::to_string(i)]; + CHECK(cli->get_host_name() == std::to_string(i)); + co_return; + }; + + std::vector> works; + for (int i = 0; i < 20000; ++i) { + works.emplace_back(rw(i).via(coro_io::get_global_executor())); + } + for (int i = 0; i < 10000; ++i) { + works.emplace_back(rw(i).via(coro_io::get_global_executor())); + } + co_await collectAll(std::move(works)); + }()); +} \ No newline at end of file diff --git a/src/coro_rpc/benchmark/data_gen.cpp b/src/coro_rpc/benchmark/data_gen.cpp index 283643338..9d19a60b3 100644 --- a/src/coro_rpc/benchmark/data_gen.cpp +++ b/src/coro_rpc/benchmark/data_gen.cpp @@ -28,15 +28,10 @@ using namespace std::chrono_literals; int main() { using namespace coro_rpc; coro_rpc::coro_rpc_server server(std::thread::hardware_concurrency(), 0); - std::thread thrd([&] { - start_server(server); - }); - bool started = server.wait_for_start(3s); + auto started = server.async_start(); if (!started) { ELOGV(ERROR, "server started failed"); - 
server.stop(); - thrd.join(); return -1; } @@ -109,7 +104,6 @@ int main() { syncAwait(client.call(42)); server.stop(); - thrd.join(); return 0; }; \ No newline at end of file diff --git a/src/coro_rpc/examples/base_examples/channel.cpp b/src/coro_rpc/examples/base_examples/channel.cpp index 4fdbe5022..e57b274cd 100644 --- a/src/coro_rpc/examples/base_examples/channel.cpp +++ b/src/coro_rpc/examples/base_examples/channel.cpp @@ -87,11 +87,11 @@ Lazy qps_watcher() { int main() { auto hosts = - std::vector{{"127.0.0.1:8801", "localhost:8801"}}; + std::vector{"127.0.0.1:8801", "localhost:8801"}; auto worker_cnt = std::thread::hardware_concurrency() * 20; auto chan = coro_io::channel::create( hosts, coro_io::channel::channel_config{ - .pool_config{.max_connection_ = worker_cnt}}); + .pool_config{.max_connection = worker_cnt}}); auto chan_ptr = std::make_shared(std::move(chan)); for (int i = 0; i < worker_cnt; ++i) { call_echo(chan_ptr, 10000).start([](auto &&) { diff --git a/src/coro_rpc/examples/base_examples/client_pool.cpp b/src/coro_rpc/examples/base_examples/client_pool.cpp index 04475481a..a086016d2 100644 --- a/src/coro_rpc/examples/base_examples/client_pool.cpp +++ b/src/coro_rpc/examples/base_examples/client_pool.cpp @@ -78,7 +78,7 @@ int main() { auto client_pool = coro_io::client_pool::create( "localhost:8801", coro_io::client_pool::pool_config{ - .max_connection_ = thread_cnt * 20, + .max_connection = thread_cnt * 20, .client_config = {.timeout_duration = std::chrono::seconds{50}}}); for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) { diff --git a/src/coro_rpc/examples/base_examples/rpc_service.cpp b/src/coro_rpc/examples/base_examples/rpc_service.cpp index 3d1f3e0d5..54d70b4ce 100644 --- a/src/coro_rpc/examples/base_examples/rpc_service.cpp +++ b/src/coro_rpc/examples/base_examples/rpc_service.cpp @@ -57,7 +57,7 @@ void hello_with_delay(context conn, async_simple::coro::Lazy nested_echo(std::string_view sv) { ELOGV(INFO, "start nested echo"); - coro_rpc::coro_rpc_client client(co_await coro_io::get_executor()); + coro_rpc::coro_rpc_client client(co_await coro_io::get_current_executor()); [[maybe_unused]] auto ec = co_await client.connect("127.0.0.1", "8802"); assert(ec == std::errc{}); ELOGV(INFO, "connect another server"); diff --git a/src/coro_rpc/tests/ServerTester.hpp b/src/coro_rpc/tests/ServerTester.hpp index 4190f8257..9d9f05ea4 100644 --- a/src/coro_rpc/tests/ServerTester.hpp +++ b/src/coro_rpc/tests/ServerTester.hpp @@ -44,7 +44,6 @@ using namespace std::chrono_literals; struct TesterConfig { TesterConfig() = default; TesterConfig(TesterConfig &c) { - async_start = c.sync_client; enable_heartbeat = c.enable_heartbeat; use_ssl = c.use_ssl; sync_client = c.sync_client; @@ -52,7 +51,6 @@ struct TesterConfig { port = c.port; conn_timeout_duration = c.conn_timeout_duration; } - bool async_start; bool enable_heartbeat; bool use_ssl; bool sync_client; @@ -64,8 +62,7 @@ struct TesterConfig { friend std::ostream &operator<<(std::ostream &os, const TesterConfig &config) { os << std::boolalpha; - os << "async_start: " << config.async_start << ";" - << " enable_heartbeat: " << config.enable_heartbeat << ";" + os << " enable_heartbeat: " << config.enable_heartbeat << ";" << " use_ssl: " << config.use_ssl << ";" << " sync_client: " << config.sync_client << ";" << " use_outer_io_context: " << config.use_outer_io_context << ";" diff --git a/src/coro_rpc/tests/test_coro_rpc_client.cpp b/src/coro_rpc/tests/test_coro_rpc_client.cpp index cfa849592..19653f188 100644 --- 
a/src/coro_rpc/tests/test_coro_rpc_client.cpp +++ b/src/coro_rpc/tests/test_coro_rpc_client.cpp @@ -84,10 +84,9 @@ TEST_CASE("testing client") { server.init_ssl_context( ssl_configure{"../openssl_files", "server.crt", "server.key"}); #endif - server.async_start().start([](auto&&) { - }); + auto res = server.async_start(); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + CHECK_MESSAGE(res, "server start failed"); SUBCASE("call rpc, function not registered") { g_action = {}; @@ -171,9 +170,8 @@ TEST_CASE("testing client with inject server") { server.init_ssl_context( ssl_configure{"../openssl_files", "server.crt", "server.key"}); #endif - server.async_start().start([](auto&&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start failed"); server.register_handler(); @@ -238,9 +236,8 @@ class SSLClientTester { ssl_configure config{base_path, server_crt_path, server_key_path, dh_path}; server.init_ssl_context(config); server.template register_handler(); - server.async_start().start([](auto&&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); std::promise promise; auto future = promise.get_future(); @@ -365,9 +362,9 @@ TEST_CASE("testing client with ssl server") { TEST_CASE("testing client with eof") { g_action = {}; coro_rpc_server server(2, 8801); - server.async_start().start([](auto&&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + + auto res = server.async_start(); + REQUIRE_MESSAGE(res, "server start failed"); coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); auto ec = client.sync_connect("127.0.0.1", "8801"); REQUIRE_MESSAGE(ec == std::errc{}, make_error_code(ec).message()); @@ -387,9 +384,8 @@ TEST_CASE("testing client with eof") { TEST_CASE("testing client with shutdown") { g_action = {}; coro_rpc_server server(2, 8801); - server.async_start().start([](auto&&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); auto ec = client.sync_connect("127.0.0.1", "8801"); REQUIRE_MESSAGE(ec == std::errc{}, make_error_code(ec).message()); @@ -454,9 +450,8 @@ TEST_CASE("testing client sync connect, unit test inject only") { SUBCASE("client use ssl but server don't use ssl") { g_action = {}; coro_rpc_server server(2, 8801); - server.async_start().start([](auto&&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); coro_rpc_client client2(*coro_io::get_global_executor(), g_client_id++); bool ok = client2.init_ssl("../openssl_files", "server.crt"); CHECK(ok == true); @@ -489,9 +484,8 @@ TEST_CASE("testing client call timeout") { server.register_handler(); server.register_handler(); - server.async_start().start([](auto&&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); auto ec_lazy = client.connect("127.0.0.1", "8801"); auto ec = syncAwait(ec_lazy); diff --git a/src/coro_rpc/tests/test_coro_rpc_server.cpp b/src/coro_rpc/tests/test_coro_rpc_server.cpp index 4495ae952..0abb3fbb3 
100644 --- a/src/coro_rpc/tests/test_coro_rpc_server.cpp +++ b/src/coro_rpc/tests/test_coro_rpc_server.cpp @@ -39,41 +39,10 @@ struct CoroServerTester : ServerTester { ssl_configure{"../openssl_files", "server.crt", "server.key"}); } #endif - if (async_start) { - // https://timsong-cpp.github.io/cppwp/n4861/temp.names#5.example-1 - // https://developercommunity.visualstudio.com/t/c2059-syntax-error-template-for-valid-template-mem/1632142 - /* - template struct A { - void f(int); - template void f(U); - }; - - template void f(T t) { - A a; - a.template f<>(t); // OK: calls template - a.template f(t); // error: not a template-id - } - */ - server.async_start().template start<>([](auto &&) { - }); - } - else { - thd = std::thread([&] { - auto ec = server.start(); - REQUIRE(ec == std::errc{}); - }); - } - - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); - } - ~CoroServerTester() { - if (async_start) { - } - else { - server.stop(); - thd.join(); - } + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); } + ~CoroServerTester() { server.stop(); } async_simple::coro::Lazy get_value(int val) { co_return val; } @@ -147,13 +116,8 @@ struct CoroServerTester : ServerTester { void test_server_start_again() { ELOGV(INFO, "run %s", __func__); - std::errc ec; - if (async_start) { - ec = syncAwait(server.async_start()); - } - else { - ec = server.start(); - } + + auto ec = server.start(); REQUIRE_MESSAGE(ec == std::errc::io_error, make_error_code(ec).message()); } @@ -161,15 +125,10 @@ struct CoroServerTester : ServerTester { ELOGV(INFO, "run %s", __func__); { auto new_server = coro_rpc_server(2, std::stoi(this->port_)); - std::errc ec; - if (async_start) { - ec = syncAwait(new_server.async_start()); - } - else { - ec = new_server.start(); - } - REQUIRE_MESSAGE(ec == std::errc::address_in_use, - make_error_code(ec).message()); + auto ec = new_server.async_start(); + REQUIRE(!ec); + REQUIRE_MESSAGE(ec.error() == std::errc::address_in_use, + make_error_code(ec.error()).message()); } ELOGV(INFO, "OH NO"); } @@ -238,35 +197,31 @@ TEST_CASE("testing coro rpc server") { unsigned short server_port = 8810; auto conn_timeout_duration = 500ms; std::vector switch_list{true, false}; - for (auto async_start : switch_list) { - for (auto enable_heartbeat : switch_list) { - for (auto use_ssl : switch_list) { - TesterConfig config; - config.async_start = async_start; - config.enable_heartbeat = enable_heartbeat; - config.use_ssl = use_ssl; - config.sync_client = false; - config.use_outer_io_context = false; - config.port = server_port; - if (enable_heartbeat) { - config.conn_timeout_duration = conn_timeout_duration; - } - std::stringstream ss; - ss << config; - ELOGV(INFO, "config: %s", ss.str().data()); - CoroServerTester(config).run(); + for (auto enable_heartbeat : switch_list) { + for (auto use_ssl : switch_list) { + TesterConfig config; + config.enable_heartbeat = enable_heartbeat; + config.use_ssl = use_ssl; + config.sync_client = false; + config.use_outer_io_context = false; + config.port = server_port; + if (enable_heartbeat) { + config.conn_timeout_duration = conn_timeout_duration; } - // } + std::stringstream ss; + ss << config; + ELOGV(INFO, "config: %s", ss.str().data()); + CoroServerTester(config).run(); } + // } } } TEST_CASE("testing coro rpc server stop") { ELOGV(INFO, "run testing coro rpc server stop"); coro_rpc_server server(2, 8810); - server.async_start().start([](auto &&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res 
= server.async_start(); + REQUIRE_MESSAGE(res, "server start failed"); SUBCASE("stop twice") { server.stop(); server.stop(); @@ -288,9 +243,8 @@ TEST_CASE("test server accept error") { g_action = inject_action::force_inject_server_accept_error; coro_rpc_server server(2, 8810); server.register_handler(); - server.async_start().start([](auto &&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); ELOGV(INFO, "run test server accept error, client_id %d", client.get_client_id()); @@ -318,9 +272,8 @@ TEST_CASE("test server write queue") { g_action = {}; coro_rpc_server server(2, 8810); server.register_handler(); - server.async_start().start([](auto &&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); std::string buffer; buffer.reserve(coro_rpc_protocol::REQ_HEAD_LEN + struct_pack::get_needed_size(std::monostate{})); @@ -383,9 +336,8 @@ TEST_CASE("testing coro rpc write error") { g_action = inject_action::force_inject_connection_close_socket; coro_rpc_server server(2, 8810); server.register_handler(); - server.async_start().start([](auto &&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start failed"); coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); ELOGV(INFO, "run testing coro rpc write error, client_id %d", client.get_client_id()); diff --git a/src/coro_rpc/tests/test_variadic.cpp b/src/coro_rpc/tests/test_variadic.cpp index 0dc423268..1f6feba63 100644 --- a/src/coro_rpc/tests/test_variadic.cpp +++ b/src/coro_rpc/tests/test_variadic.cpp @@ -29,16 +29,9 @@ TEST_CASE("test varadic param") { auto server = std::make_unique( std::thread::hardware_concurrency(), 8808); - std::thread thrd([&] { - server->register_handler(); - try { - auto ec = server->start(); - REQUIRE(ec == std::errc{}); - } catch (const std::exception& e) { - std::cerr << "test varadic param Exception: " << e.what() << "\n"; - } - }); - REQUIRE_MESSAGE(server->wait_for_start(3s), "server start timeout"); + server->register_handler(); + auto res = server->async_start(); + REQUIRE_MESSAGE(res, "server start failed"); coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); syncAwait(client.connect("localhost", std::to_string(server->port()))); @@ -51,8 +44,6 @@ TEST_CASE("test varadic param") { })); ELOGV(INFO, "begin to stop server"); server->stop(); - if (thrd.joinable()) - thrd.join(); ELOGV(INFO, "finished stop server"); if (ret) { ELOGV(INFO, "ret value %s", ret.value().data());
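
Usage note on the async_start() change in this patch: it now returns coro_rpc::expected<async_simple::Future<std::errc>, std::errc> instead of a Lazy, so callers see listen errors synchronously via the expected and may wait on the future for the accept loop to finish, as the updated tests above do. Below is a minimal sketch of the new calling pattern; it mirrors the test code in this patch, but the header path, port number, and comments are illustrative assumptions, not part of the patch itself:

    #include <ylt/coro_rpc/coro_rpc_server.hpp>

    int main() {
      coro_rpc::coro_rpc_server server(/*thread_num=*/1, /*port=*/8801);
      auto res = server.async_start();  // listens and spawns the accept loop
      if (!res) {
        // listen failed immediately, e.g. std::errc::address_in_use
        return 1;
      }
      // ... connect coro_rpc_client instances and send requests ...
      server.stop();                    // accept loop exits with operation_canceled
      res.value().wait();               // optional: join the accept loop's future
      return 0;
    }

The blocking start() shown earlier in the patch is just this pattern followed by res.value().value() to return the accept loop's final status.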