diff --git a/.github/workflows/ubuntu_clang.yml b/.github/workflows/ubuntu_clang.yml index 4ff4a45fc3..b06bf0ad4d 100644 --- a/.github/workflows/ubuntu_clang.yml +++ b/.github/workflows/ubuntu_clang.yml @@ -38,10 +38,9 @@ jobs: - name: Configure run: | - CXX=clang++ CC=clang cmake -B ${{github.workspace}}/build -G Ninja \ -DCMAKE_BUILD_TYPE=${{matrix.mode}} -DBUILD_WITH_LIBCXX=${{matrix.libcxx}} -DENABLE_SSL=${{matrix.ssl}} \ - -DUSE_CCACHE=${{env.ccache}} + -DUSE_CCACHE=${{env.ccache}} -DCMAKE_C_COMPILER=clang -DCMAKE_CXX_COMPILER=clang++ - name: Build run: cmake --build ${{github.workspace}}/build --config ${{matrix.mode}} diff --git a/include/ylt/coro_io/channel.hpp b/include/ylt/coro_io/channel.hpp index 28a0dec853..0769d3bc9b 100644 --- a/include/ylt/coro_io/channel.hpp +++ b/include/ylt/coro_io/channel.hpp @@ -75,7 +75,7 @@ class channel { channel(const channel& o) = delete; channel& operator=(const channel& o) = delete; - auto send_request(auto op, const typename client_t::config& config) + auto send_request(auto op, typename client_t::config& config) -> decltype(std::declval().send_request(std::move(op), std::string_view{}, config)) { diff --git a/include/ylt/coro_io/client_pool.hpp b/include/ylt/coro_io/client_pool.hpp index 7daf961653..51ab33aaa5 100644 --- a/include/ylt/coro_io/client_pool.hpp +++ b/include/ylt/coro_io/client_pool.hpp @@ -99,10 +99,8 @@ class client_pool : public std::enable_shared_from_this< bool ok = false; for (int i = 0; !ok && i < pool_config_.connect_retry_count; ++i) { + co_await coro_io::sleep_for(pool_config_.reconnect_wait_time); ok = (client_t::is_ok(co_await client->reconnect(host_name_))); - if (!ok) { - co_await coro_io::sleep_for(pool_config_.reconnect_wait_time); - } } co_return ok ? std::move(client) : nullptr; } @@ -115,17 +113,7 @@ class client_pool : public std::enable_shared_from_this< if (!free_clients_.try_dequeue(client)) { break; } - if (client->has_closed()) { - [self = this->shared_from_this()](std::unique_ptr client) - -> async_simple::coro::Lazy { - self->collect_free_client( - co_await self->reconnect(std::move(client))); - co_return; - }(std::move(client)) - .start([](auto&& v) { - }); - } - else { + if (!client->has_closed()) { break; } } @@ -232,7 +220,7 @@ class client_pool : public std::enable_shared_from_this< template async_simple::coro::Lazy> send_request( - T op, const typename client_t::config& client_config) { + T op, typename client_t::config& client_config) { // return type: Lazy> auto client = co_await get_client(client_config); if (!client) { @@ -252,7 +240,7 @@ class client_pool : public std::enable_shared_from_this< template decltype(auto) send_request(T op) { - return send_request(op, pool_config_.client_config); + return send_request(std::move(op), pool_config_.client_config); } std::size_t free_client_count() const noexcept { @@ -271,7 +259,7 @@ class client_pool : public std::enable_shared_from_this< template async_simple::coro::Lazy> send_request( T op, std::string_view endpoint, - const typename client_t::config& client_config) { + typename client_t::config& client_config) { // return type: Lazy> auto client = co_await get_client(client_config); if (!client) { @@ -293,7 +281,7 @@ class client_pool : public std::enable_shared_from_this< template decltype(auto) send_request(T op, std::string_view sv) { - return send_request(op, sv, pool_config_.client_config); + return send_request(std::move(op), sv, pool_config_.client_config); } coro_io::detail::client_queue> free_clients_; @@ -322,11 +310,10 @@ class client_pools { co_return ret; } auto send_request(std::string_view host_name, - const typename client_pool_t::pool_config& pool_config, - auto op) + typename client_pool_t::pool_config& pool_config, auto op) -> decltype(std::declval().send_request(std::move(op))) { auto pool = get_client_pool(host_name, pool_config); - auto ret = co_await pool.send_request(std::move(op)); + auto ret = co_await pool->send_request(std::move(op)); co_return ret; } auto at(std::string_view host_name) { diff --git a/include/ylt/coro_io/io_context_pool.hpp b/include/ylt/coro_io/io_context_pool.hpp index 853f829be5..f01c17d967 100644 --- a/include/ylt/coro_io/io_context_pool.hpp +++ b/include/ylt/coro_io/io_context_pool.hpp @@ -72,8 +72,9 @@ class ExecutorWrapper : public async_simple::Executor { private: void schedule(Func func, Duration dur) override { - auto timer = std::make_shared(executor_, dur); - timer->async_wait([fn = std::move(func), timer](auto ec) { + auto timer = std::make_unique(executor_, dur); + auto tm = timer.get(); + tm->async_wait([fn = std::move(func), timer = std::move(timer)](auto ec) { fn(); }); } @@ -81,7 +82,7 @@ class ExecutorWrapper : public async_simple::Executor { template inline async_simple::coro::Lazy -get_executor() { +get_current_executor() { auto executor = co_await async_simple::CurrentExecutor{}; assert(executor != nullptr); co_return static_cast(executor->checkout())->get_executor(); diff --git a/include/ylt/coro_rpc/impl/coro_rpc_server.hpp b/include/ylt/coro_rpc/impl/coro_rpc_server.hpp index 6516b9c9ad..e4cd0f1012 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_server.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_server.hpp @@ -32,10 +32,12 @@ #include #include +#include "async_simple/Promise.h" #include "common_service.hpp" #include "coro_connection.hpp" #include "ylt/coro_io/coro_io.hpp" #include "ylt/coro_io/io_context_pool.hpp" +#include "ylt/coro_rpc/impl/expected.hpp" namespace coro_rpc { /*! * ```cpp @@ -102,48 +104,18 @@ class coro_rpc_server_base { * @return error code if start failed, otherwise block until server stop. */ [[nodiscard]] std::errc start() noexcept { - std::errc ec{}; - { - std::unique_lock lock(start_mtx_); - if (flag_ != stat::init) { - if (flag_ == stat::started) { - ELOGV(INFO, "start again"); - } - else if (flag_ == stat::stop) { - ELOGV(INFO, "has stoped"); - } - return std::errc::io_error; - } - - ec = listen(); - if (ec == std::errc{}) { - thd_ = std::thread([this] { - pool_.run(); - }); - - flag_ = stat::started; - } - else { - flag_ = stat::stop; - } + auto ret = async_start(); + if (ret) { + ret.value().wait(); + return ret.value().value(); } - - cond_.notify_all(); - - if (ec == std::errc{}) { - async_simple::coro::syncAwait(accept()); + else { + return ret.error(); } - return ec; - } - - [[nodiscard]] bool wait_for_start(auto duration) { - std::unique_lock lock(start_mtx_); - return cond_.wait_for(lock, duration, [this] { - return flag_ == stat::started || flag_ == stat::stop; - }); } - [[nodiscard]] async_simple::coro::Lazy async_start() noexcept { + [[nodiscard]] coro_rpc::expected, std::errc> + async_start() noexcept { std::errc ec{}; { std::unique_lock lock(start_mtx_); @@ -154,9 +126,9 @@ class coro_rpc_server_base { else if (flag_ == stat::stop) { ELOGV(INFO, "has stoped"); } - co_return std::errc::io_error; + return coro_rpc::unexpected{ + std::errc::resource_unavailable_try_again}; } - ec = listen(); if (ec == std::errc{}) { if constexpr (requires(typename server_config::executor_pool_t & pool) { @@ -172,12 +144,22 @@ class coro_rpc_server_base { flag_ = stat::stop; } } - - cond_.notify_all(); if (ec == std::errc{}) { - co_await accept(); + async_simple::Promise promise; + auto future = promise.getFuture(); + accept().start([p = std::move(promise)](auto &&res) mutable { + if (res.hasError()) { + p.setValue(std::errc::io_error); + } + else { + p.setValue(res.value()); + } + }); + return std::move(future); + } + else { + return coro_rpc::unexpected{ec}; } - co_return ec; } /*! @@ -193,8 +175,8 @@ class coro_rpc_server_base { ELOGV(INFO, "begin to stop coro_rpc_server, conn size %d", conns_.size()); - close_acceptor(); if (flag_ == stat::started) { + close_acceptor(); { std::unique_lock lock(conns_mtx_); for (auto &conn : conns_) { @@ -350,9 +332,11 @@ class coro_rpc_server_base { } #endif if (error) { - ELOGV(ERROR, "accept failed, error: %s", error.message().data()); - if (error == asio::error::operation_aborted) { - co_return std::errc::io_error; + ELOGV(INFO, "accept failed, error: %s", error.message().data()); + if (error == asio::error::operation_aborted || + error == asio::error::bad_descriptor) { + acceptor_close_waiter_.set_value(); + co_return std::errc::operation_canceled; } continue; } @@ -392,17 +376,17 @@ class coro_rpc_server_base { acceptor_.cancel(ec); acceptor_.close(ec); }); + acceptor_close_waiter_.get_future().wait(); } typename server_config::executor_pool_t pool_; asio::ip::tcp::acceptor acceptor_; + std::promise acceptor_close_waiter_; std::thread thd_; stat flag_; std::mutex start_mtx_; - std::condition_variable cond_; - uint64_t conn_id_ = 0; std::unordered_map> conns_; std::mutex conns_mtx_; diff --git a/src/coro_io/tests/test_channel.cpp b/src/coro_io/tests/test_channel.cpp index 7d8fb28403..9642562660 100644 --- a/src/coro_io/tests/test_channel.cpp +++ b/src/coro_io/tests/test_channel.cpp @@ -22,10 +22,8 @@ TEST_CASE("test RR") { async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { coro_rpc::coro_rpc_server server(1, 8801); - server.async_start().start([](auto &&) { - }); - auto is_started = server.wait_for_start(std::chrono::seconds{1}); - CHECK(is_started); + auto res = server.async_start(); + REQUIRE_MESSAGE(res, "server start failed"); auto hosts = std::vector{"127.0.0.1:8801", "localhost:8801"}; auto channel = coro_io::channel::create(hosts); @@ -46,10 +44,8 @@ TEST_CASE("test RR") { TEST_CASE("test Random") { async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { coro_rpc::coro_rpc_server server(1, 8801); - server.async_start().start([](auto &&) { - }); - auto is_started = server.wait_for_start(std::chrono::seconds{1}); - CHECK(is_started); + auto res = server.async_start(); + REQUIRE_MESSAGE(res, "server start failed"); auto hosts = std::vector{"127.0.0.1:8801", "localhost:8801"}; auto channel = coro_io::channel::create( @@ -77,10 +73,8 @@ TEST_CASE("test Random") { TEST_CASE("test single host") { async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { coro_rpc::coro_rpc_server server(1, 8801); - server.async_start().start([](auto &&) { - }); - auto is_started = server.wait_for_start(std::chrono::seconds{1}); - CHECK(is_started); + auto res = server.async_start(); + REQUIRE_MESSAGE(res, "server start failed"); auto hosts = std::vector{"127.0.0.1:8801"}; auto channel = coro_io::channel::create(hosts); for (int i = 0; i < 100; ++i) { @@ -100,20 +94,19 @@ TEST_CASE("test single host") { TEST_CASE("test send_request config") { async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { coro_rpc::coro_rpc_server server(1, 8802); - server.async_start().start([](auto &&) { - }); - auto is_started = server.wait_for_start(std::chrono::seconds{1}); - CHECK(is_started); + auto res = server.async_start(); + REQUIRE_MESSAGE(res, "server start failed"); auto hosts = std::vector{"127.0.0.1:8802"}; auto channel = coro_io::channel::create(hosts); for (int i = 0; i < 100; ++i) { + auto config = coro_rpc::coro_rpc_client::config{.client_id = 114514}; auto res = co_await channel.send_request( [&i, &hosts](coro_rpc::coro_rpc_client &client, std::string_view host) -> async_simple::coro::Lazy { CHECK(client.get_client_id() == 114514); co_return; }, - coro_rpc::coro_rpc_client::config{.client_id = 114514}); + config); CHECK(res.has_value()); } server.stop(); diff --git a/src/coro_io/tests/test_client_pool.cpp b/src/coro_io/tests/test_client_pool.cpp index 23824679e7..8f946da312 100644 --- a/src/coro_io/tests/test_client_pool.cpp +++ b/src/coro_io/tests/test_client_pool.cpp @@ -32,33 +32,72 @@ #include "ylt/coro_rpc/impl/expected.hpp" using namespace std::chrono_literals; using namespace async_simple::coro; +auto event = + []( + int lim, coro_io::client_pool &pool, + ConditionVariable &cv, SpinLock &lock, + std::function user_op = + [](auto &client) { + }) -> async_simple::coro::Lazy { + std::vector> works; + int64_t cnt = 0; + for (int i = 0; i < lim; ++i) { + auto op = [&cnt, &lock, &cv, &lim, + &user_op](coro_rpc::coro_rpc_client &client) + -> async_simple::coro::Lazy { + user_op(client); + auto l = co_await lock.coScopedLock(); + if (++cnt < lim) { + std::cout << cnt << std::endl; + co_await cv.wait(lock, [&cnt, &lim] { + return cnt >= lim; + }); + } + else { + l.unlock(); + cv.notifyAll(); + } + co_return; + }; + auto backer = [&cv, &lock, &cnt, &lim]( + auto &pool, auto op) -> async_simple::coro::Lazy { + async_simple::Promise p; + auto res = co_await pool.send_request(op); + if (!res.has_value()) { + { + co_await lock.coScopedLock(); + cnt = lim; + } + cv.notifyAll(); + co_return false; + } + co_return true; + }; + works.emplace_back(backer(pool, op).via(coro_io::get_global_executor())); + } + std::cout << works.size() << std::endl; + auto res = co_await collectAll(std::move(works)); + for (auto &e : res) { + if (!e.value()) { + co_return false; + } + } + co_return true; +}; TEST_CASE("test client pool") { async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { coro_rpc::coro_rpc_server server(1, 8801); - server.async_start().start([](auto &&) { - }); - auto is_started = server.wait_for_start(1s); + auto is_started = server.async_start(); REQUIRE(is_started); auto pool = coro_io::client_pool::create( "127.0.0.1:8801", {.max_connection = 100, .idle_timeout = 300ms}); - std::vector>> res; - auto event = [&res, &pool]() { - auto op = [](coro_rpc::coro_rpc_client &client) - -> async_simple::coro::Lazy { - co_await coro_io::sleep_for(100ms); - }; - res.emplace_back(pool->send_request(op)); - }; - for (int i = 0; i < 50; ++i) { - event(); - } - co_await collectAll(std::move(res)); - CHECK(pool->free_client_count() == 50); - res.clear(); - for (int i = 0; i < 110; ++i) { - event(); - } - co_await collectAll(std::move(res)); + SpinLock lock; + ConditionVariable cv; + auto res = co_await event(20, *pool, cv, lock); + CHECK(res); + CHECK(pool->free_client_count() == 20); + res = co_await event(100, *pool, cv, lock); + CHECK(res); CHECK(pool->free_client_count() == 100); co_await coro_io::sleep_for(700ms); CHECK(pool->free_client_count() == 0); @@ -68,28 +107,17 @@ TEST_CASE("test client pool") { TEST_CASE("test idle timeout yield") { async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { coro_rpc::coro_rpc_server server(1, 8801); - server.async_start().start([](auto &&) { - }); - auto is_started = server.wait_for_start(1s); + auto is_started = server.async_start(); REQUIRE(is_started); auto pool = coro_io::client_pool::create( "127.0.0.1:8801", {.max_connection = 100, .idle_queue_max_clear_count = 1, .idle_timeout = 300ms}); - std::vector>> res; - auto event = [&res, &pool]() { - auto op = [](coro_rpc::coro_rpc_client &client) - -> async_simple::coro::Lazy { - co_await coro_io::sleep_for(100ms); - }; - res.emplace_back(pool->send_request(op)); - }; - for (int i = 0; i < 100; ++i) { - event(); - } - co_await collectAll(std::move(res)); + SpinLock lock; + ConditionVariable cv; + auto res = co_await event(100, *pool, cv, lock); + CHECK(res); CHECK(pool->free_client_count() == 100); - res.clear(); co_await coro_io::sleep_for(700ms); CHECK(pool->free_client_count() == 0); server.stop(); @@ -101,54 +129,40 @@ TEST_CASE("test reconnect") { auto pool = coro_io::client_pool::create( "127.0.0.1:8801", {.connect_retry_count = 3, .reconnect_wait_time = 500ms}); - std::vector>> res; - auto event = [&res, &pool]() { - auto op = [](coro_rpc::coro_rpc_client &client) - -> async_simple::coro::Lazy { - co_await coro_io::sleep_for(100ms); - }; - res.emplace_back(pool->send_request(op)); - }; - for (int i = 0; i < 100; ++i) { - event(); - } - coro_rpc::coro_rpc_server server(1, 8801); - res.push_back([&server]() -> Lazy> { - co_await coro_io::sleep_for(700ms); - server.async_start().start([](auto &&) { - }); - co_return coro_rpc::expected{}; - }()); - co_await collectAll(std::move(res)); + SpinLock lock; + ConditionVariable cv; + coro_rpc::coro_rpc_server server(2, 8801); + async_simple::Promise p; + coro_io::sleep_for(700ms).start([&server, &p](auto &&) { + auto server_is_started = server.async_start(); + REQUIRE(server_is_started); + }); + + auto res = co_await event(100, *pool, cv, lock); + CHECK(res); CHECK(pool->free_client_count() == 100); server.stop(); + co_return; }()); } TEST_CASE("test collect_free_client") { async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy { coro_rpc::coro_rpc_server server(1, 8801); - server.async_start().start([](auto &&) { - }); - auto is_started = server.wait_for_start(1s); + auto is_started = server.async_start(); REQUIRE(is_started); auto pool = coro_io::client_pool::create( "127.0.0.1:8801", {.max_connection = 100, .idle_timeout = 300ms}); - std::vector>> res; - auto event = [&res, &pool]() { - auto op = [](coro_rpc::coro_rpc_client &client) - -> async_simple::coro::Lazy { - co_await coro_io::sleep_for(100ms); - client.close(); - }; - res.emplace_back(pool->send_request(op)); - }; - for (int i = 0; i < 50; ++i) { - event(); - } - co_await collectAll(std::move(res)); + + SpinLock lock; + ConditionVariable cv; + auto res = co_await event(50, *pool, cv, lock, [](auto &client) { + client.close(); + }); + CHECK(res); CHECK(pool->free_client_count() == 0); server.stop(); + co_return; }()); } diff --git a/src/coro_rpc/benchmark/data_gen.cpp b/src/coro_rpc/benchmark/data_gen.cpp index 2836433385..9d19a60b3b 100644 --- a/src/coro_rpc/benchmark/data_gen.cpp +++ b/src/coro_rpc/benchmark/data_gen.cpp @@ -28,15 +28,10 @@ using namespace std::chrono_literals; int main() { using namespace coro_rpc; coro_rpc::coro_rpc_server server(std::thread::hardware_concurrency(), 0); - std::thread thrd([&] { - start_server(server); - }); - bool started = server.wait_for_start(3s); + auto started = server.async_start(); if (!started) { ELOGV(ERROR, "server started failed"); - server.stop(); - thrd.join(); return -1; } @@ -109,7 +104,6 @@ int main() { syncAwait(client.call(42)); server.stop(); - thrd.join(); return 0; }; \ No newline at end of file diff --git a/src/coro_rpc/examples/base_examples/rpc_service.cpp b/src/coro_rpc/examples/base_examples/rpc_service.cpp index 3d1f3e0d52..54d70b4ceb 100644 --- a/src/coro_rpc/examples/base_examples/rpc_service.cpp +++ b/src/coro_rpc/examples/base_examples/rpc_service.cpp @@ -57,7 +57,7 @@ void hello_with_delay(context conn, async_simple::coro::Lazy nested_echo(std::string_view sv) { ELOGV(INFO, "start nested echo"); - coro_rpc::coro_rpc_client client(co_await coro_io::get_executor()); + coro_rpc::coro_rpc_client client(co_await coro_io::get_current_executor()); [[maybe_unused]] auto ec = co_await client.connect("127.0.0.1", "8802"); assert(ec == std::errc{}); ELOGV(INFO, "connect another server"); diff --git a/src/coro_rpc/tests/ServerTester.hpp b/src/coro_rpc/tests/ServerTester.hpp index 4190f82571..9d9f05ea47 100644 --- a/src/coro_rpc/tests/ServerTester.hpp +++ b/src/coro_rpc/tests/ServerTester.hpp @@ -44,7 +44,6 @@ using namespace std::chrono_literals; struct TesterConfig { TesterConfig() = default; TesterConfig(TesterConfig &c) { - async_start = c.sync_client; enable_heartbeat = c.enable_heartbeat; use_ssl = c.use_ssl; sync_client = c.sync_client; @@ -52,7 +51,6 @@ struct TesterConfig { port = c.port; conn_timeout_duration = c.conn_timeout_duration; } - bool async_start; bool enable_heartbeat; bool use_ssl; bool sync_client; @@ -64,8 +62,7 @@ struct TesterConfig { friend std::ostream &operator<<(std::ostream &os, const TesterConfig &config) { os << std::boolalpha; - os << "async_start: " << config.async_start << ";" - << " enable_heartbeat: " << config.enable_heartbeat << ";" + os << " enable_heartbeat: " << config.enable_heartbeat << ";" << " use_ssl: " << config.use_ssl << ";" << " sync_client: " << config.sync_client << ";" << " use_outer_io_context: " << config.use_outer_io_context << ";" diff --git a/src/coro_rpc/tests/test_coro_rpc_client.cpp b/src/coro_rpc/tests/test_coro_rpc_client.cpp index cfa8495923..19653f1887 100644 --- a/src/coro_rpc/tests/test_coro_rpc_client.cpp +++ b/src/coro_rpc/tests/test_coro_rpc_client.cpp @@ -84,10 +84,9 @@ TEST_CASE("testing client") { server.init_ssl_context( ssl_configure{"../openssl_files", "server.crt", "server.key"}); #endif - server.async_start().start([](auto&&) { - }); + auto res = server.async_start(); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + CHECK_MESSAGE(res, "server start failed"); SUBCASE("call rpc, function not registered") { g_action = {}; @@ -171,9 +170,8 @@ TEST_CASE("testing client with inject server") { server.init_ssl_context( ssl_configure{"../openssl_files", "server.crt", "server.key"}); #endif - server.async_start().start([](auto&&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start failed"); server.register_handler(); @@ -238,9 +236,8 @@ class SSLClientTester { ssl_configure config{base_path, server_crt_path, server_key_path, dh_path}; server.init_ssl_context(config); server.template register_handler(); - server.async_start().start([](auto&&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); std::promise promise; auto future = promise.get_future(); @@ -365,9 +362,9 @@ TEST_CASE("testing client with ssl server") { TEST_CASE("testing client with eof") { g_action = {}; coro_rpc_server server(2, 8801); - server.async_start().start([](auto&&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + + auto res = server.async_start(); + REQUIRE_MESSAGE(res, "server start failed"); coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); auto ec = client.sync_connect("127.0.0.1", "8801"); REQUIRE_MESSAGE(ec == std::errc{}, make_error_code(ec).message()); @@ -387,9 +384,8 @@ TEST_CASE("testing client with eof") { TEST_CASE("testing client with shutdown") { g_action = {}; coro_rpc_server server(2, 8801); - server.async_start().start([](auto&&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); auto ec = client.sync_connect("127.0.0.1", "8801"); REQUIRE_MESSAGE(ec == std::errc{}, make_error_code(ec).message()); @@ -454,9 +450,8 @@ TEST_CASE("testing client sync connect, unit test inject only") { SUBCASE("client use ssl but server don't use ssl") { g_action = {}; coro_rpc_server server(2, 8801); - server.async_start().start([](auto&&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); coro_rpc_client client2(*coro_io::get_global_executor(), g_client_id++); bool ok = client2.init_ssl("../openssl_files", "server.crt"); CHECK(ok == true); @@ -489,9 +484,8 @@ TEST_CASE("testing client call timeout") { server.register_handler(); server.register_handler(); - server.async_start().start([](auto&&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); auto ec_lazy = client.connect("127.0.0.1", "8801"); auto ec = syncAwait(ec_lazy); diff --git a/src/coro_rpc/tests/test_coro_rpc_server.cpp b/src/coro_rpc/tests/test_coro_rpc_server.cpp index 4495ae9528..0abb3fbb3a 100644 --- a/src/coro_rpc/tests/test_coro_rpc_server.cpp +++ b/src/coro_rpc/tests/test_coro_rpc_server.cpp @@ -39,41 +39,10 @@ struct CoroServerTester : ServerTester { ssl_configure{"../openssl_files", "server.crt", "server.key"}); } #endif - if (async_start) { - // https://timsong-cpp.github.io/cppwp/n4861/temp.names#5.example-1 - // https://developercommunity.visualstudio.com/t/c2059-syntax-error-template-for-valid-template-mem/1632142 - /* - template struct A { - void f(int); - template void f(U); - }; - - template void f(T t) { - A a; - a.template f<>(t); // OK: calls template - a.template f(t); // error: not a template-id - } - */ - server.async_start().template start<>([](auto &&) { - }); - } - else { - thd = std::thread([&] { - auto ec = server.start(); - REQUIRE(ec == std::errc{}); - }); - } - - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); - } - ~CoroServerTester() { - if (async_start) { - } - else { - server.stop(); - thd.join(); - } + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); } + ~CoroServerTester() { server.stop(); } async_simple::coro::Lazy get_value(int val) { co_return val; } @@ -147,13 +116,8 @@ struct CoroServerTester : ServerTester { void test_server_start_again() { ELOGV(INFO, "run %s", __func__); - std::errc ec; - if (async_start) { - ec = syncAwait(server.async_start()); - } - else { - ec = server.start(); - } + + auto ec = server.start(); REQUIRE_MESSAGE(ec == std::errc::io_error, make_error_code(ec).message()); } @@ -161,15 +125,10 @@ struct CoroServerTester : ServerTester { ELOGV(INFO, "run %s", __func__); { auto new_server = coro_rpc_server(2, std::stoi(this->port_)); - std::errc ec; - if (async_start) { - ec = syncAwait(new_server.async_start()); - } - else { - ec = new_server.start(); - } - REQUIRE_MESSAGE(ec == std::errc::address_in_use, - make_error_code(ec).message()); + auto ec = new_server.async_start(); + REQUIRE(!ec); + REQUIRE_MESSAGE(ec.error() == std::errc::address_in_use, + make_error_code(ec.error()).message()); } ELOGV(INFO, "OH NO"); } @@ -238,35 +197,31 @@ TEST_CASE("testing coro rpc server") { unsigned short server_port = 8810; auto conn_timeout_duration = 500ms; std::vector switch_list{true, false}; - for (auto async_start : switch_list) { - for (auto enable_heartbeat : switch_list) { - for (auto use_ssl : switch_list) { - TesterConfig config; - config.async_start = async_start; - config.enable_heartbeat = enable_heartbeat; - config.use_ssl = use_ssl; - config.sync_client = false; - config.use_outer_io_context = false; - config.port = server_port; - if (enable_heartbeat) { - config.conn_timeout_duration = conn_timeout_duration; - } - std::stringstream ss; - ss << config; - ELOGV(INFO, "config: %s", ss.str().data()); - CoroServerTester(config).run(); + for (auto enable_heartbeat : switch_list) { + for (auto use_ssl : switch_list) { + TesterConfig config; + config.enable_heartbeat = enable_heartbeat; + config.use_ssl = use_ssl; + config.sync_client = false; + config.use_outer_io_context = false; + config.port = server_port; + if (enable_heartbeat) { + config.conn_timeout_duration = conn_timeout_duration; } - // } + std::stringstream ss; + ss << config; + ELOGV(INFO, "config: %s", ss.str().data()); + CoroServerTester(config).run(); } + // } } } TEST_CASE("testing coro rpc server stop") { ELOGV(INFO, "run testing coro rpc server stop"); coro_rpc_server server(2, 8810); - server.async_start().start([](auto &&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + REQUIRE_MESSAGE(res, "server start failed"); SUBCASE("stop twice") { server.stop(); server.stop(); @@ -288,9 +243,8 @@ TEST_CASE("test server accept error") { g_action = inject_action::force_inject_server_accept_error; coro_rpc_server server(2, 8810); server.register_handler(); - server.async_start().start([](auto &&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); ELOGV(INFO, "run test server accept error, client_id %d", client.get_client_id()); @@ -318,9 +272,8 @@ TEST_CASE("test server write queue") { g_action = {}; coro_rpc_server server(2, 8810); server.register_handler(); - server.async_start().start([](auto &&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start timeout"); std::string buffer; buffer.reserve(coro_rpc_protocol::REQ_HEAD_LEN + struct_pack::get_needed_size(std::monostate{})); @@ -383,9 +336,8 @@ TEST_CASE("testing coro rpc write error") { g_action = inject_action::force_inject_connection_close_socket; coro_rpc_server server(2, 8810); server.register_handler(); - server.async_start().start([](auto &&) { - }); - CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); + auto res = server.async_start(); + CHECK_MESSAGE(res, "server start failed"); coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); ELOGV(INFO, "run testing coro rpc write error, client_id %d", client.get_client_id()); diff --git a/src/coro_rpc/tests/test_variadic.cpp b/src/coro_rpc/tests/test_variadic.cpp index 0dc4232683..1f6feba63b 100644 --- a/src/coro_rpc/tests/test_variadic.cpp +++ b/src/coro_rpc/tests/test_variadic.cpp @@ -29,16 +29,9 @@ TEST_CASE("test varadic param") { auto server = std::make_unique( std::thread::hardware_concurrency(), 8808); - std::thread thrd([&] { - server->register_handler(); - try { - auto ec = server->start(); - REQUIRE(ec == std::errc{}); - } catch (const std::exception& e) { - std::cerr << "test varadic param Exception: " << e.what() << "\n"; - } - }); - REQUIRE_MESSAGE(server->wait_for_start(3s), "server start timeout"); + server->register_handler(); + auto res = server->async_start(); + REQUIRE_MESSAGE(res, "server start failed"); coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); syncAwait(client.connect("localhost", std::to_string(server->port()))); @@ -51,8 +44,6 @@ TEST_CASE("test varadic param") { })); ELOGV(INFO, "begin to stop server"); server->stop(); - if (thrd.joinable()) - thrd.join(); ELOGV(INFO, "finished stop server"); if (ret) { ELOGV(INFO, "ret value %s", ret.value().data());