
Commit

fix client
poor-circle committed May 7, 2024
1 parent 2b7fc80 commit 7ed4da8
Showing 4 changed files with 182 additions and 101 deletions.
19 changes: 15 additions & 4 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
@@ -139,6 +139,7 @@ class coro_rpc_client {
std::chrono::milliseconds{5000};
std::string host;
std::string port;
bool enable_tcp_no_delay_ = true;
#ifdef YLT_ENABLE_SSL
std::filesystem::path ssl_cert_path;
std::string ssl_domain;
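
A hypothetical usage sketch for the new enable_tcp_no_delay_ flag, wired through the client-pool configuration that appears later in this commit. The pool_config/client_config shape is taken from the example code in this diff; initializing only this one field of the config aggregate is an assumption, not code from the repository.

#include <ylt/coro_io/client_pool.hpp>
#include <ylt/coro_rpc/coro_rpc_client.hpp>

int main() {
  using coro_rpc::coro_rpc_client;
  // Assumed: leave Nagle's algorithm on (skip TCP_NODELAY) for every client
  // handed out by this pool; the new flag defaults to true.
  auto pool = coro_io::client_pool<coro_rpc_client>::create(
      "localhost:8801",
      coro_io::client_pool<coro_rpc_client>::pool_config{
          .client_config = {.enable_tcp_no_delay_ = false}});
  return 0;
}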
@@ -332,7 +333,7 @@ class coro_rpc_client {
uint32_t get_client_id() const { return config_.client_id; }

void close() {
ELOGV(INFO, "client_id %d close", config_.client_id);
//ELOGV(INFO, "client_id %d close", config_.client_id);
close_socket(control_);
}

@@ -409,8 +410,9 @@
ELOGV(WARN, "client_id %d connect timeout", config_.client_id);
co_return errc::timed_out;
}

control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec);
if (config_.enable_tcp_no_delay_==true) {
control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec);
}

#ifdef YLT_ENABLE_SSL
if (!config_.ssl_cert_path.empty()) {
@@ -614,11 +616,20 @@ class coro_rpc_client {
constexpr bool has_conn_v = requires { typename First::return_type; };
return util::get_args<has_conn_v, FuncArgs>();
}
template<typename T,typename U>
static decltype(auto) add_const(U&& u) {
if constexpr (std::is_const_v<std::remove_reference_t<U>>) {
return struct_pack::detail::declval<const T>();
}
else {
return struct_pack::detail::declval<T>();
}
}

template <typename... FuncArgs, typename Buffer, typename... Args>
void pack_to_impl(Buffer &buffer, std::size_t offset, Args &&...args) {
struct_pack::serialize_to_with_offset(
buffer, offset, std::forward<FuncArgs>(std::forward<Args>(args))...);
buffer, offset, std::forward<decltype(add_const<FuncArgs>(args))>((std::forward<Args>(args)))...);
}

template <typename Tuple, size_t... Is, typename Buffer, typename... Args>
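The add_const helper in the last hunk appears to propagate the const qualification of the actual argument onto the declared RPC parameter type before std::forward, so that serializing a const argument no longer tries to cast away const. A standalone sketch of the same const-propagating forward, written with standard type traits instead of ylt's helpers (illustrative only; the names are not from this repository):

#include <iostream>
#include <string>
#include <type_traits>

// Pick "const Declared&" when the incoming argument is const, otherwise
// "Declared&&", mirroring the intent of add_const + std::forward above.
template <typename Declared, typename Arg>
using forwarded_t =
    std::conditional_t<std::is_const_v<std::remove_reference_t<Arg>>,
                       const Declared&, Declared&&>;

template <typename Declared, typename Arg>
decltype(auto) forward_as_declared(Arg&& arg) {
  return static_cast<forwarded_t<Declared, Arg>>(arg);
}

void sink(const std::string&) { std::cout << "const lvalue overload\n"; }
void sink(std::string&&) { std::cout << "rvalue overload\n"; }

int main() {
  std::string s = "hi";
  const std::string cs = "hi";
  sink(forward_as_declared<std::string>(s));   // non-const arg -> rvalue overload
  sink(forward_as_declared<std::string>(cs));  // const arg -> const lvalue overload
}
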
146 changes: 91 additions & 55 deletions src/coro_rpc/examples/base_examples/client_pool.cpp
@@ -17,29 +17,37 @@
#include <atomic>
#include <chrono>
#include <climits>
#include <cstdint>
#include <cstdlib>
#include <locale>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <ylt/coro_io/client_pool.hpp>
#include <ylt/coro_io/coro_io.hpp>
#include <ylt/coro_rpc/coro_rpc_client.hpp>

#include "async_simple/coro/Collect.h"
#include "async_simple/coro/Lazy.h"
#include "async_simple/coro/Mutex.h"
#include "async_simple/coro/SyncAwait.h"
#include "ylt/coro_io/io_context_pool.hpp"
#include "ylt/coro_rpc/impl/coro_rpc_client.hpp"
#include "ylt/coro_rpc/impl/errno.h"
#include "ylt/coro_rpc/impl/expected.hpp"
#include "ylt/easylog.hpp"
std::string echo(std::string_view sv);

constexpr auto thread_cnt = 96*2;
constexpr auto request_cnt= 200000;
using namespace coro_rpc;
using namespace async_simple::coro;
using namespace std::string_view_literals;

std::atomic<uint64_t> qps = 0;

auto finish_executor = coro_io::get_global_block_executor();
std::atomic<uint64_t> working_echo = 0;
std::atomic<uint64_t> busy_echo = 0;
std::atomic<bool> stop=false;
struct guard {
guard(std::atomic<uint64_t> &ref) : ref(ref) { ++ref; }
~guard() { --ref; }
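
The guard struct above is the usual RAII idiom for tracking in-flight work with an atomic counter: increment on scope entry, decrement on every exit path. A self-contained sketch of that idiom (illustrative names, not ylt's):

#include <atomic>
#include <cstdint>

struct counting_guard {
  explicit counting_guard(std::atomic<uint64_t>& c) : counter(c) { ++counter; }
  ~counting_guard() { --counter; }
  counting_guard(const counting_guard&) = delete;
  counting_guard& operator=(const counting_guard&) = delete;
  std::atomic<uint64_t>& counter;
};

std::atomic<uint64_t> busy = 0;

void do_work() {
  counting_guard g{busy};  // busy is 1 inside this scope, 0 again afterwards
  // ... actual work ...
}

int main() { do_work(); }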
@@ -50,57 +58,92 @@ struct guard {
* \brief demo for running concurrent clients
*/

Lazy<std::vector<std::chrono::microseconds>> call_echo(
coro_io::client_pool<coro_rpc_client> &client_pool, int cnt) {
std::vector<std::chrono::microseconds> result;
result.reserve(cnt);
std::vector<std::unique_ptr<coro_rpc_client>> clients;

std::atomic<uint64_t>& get_qps(int id) {
static std::atomic<uint64_t> ar[thread_cnt*8];
return ar[id*8];
}

int& get_cnt(int id) {
static int ar[thread_cnt*16];
return ar[id*16];
}
int& get_flag(int id) {
static int ar[thread_cnt*16];
return ar[id*16];
}
std::vector<std::chrono::microseconds>& get_result(int id) {
static std::vector<std::chrono::microseconds> ar[thread_cnt*3];
return ar[id*3];
}
int cnt_max=10;
Lazy<void> send(int id) {
auto &cli=*clients[id];
auto& qps=get_qps(id);
auto &cnt=get_cnt(id);
auto &result=get_result(id);
++working_echo;
for (int i = 0; i < cnt; ++i) {
for (;result.size()<request_cnt;) {
auto tp = std::chrono::steady_clock::now();
auto res = co_await client_pool.send_request(
[=](coro_rpc_client &client) -> Lazy<void> {
guard g{busy_echo};
if (client.has_closed()) {
co_return;
}
auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) {
ELOG_ERROR << "coro_rpc err: \n" << res.error().msg;
client.close();
co_return;
}
if (res.value() != "Hello world!"sv) {
ELOG_ERROR << "err echo resp: \n" << res.value();
co_return;
}
++qps;
co_return;
});
if (!res) {
ELOG_ERROR << "client pool err: connect failed.\n";
auto res_ = co_await cli.send_request<echo>("Hello world!");
if (!res_.has_value()) {
ELOG_ERROR << "coro_rpc err: \n" << res_.error().msg;
continue;
}
else {
result.push_back(std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - tp));
res_.value().start([id,&qps,&cnt,&result,old_tp=tp](auto&&res) {
auto&res1=res.value();
if (!res1.has_value()) {
ELOG_ERROR << "coro_rpc err: \n" << res1.error().msg;
}
else {
++qps;
result.push_back(std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now()-old_tp));
auto tmp=cnt--;
if (tmp==cnt_max) {
get_flag(id)=true;
}
else if (tmp==cnt_max/2 && get_flag(id)) {
get_flag(id)=false;
send(id).start([](auto&& res){
});
}
}
});
auto cnt_tmp=++cnt;
if (cnt_tmp==cnt_max) {
break;
}
}
co_return std::move(result);
--working_echo;
co_return;
}

Lazy<void> qps_watcher(coro_io::client_pool<coro_rpc_client> &clients) {
Lazy<void> qps_watcher() {
using namespace std::chrono_literals;
while (working_echo > 0) {
do {
co_await coro_io::sleep_for(1s);
uint64_t cnt = qps.exchange(0);
uint64_t cnt = 0;
for (int i=0;i<thread_cnt;++i)
{
auto & qps=get_qps(i);
uint64_t tmp=qps.exchange(0);
cnt+=tmp;
}
std::cout << "QPS:" << cnt
<< " free connection: " << clients.free_client_count()
<< " working echo:" << working_echo << " busy echo:" << busy_echo
// << " free connection: " << clients.free_client_count()
<< " working echo:" << working_echo
<< std::endl;
cnt = 0;
}
} while (working_echo>0);
}
std::vector<std::chrono::microseconds> result;
void latency_watcher() {
result.reserve(request_cnt*thread_cnt);
for (int i=0;i<thread_cnt;++i) {
auto&res = get_result(i);
result.insert(result.end(), res.begin(), res.end());
}
std::sort(result.begin(), result.end());
auto arr = {0.1, 0.3, 0.5, 0.7, 0.9, 0.95, 0.99, 0.999, 0.9999, 0.99999, 1.0};
for (auto e : arr) {
@@ -112,23 +155,16 @@ void latency_watcher() {
}
}
int main() {
auto thread_cnt = std::thread::hardware_concurrency();
auto client_pool = coro_io::client_pool<coro_rpc_client>::create(
"localhost:8801",
coro_io::client_pool<coro_rpc_client>::pool_config{
.max_connection = thread_cnt * 20,
.client_config = {.timeout_duration = std::chrono::seconds{50}}});

auto finish_executor = coro_io::get_global_block_executor();
for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) {
call_echo(*client_pool, 10000).start([finish_executor](auto &&res) {
finish_executor->schedule([res = std::move(res.value())] {
result.insert(result.end(), res.begin(), res.end());
--working_echo;
});
for (int i=0;i<thread_cnt;++i) {
clients.emplace_back(std::make_unique<coro_rpc_client>(coro_io::get_global_executor()->get_asio_executor()));
syncAwait(clients.back()->connect("localhost:8801"));
get_result(i).reserve(request_cnt);
}
for (int i = 0, lim = thread_cnt; i < lim; ++i) {
send(i).via(&clients[i]->get_executor()).start([](auto &&res) {
});
}
syncAwait(qps_watcher(*client_pool));
syncAwait(qps_watcher());
latency_watcher();
std::cout << "Done!" << std::endl;
return 0;
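The reworked benchmark counts QPS in per-worker atomics spread across a static array (get_qps indexes ar[id*8], presumably to keep hot counters on separate cache lines), and qps_watcher drains them once per second with exchange(0). A minimal standalone sketch of that pattern, using alignas(64) instead of index padding; the names are illustrative, not ylt's:

#include <atomic>
#include <cstdint>
#include <cstdio>

constexpr int kThreads = 4;

// One counter per worker thread, each on its own cache line so concurrent
// increments do not contend on the same line.
struct alignas(64) padded_counter {
  std::atomic<uint64_t> value{0};
};
padded_counter qps_counters[kThreads];

void record_request(int thread_id) {
  qps_counters[thread_id].value.fetch_add(1, std::memory_order_relaxed);
}

// Once per reporting interval: drain every per-thread counter and print the
// aggregate, mirroring the qps_watcher loop above.
void report() {
  uint64_t total = 0;
  for (int i = 0; i < kThreads; ++i)
    total += qps_counters[i].value.exchange(0, std::memory_order_relaxed);
  std::printf("QPS: %llu\n", static_cast<unsigned long long>(total));
}

int main() {
  record_request(0);
  record_request(1);
  report();  // prints "QPS: 2"
}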
(The remaining two changed files did not load and are not shown.)
