diff --git a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp index c42954d8a..b94eaeece 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp @@ -139,6 +139,7 @@ class coro_rpc_client { std::chrono::milliseconds{5000}; std::string host; std::string port; + bool enable_tcp_no_delay_ = true; #ifdef YLT_ENABLE_SSL std::filesystem::path ssl_cert_path; std::string ssl_domain; @@ -332,7 +333,7 @@ class coro_rpc_client { uint32_t get_client_id() const { return config_.client_id; } void close() { - ELOGV(INFO, "client_id %d close", config_.client_id); + //ELOGV(INFO, "client_id %d close", config_.client_id); close_socket(control_); } @@ -409,8 +410,9 @@ class coro_rpc_client { ELOGV(WARN, "client_id %d connect timeout", config_.client_id); co_return errc::timed_out; } - - control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec); + if (config_.enable_tcp_no_delay_==true) { + control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec); + } #ifdef YLT_ENABLE_SSL if (!config_.ssl_cert_path.empty()) { @@ -614,11 +616,20 @@ class coro_rpc_client { constexpr bool has_conn_v = requires { typename First::return_type; }; return util::get_args(); } + template + static decltype(auto) add_const(U&& u) { + if constexpr (std::is_const_v>) { + return struct_pack::detail::declval(); + } + else { + return struct_pack::detail::declval(); + } + } template void pack_to_impl(Buffer &buffer, std::size_t offset, Args &&...args) { struct_pack::serialize_to_with_offset( - buffer, offset, std::forward(std::forward(args))...); + buffer, offset, std::forward(args))>((std::forward(args)))...); } template diff --git a/src/coro_rpc/examples/base_examples/client_pool.cpp b/src/coro_rpc/examples/base_examples/client_pool.cpp index 23ba2b628..437144227 100644 --- a/src/coro_rpc/examples/base_examples/client_pool.cpp +++ b/src/coro_rpc/examples/base_examples/client_pool.cpp @@ -17,9 +17,12 @@ #include #include #include +#include #include #include #include +#include +#include #include #include #include @@ -27,19 +30,24 @@ #include "async_simple/coro/Collect.h" #include "async_simple/coro/Lazy.h" +#include "async_simple/coro/Mutex.h" #include "async_simple/coro/SyncAwait.h" #include "ylt/coro_io/io_context_pool.hpp" +#include "ylt/coro_rpc/impl/coro_rpc_client.hpp" +#include "ylt/coro_rpc/impl/errno.h" +#include "ylt/coro_rpc/impl/expected.hpp" #include "ylt/easylog.hpp" std::string echo(std::string_view sv); +constexpr auto thread_cnt = 96*2; +constexpr auto request_cnt= 200000; using namespace coro_rpc; using namespace async_simple::coro; using namespace std::string_view_literals; -std::atomic qps = 0; - +auto finish_executor = coro_io::get_global_block_executor(); std::atomic working_echo = 0; -std::atomic busy_echo = 0; +std::atomic stop=false; struct guard { guard(std::atomic &ref) : ref(ref) { ++ref; } ~guard() { --ref; } @@ -50,57 +58,92 @@ struct guard { * \brief demo for run concurrency clients */ -Lazy> call_echo( - coro_io::client_pool &client_pool, int cnt) { - std::vector result; - result.reserve(cnt); +std::vector> clients; + +std::atomic& get_qps(int id) { + static std::atomic ar[thread_cnt*8]; + return ar[id*8]; +} + +int& get_cnt(int id) { + static int ar[thread_cnt*16]; + return ar[id*16]; +} +int& get_flag(int id) { + static int ar[thread_cnt*16]; + return ar[id*16]; +} +std::vector& get_result(int id) { + static std::vector ar[thread_cnt*3]; + return ar[id*3]; +} +int cnt_max=10; +Lazy send(int id) { + auto &cli=*clients[id]; + auto& qps=get_qps(id); + auto &cnt=get_cnt(id); + auto &result=get_result(id); ++working_echo; - for (int i = 0; i < cnt; ++i) { + for (;result.size() Lazy { - guard g{busy_echo}; - if (client.has_closed()) { - co_return; - } - auto res = co_await client.call("Hello world!"); - if (!res.has_value()) { - ELOG_ERROR << "coro_rpc err: \n" << res.error().msg; - client.close(); - co_return; - } - if (res.value() != "Hello world!"sv) { - ELOG_ERROR << "err echo resp: \n" << res.value(); - co_return; - } - ++qps; - co_return; - }); - if (!res) { - ELOG_ERROR << "client pool err: connect failed.\n"; + auto res_ = co_await cli.send_request("Hello world!"); + if (!res_.has_value()) { + ELOG_ERROR << "coro_rpc err: \n" << res_.error().msg; + continue; } - else { - result.push_back(std::chrono::duration_cast( - std::chrono::steady_clock::now() - tp)); + res_.value().start([id,&qps,&cnt,&result,old_tp=tp](auto&&res) { + auto&res1=res.value(); + if (!res1.has_value()) { + ELOG_ERROR << "coro_rpc err: \n" << res1.error().msg; + } + else { + ++qps; + result.push_back(std::chrono::duration_cast( + std::chrono::steady_clock::now()-old_tp)); + auto tmp=cnt--; + if (tmp==cnt_max) { + get_flag(id)=true; + } + else if (tmp==cnt_max/2 && get_flag(id)) { + get_flag(id)=false; + send(id).start([](auto&& res){ + }); + } + } + }); + auto cnt_tmp=++cnt; + if (cnt_tmp==cnt_max) { + break; } } - co_return std::move(result); + --working_echo; + co_return; } -Lazy qps_watcher(coro_io::client_pool &clients) { +Lazy qps_watcher() { using namespace std::chrono_literals; - while (working_echo > 0) { + do { co_await coro_io::sleep_for(1s); - uint64_t cnt = qps.exchange(0); + uint64_t cnt = 0; + for (int i=0;i0); } std::vector result; void latency_watcher() { + result.reserve(request_cnt*thread_cnt); + for (int i=0;i::create( - "localhost:8801", - coro_io::client_pool::pool_config{ - .max_connection = thread_cnt * 20, - .client_config = {.timeout_duration = std::chrono::seconds{50}}}); - - auto finish_executor = coro_io::get_global_block_executor(); - for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) { - call_echo(*client_pool, 10000).start([finish_executor](auto &&res) { - finish_executor->schedule([res = std::move(res.value())] { - result.insert(result.end(), res.begin(), res.end()); - --working_echo; - }); + for (int i=0;i(coro_io::get_global_executor()->get_asio_executor())); + syncAwait(clients.back()->connect("localhost:8801")); + get_result(i).reserve(request_cnt); + } + for (int i = 0, lim = thread_cnt; i < lim; ++i) { + send(i).via(&clients[i]->get_executor()).start([](auto &&res) { }); } - syncAwait(qps_watcher(*client_pool)); + syncAwait(qps_watcher()); latency_watcher(); std::cout << "Done!" << std::endl; return 0; diff --git a/src/coro_rpc/examples/base_examples/concurrent_clients.cpp b/src/coro_rpc/examples/base_examples/concurrent_clients.cpp index c8e8e7a20..65d7fa4ab 100644 --- a/src/coro_rpc/examples/base_examples/concurrent_clients.cpp +++ b/src/coro_rpc/examples/base_examples/concurrent_clients.cpp @@ -13,80 +13,114 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include -#include -#include -#include #include #include #include +#include #include -#include +#include +#include +#include +#include #include #include #include #include +#include "async_simple/coro/Collect.h" +#include "async_simple/coro/Lazy.h" +#include "async_simple/coro/Mutex.h" +#include "async_simple/coro/SyncAwait.h" +#include "ylt/coro_io/io_context_pool.hpp" #include "ylt/coro_rpc/impl/coro_rpc_client.hpp" +#include "ylt/coro_rpc/impl/errno.h" +#include "ylt/coro_rpc/impl/expected.hpp" +#include "ylt/easylog.hpp" std::string echo(std::string_view sv); + +constexpr auto thread_cnt = 96*20; using namespace coro_rpc; using namespace async_simple::coro; using namespace std::string_view_literals; -using namespace std::chrono_literals; -std::atomic qps = 0; +auto finish_executor = coro_io::get_global_block_executor(); std::atomic working_echo = 0; -/*! - * \example helloworld/concurrency_clients.main.cpp - * \brief demo for run concurrency clients - */ -Lazy call_echo(int cnt) { +std::vector> clients; + +std::atomic& get_qps(int id) { + static std::atomic ar[thread_cnt*8]; + return ar[id*8]; +} + +Lazy> send(int id,int cnt) { + std::vector result; + auto &cli=*clients[id]; + auto& qps=get_qps(id); + result.reserve(cnt); ++working_echo; - coro_rpc_client client; - auto ec = co_await client.connect("localhost:8801"); - for (int i = 0; i < 3 && !ec; ++i) { - co_await coro_io::sleep_for(rand() % 10000 * 1ms); - ec = co_await client.reconnect("localhost:8801"); - } - if (!ec) { - for (int i = 0; i < cnt; ++i) { - auto res = co_await client.call("Hello world!"); - if (!res.has_value()) { - ELOG_ERROR << "coro_rpc err: \n" << res.error().msg; - co_return; - } - if (res.value() != "Hello world!"sv) { - ELOG_ERROR << "err echo resp: \n" << res.value(); - co_return; - } - ++qps; + auto tp = std::chrono::steady_clock::now(); + for (int i=0;i("Hello world!"); + if (!res_.has_value()) { + ELOG_ERROR << "coro_rpc err: \n" << res_.error().msg; + continue; } - } - else { - std::cout << "connect failed \n"; + ++qps; + auto old_tp=tp; + tp = std::chrono::steady_clock::now(); + result.push_back(std::chrono::duration_cast(tp-old_tp)); } --working_echo; + co_return std::move(result); } Lazy qps_watcher() { using namespace std::chrono_literals; - while (working_echo > 0) { + do { co_await coro_io::sleep_for(1s); - uint64_t cnt = qps.exchange(0); - std::cout << "QPS:" << cnt << " working echo:" << working_echo << std::endl; + uint64_t cnt = 0; + for (int i=0;i0); +} +std::vector result; +void latency_watcher() { + std::sort(result.begin(), result.end()); + auto arr = {0.1, 0.3, 0.5, 0.7, 0.9, 0.95, 0.99, 0.999, 0.9999, 0.99999, 1.0}; + for (auto e : arr) { + std::cout + << (e * 100) << "% request finished in:" + << result[std::max(0, result.size() * e - 1)].count() / + 1000.0 + << "ms" << std::endl; } } - int main() { - auto thread_cnt = std::thread::hardware_concurrency(); - for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) { - call_echo(100000).start([](auto &&) { + for (int i=0;i(coro_io::get_global_executor()->get_asio_executor())); + syncAwait(clients.back()->connect("localhost:8801")); + } + for (int i = 0, lim = thread_cnt; i < lim; ++i) { + send(i,20000).via(&clients[i]->get_executor()).start([](auto &&res) { + finish_executor->schedule([res = std::move(res.value())] { + result.insert(result.end(), res.begin(), res.end()); + }); }); } syncAwait(qps_watcher()); + latency_watcher(); std::cout << "Done!" << std::endl; return 0; } \ No newline at end of file diff --git a/src/coro_rpc/examples/base_examples/rpc_service.cpp b/src/coro_rpc/examples/base_examples/rpc_service.cpp index 27f04db59..e661e6728 100644 --- a/src/coro_rpc/examples/base_examples/rpc_service.cpp +++ b/src/coro_rpc/examples/base_examples/rpc_service.cpp @@ -102,8 +102,8 @@ Lazy nested_echo(std::string_view sv) { coro_io::g_clients_pool().at("127.0.0.1:8802"); assert(client != nullptr); ELOGV(INFO, "connect another server"); - auto ret = co_await client->send_request([sv](coro_rpc_client &client) { - return client.call(sv); + auto ret = co_await client->send_request([sv](coro_rpc_client &client) ->Lazy> { + co_return co_await client.call(sv); }); co_return ret.value().value(); }