From fa92e01e32394ba0acb9f06b18e23fbb2a5da4ac Mon Sep 17 00:00:00 2001 From: saipubw Date: Mon, 10 Jul 2023 16:33:20 +0800 Subject: [PATCH] [coro_rpc] [coro_http] use global executor instead of inner io_context (#358) --- include/ylt/coro_rpc/impl/coro_rpc_client.hpp | 67 +++----------- .../thirdparty/cinatra/coro_http_client.hpp | 39 +------- src/coro_http/examples/CMakeLists.txt | 3 +- src/coro_io/examples/CMakeLists.txt | 3 +- .../examples/base_examples/CMakeLists.txt | 1 + .../examples/base_examples/channel.cpp | 4 +- .../examples/base_examples/client_pool.cpp | 4 +- .../examples/base_examples/client_pools.cpp | 4 +- .../base_examples/concurrent_clients.cpp | 92 +++++++++++++++++++ src/coro_rpc/tests/ServerTester.hpp | 7 +- src/coro_rpc/tests/test_coro_rpc_client.cpp | 16 ++-- src/coro_rpc/tests/test_coro_rpc_server.cpp | 4 +- src/coro_rpc/tests/test_variadic.cpp | 2 +- 13 files changed, 136 insertions(+), 110 deletions(-) create mode 100644 src/coro_rpc/examples/base_examples/concurrent_clients.cpp diff --git a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp index 12d116aa1..51dcf7053 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp @@ -132,32 +132,15 @@ class coro_rpc_client { * Create client with io_context * @param io_context asio io_context, async event handler */ - coro_rpc_client(coro_io::ExecutorWrapper<> &executor, uint32_t client_id = 0) + coro_rpc_client( + coro_io::ExecutorWrapper<> &executor = *coro_io::get_global_executor(), + uint32_t client_id = 0) : executor(executor.get_asio_executor()), socket_(executor.get_asio_executor()) { config_.client_id = client_id; read_buf_.resize(default_read_buf_size_); } - /*! - * Create client - */ - coro_rpc_client(uint32_t client_id = 0) - : inner_io_context_(std::make_unique()), - executor(inner_io_context_->get_executor()), - socket_(inner_io_context_->get_executor()) { - config_.client_id = client_id; - std::promise promise; - thd_ = std::thread([this, &promise] { - work_ = std::make_unique(*inner_io_context_); - executor.schedule([&] { - promise.set_value(); - }); - inner_io_context_->run(); - }); - promise.get_future().wait(); - } - [[nodiscard]] bool init_config(const config &conf) { config_ = conf; #ifdef YLT_ENABLE_SSL @@ -194,7 +177,7 @@ class coro_rpc_client { config_.timeout_duration = std::chrono::duration_cast(timeout_duration); reset(); - return connect(true); + return connect(is_reconnect_t{true}); } [[nodiscard]] async_simple::coro::Lazy reconnect( @@ -207,7 +190,7 @@ class coro_rpc_client { config_.timeout_duration = std::chrono::duration_cast(timeout_duration); reset(); - return connect(true); + return connect(is_reconnect_t{true}); } /*! * Connect server @@ -231,7 +214,7 @@ class coro_rpc_client { return connect(); } [[nodiscard]] async_simple::coro::Lazy connect( - std::string endpoint, + std::string_view endpoint, std::chrono::steady_clock::duration timeout_duration = std::chrono::seconds(5)) { auto pos = endpoint.find(':'); @@ -254,10 +237,7 @@ class coro_rpc_client { } #endif - ~coro_rpc_client() { - close(); - stop_inner_io_context(); - } + ~coro_rpc_client() { close(); } /*! * Call RPC function with default timeout (5 second) @@ -359,6 +339,12 @@ class coro_rpc_client { friend class coro_io::client_pool; private: + // the const char * will convert to bool instead of std::string_view + // use this struct to prevent it. + struct is_reconnect_t { + bool value = false; + }; + void reset() { close_socket(); socket_ = decltype(socket_)(executor.get_asio_executor()); @@ -368,14 +354,14 @@ class coro_rpc_client { static bool is_ok(std::errc ec) noexcept { return ec == std::errc{}; } [[nodiscard]] async_simple::coro::Lazy connect( - bool is_reconnect = false) { + is_reconnect_t is_reconnect = is_reconnect_t{false}) { #ifdef YLT_ENABLE_SSL if (!ssl_init_ret_) { std::cout << "ssl_init_ret_: " << ssl_init_ret_ << std::endl; co_return std::errc::not_connected; } #endif - if (!is_reconnect && has_closed_) + if (!is_reconnect.value && has_closed_) AS_UNLIKELY { ELOGV(ERROR, "a closed client is not allowed connect again, please use " @@ -756,26 +742,6 @@ class coro_rpc_client { has_closed_ = true; } - void stop_inner_io_context() { - if (thd_.joinable()) { - work_ = nullptr; - if (thd_.get_id() == std::this_thread::get_id()) { - // we are now running in inner_io_context_, so destruction it in - // another thread - std::thread thrd{[ioc = std::move(inner_io_context_), - thd = std::move(thd_)]() mutable { - thd.join(); - }}; - thrd.detach(); - } - else { - thd_.join(); - } - } - - return; - } - #ifdef UNIT_TEST_INJECT public: std::errc sync_connect(const std::string &host, const std::string &port) { @@ -790,9 +756,6 @@ class coro_rpc_client { } #endif private: - std::unique_ptr inner_io_context_; - std::unique_ptr work_; - std::thread thd_; coro_io::ExecutorWrapper<> executor; asio::ip::tcp::socket socket_; std::vector read_buf_; diff --git a/include/ylt/thirdparty/cinatra/coro_http_client.hpp b/include/ylt/thirdparty/cinatra/coro_http_client.hpp index 478a00671..cb40abfd7 100644 --- a/include/ylt/thirdparty/cinatra/coro_http_client.hpp +++ b/include/ylt/thirdparty/cinatra/coro_http_client.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include "http_parser.hpp" #include "response_cv.hpp" @@ -181,28 +182,14 @@ class coro_http_client { std::string domain; #endif }; - coro_http_client() - : io_ctx_(std::make_unique()), - socket_(std::make_shared(io_ctx_->get_executor())), - executor_wrapper_(io_ctx_->get_executor()), - timer_(&executor_wrapper_) { - std::promise promise; - io_thd_ = std::thread([this, &promise] { - work_ = std::make_unique(*io_ctx_); - executor_wrapper_.schedule([&] { - promise.set_value(); - }); - io_ctx_->run(); - }); - promise.get_future().wait(); - } coro_http_client(asio::io_context::executor_type executor) : socket_(std::make_shared(executor)), executor_wrapper_(executor), timer_(&executor_wrapper_) {} - coro_http_client(coro_io::ExecutorWrapper<> *executor) + coro_http_client( + coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor()) : coro_http_client(executor->get_asio_executor()) {} bool init_config(const config &conf) { @@ -237,22 +224,7 @@ class coro_http_client { return true; } - ~coro_http_client() { - async_close(); - if (io_thd_.joinable()) { - work_ = nullptr; - if (io_thd_.get_id() == std::this_thread::get_id()) { - std::thread thrd{[io_ctx = std::move(io_ctx_), - io_thd = std::move(io_thd_)]() mutable { - io_thd.join(); - }}; - thrd.detach(); - } - else { - io_thd_.join(); - } - } - } + ~coro_http_client() { async_close(); } void async_close() { if (socket_->has_closed_) @@ -1649,12 +1621,9 @@ class coro_http_client { return has_http_scheme; } - std::unique_ptr io_ctx_; - coro_io::ExecutorWrapper<> executor_wrapper_; std::unique_ptr work_; coro_io::period_timer timer_; - std::thread io_thd_; std::shared_ptr socket_; asio::streambuf read_buf_; simple_buffer body_{}; diff --git a/src/coro_http/examples/CMakeLists.txt b/src/coro_http/examples/CMakeLists.txt index 2bc0018de..dc662924a 100644 --- a/src/coro_http/examples/CMakeLists.txt +++ b/src/coro_http/examples/CMakeLists.txt @@ -1,7 +1,6 @@ if("${yaLanTingLibs_SOURCE_DIR}" STREQUAL "${CMAKE_SOURCE_DIR}") - # if is the subproject of yalantinglibs - # do nothing +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/output/examples/coro_http) else() # else find installed yalantinglibs cmake_minimum_required(VERSION 3.15) diff --git a/src/coro_io/examples/CMakeLists.txt b/src/coro_io/examples/CMakeLists.txt index 8ff905d93..a766f24f7 100644 --- a/src/coro_io/examples/CMakeLists.txt +++ b/src/coro_io/examples/CMakeLists.txt @@ -1,7 +1,6 @@ if("${yaLanTingLibs_SOURCE_DIR}" STREQUAL "${CMAKE_SOURCE_DIR}") - # if is the subproject of yalantinglibs - # do nothing + set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/output/examples) else() # else find installed yalantinglibs cmake_minimum_required(VERSION 3.15) diff --git a/src/coro_rpc/examples/base_examples/CMakeLists.txt b/src/coro_rpc/examples/base_examples/CMakeLists.txt index 029537c3c..567c24c60 100644 --- a/src/coro_rpc/examples/base_examples/CMakeLists.txt +++ b/src/coro_rpc/examples/base_examples/CMakeLists.txt @@ -40,5 +40,6 @@ add_executable(coro_rpc_example_channel channel.cpp) add_executable(coro_rpc_example_client_pool client_pool.cpp) add_executable(coro_rpc_example_client_pools client_pools.cpp) add_executable(coro_rpc_example_client client.cpp) +add_executable(coro_rpc_example_concurrent_clients concurrent_clients.cpp) add_executable(coro_rpc_example_server server.cpp rpc_service.cpp) diff --git a/src/coro_rpc/examples/base_examples/channel.cpp b/src/coro_rpc/examples/base_examples/channel.cpp index 27c0c8ff8..4fdbe5022 100644 --- a/src/coro_rpc/examples/base_examples/channel.cpp +++ b/src/coro_rpc/examples/base_examples/channel.cpp @@ -51,11 +51,11 @@ Lazy call_echo(std::shared_ptr> channel, [](coro_rpc_client &client, std::string_view hostname) -> Lazy { auto res = co_await client.call("Hello world!"); if (!res.has_value()) { - std::cout << "coro_rpc err: \n"; + std::cout << "coro_rpc err: \n" << res.error().msg; co_return; } if (res.value() != "Hello world!"sv) { - std::cout << "err echo resp: \n"; + std::cout << "err echo resp: \n" << res.value(); co_return; } ++qps; diff --git a/src/coro_rpc/examples/base_examples/client_pool.cpp b/src/coro_rpc/examples/base_examples/client_pool.cpp index b16fd096b..04475481a 100644 --- a/src/coro_rpc/examples/base_examples/client_pool.cpp +++ b/src/coro_rpc/examples/base_examples/client_pool.cpp @@ -44,11 +44,11 @@ Lazy call_echo(coro_io::client_pool &client_pool, [=](coro_rpc_client &client) -> Lazy { auto res = co_await client.call("Hello world!"); if (!res.has_value()) { - std::cout << "coro_rpc err: \n"; + std::cout << "coro_rpc err: \n" << res.error().msg; co_return; } if (res.value() != "Hello world!"sv) { - std::cout << "err echo resp: \n"; + std::cout << "err echo resp: \n" << res.value(); co_return; } ++qps; diff --git a/src/coro_rpc/examples/base_examples/client_pools.cpp b/src/coro_rpc/examples/base_examples/client_pools.cpp index 63afdb31f..b06303a98 100644 --- a/src/coro_rpc/examples/base_examples/client_pools.cpp +++ b/src/coro_rpc/examples/base_examples/client_pools.cpp @@ -48,11 +48,11 @@ Lazy call_echo(coro_io::client_pools &client_pools, [=](coro_rpc_client &client) -> Lazy { auto res = co_await client.call("Hello world!"); if (!res.has_value()) { - std::cout << "coro_rpc err: \n"; + std::cout << "coro_rpc err: \n" << res.error().msg; co_return; } if (res.value() != "Hello world!"sv) { - std::cout << "err echo resp: \n"; + std::cout << "err echo resp: \n" << res.value(); co_return; } ++qps; diff --git a/src/coro_rpc/examples/base_examples/concurrent_clients.cpp b/src/coro_rpc/examples/base_examples/concurrent_clients.cpp new file mode 100644 index 000000000..4965d55da --- /dev/null +++ b/src/coro_rpc/examples/base_examples/concurrent_clients.cpp @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2023, Alibaba Group Holding Limited; + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ylt/coro_rpc/impl/coro_rpc_client.hpp" +std::string echo(std::string_view sv); +using namespace coro_rpc; +using namespace async_simple::coro; +using namespace std::string_view_literals; +using namespace std::chrono_literals; +std::atomic qps = 0; + +std::atomic working_echo = 0; +/*! + * \example helloworld/concurrency_clients.main.cpp + * \brief demo for run concurrency clients + */ + +Lazy call_echo(int cnt) { + ++working_echo; + coro_rpc_client client; + std::errc ec = co_await client.connect("localhost:8801"); + for (int i = 0; i < 3 && ec != std::errc{}; ++i) { + co_await coro_io::sleep_for(rand() % 10000 * 1ms); + ec = co_await client.reconnect("localhost:8801"); + } + if (ec == std::errc{}) { + for (int i = 0; i < cnt; ++i) { + auto res = co_await client.call("Hello world!"); + if (!res.has_value()) { + std::cout << "coro_rpc err: \n" << res.error().msg; + co_return; + } + if (res.value() != "Hello world!"sv) { + std::cout << "err echo resp: \n" << res.value(); + co_return; + } + ++qps; + } + } + else { + std::cout << "connect failed \n"; + } + --working_echo; +} + +Lazy qps_watcher() { + using namespace std::chrono_literals; + while (working_echo > 0) { + co_await coro_io::sleep_for(1s); + uint64_t cnt = qps.exchange(0); + std::cout << "QPS:" << cnt << " working echo:" << working_echo << std::endl; + cnt = 0; + } +} + +int main() { + auto thread_cnt = std::thread::hardware_concurrency(); + for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) { + call_echo(100000).start([](auto &&) { + }); + } + syncAwait(qps_watcher()); + std::cout << "Done!" << std::endl; + return 0; +} \ No newline at end of file diff --git a/src/coro_rpc/tests/ServerTester.hpp b/src/coro_rpc/tests/ServerTester.hpp index 72821bbcb..4190f8257 100644 --- a/src/coro_rpc/tests/ServerTester.hpp +++ b/src/coro_rpc/tests/ServerTester.hpp @@ -28,6 +28,7 @@ #include "doctest.h" #include "inject_action.hpp" #include "rpc_api.hpp" +#include "ylt/coro_io/io_context_pool.hpp" #ifdef _MSC_VER #define CORO_RPC_FUNCTION_SIGNATURE __FUNCSIG__ @@ -138,7 +139,8 @@ struct ServerTester : TesterConfig { g_client_id++); } else { - client = std::make_shared(g_client_id++); + client = std::make_shared( + *coro_io::get_global_executor(), g_client_id++); } #ifdef YLT_ENABLE_SSL if (use_ssl) { @@ -355,7 +357,8 @@ struct ServerTester : TesterConfig { g_client_id++); } else { - client = std::make_shared(g_client_id++); + client = std::make_shared( + *coro_io::get_global_executor(), g_client_id++); } #ifdef YLT_ENABLE_SSL if (use_ssl) { diff --git a/src/coro_rpc/tests/test_coro_rpc_client.cpp b/src/coro_rpc/tests/test_coro_rpc_client.cpp index 6452a40fa..cfa849592 100644 --- a/src/coro_rpc/tests/test_coro_rpc_client.cpp +++ b/src/coro_rpc/tests/test_coro_rpc_client.cpp @@ -368,7 +368,7 @@ TEST_CASE("testing client with eof") { server.async_start().start([](auto&&) { }); CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); - coro_rpc_client client(g_client_id++); + coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); auto ec = client.sync_connect("127.0.0.1", "8801"); REQUIRE_MESSAGE(ec == std::errc{}, make_error_code(ec).message()); @@ -390,7 +390,7 @@ TEST_CASE("testing client with shutdown") { server.async_start().start([](auto&&) { }); CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); - coro_rpc_client client(g_client_id++); + coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); auto ec = client.sync_connect("127.0.0.1", "8801"); REQUIRE_MESSAGE(ec == std::errc{}, make_error_code(ec).message()); server.register_handler(); @@ -420,7 +420,7 @@ TEST_CASE("testing client timeout") { SUBCASE("connect, ip timeout") { g_action = {}; // https://stackoverflow.com/questions/100841/artificially-create-a-connection-timeout-error - coro_rpc_client client(g_client_id++); + coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); auto ret = client.connect("10.255.255.1", "8801", 5ms); auto val = syncAwait(ret); CHECK_MESSAGE(val == std::errc::timed_out, make_error_code(val).message()); @@ -439,14 +439,14 @@ TEST_CASE("testing client timeout") { // } } TEST_CASE("testing client connect err") { - coro_rpc_client client(g_client_id++); + coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); auto val = syncAwait(client.connect("127.0.0.1", "8801")); CHECK_MESSAGE(val == std::errc::not_connected, make_error_code(val).message()); } #ifdef UNIT_TEST_INJECT TEST_CASE("testing client sync connect, unit test inject only") { - coro_rpc_client client(g_client_id++); + coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); auto val = client.sync_connect("127.0.0.1", "8801"); CHECK_MESSAGE(val == std::errc::not_connected, make_error_code(val).message()); @@ -457,7 +457,7 @@ TEST_CASE("testing client sync connect, unit test inject only") { server.async_start().start([](auto&&) { }); CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); - coro_rpc_client client2(g_client_id++); + coro_rpc_client client2(*coro_io::get_global_executor(), g_client_id++); bool ok = client2.init_ssl("../openssl_files", "server.crt"); CHECK(ok == true); val = client2.sync_connect("127.0.0.1", "8801"); @@ -474,7 +474,7 @@ TEST_CASE("testing client call timeout") { // coro_rpc_server server(2, 8801); // server.async_start().start([](auto&&) { // }); - coro_rpc_client client(g_client_id++); + coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); // auto ec_lazy = client.connect("127.0.0.1", "8801", 5ms); // auto ec = syncAwait(ec_lazy); // assert(ec == std::errc{}); @@ -492,7 +492,7 @@ TEST_CASE("testing client call timeout") { server.async_start().start([](auto&&) { }); CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); - coro_rpc_client client(g_client_id++); + coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); auto ec_lazy = client.connect("127.0.0.1", "8801"); auto ec = syncAwait(ec_lazy); REQUIRE(ec == std::errc{}); diff --git a/src/coro_rpc/tests/test_coro_rpc_server.cpp b/src/coro_rpc/tests/test_coro_rpc_server.cpp index 65ec66c6b..4495ae952 100644 --- a/src/coro_rpc/tests/test_coro_rpc_server.cpp +++ b/src/coro_rpc/tests/test_coro_rpc_server.cpp @@ -291,7 +291,7 @@ TEST_CASE("test server accept error") { server.async_start().start([](auto &&) { }); CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); - coro_rpc_client client(g_client_id++); + coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); ELOGV(INFO, "run test server accept error, client_id %d", client.get_client_id()); auto ec = syncAwait(client.connect("127.0.0.1", "8810")); @@ -386,7 +386,7 @@ TEST_CASE("testing coro rpc write error") { server.async_start().start([](auto &&) { }); CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout"); - coro_rpc_client client(g_client_id++); + coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); ELOGV(INFO, "run testing coro rpc write error, client_id %d", client.get_client_id()); auto ec = syncAwait(client.connect("127.0.0.1", "8810")); diff --git a/src/coro_rpc/tests/test_variadic.cpp b/src/coro_rpc/tests/test_variadic.cpp index ed4a229ad..0dc423268 100644 --- a/src/coro_rpc/tests/test_variadic.cpp +++ b/src/coro_rpc/tests/test_variadic.cpp @@ -39,7 +39,7 @@ TEST_CASE("test varadic param") { } }); REQUIRE_MESSAGE(server->wait_for_start(3s), "server start timeout"); - coro_rpc_client client(g_client_id++); + coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); syncAwait(client.connect("localhost", std::to_string(server->port())));