Skip to content

Commit

Permalink
[coro_rpc] [coro_http] use global executor instead of inner io_context (
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle authored Jul 10, 2023
1 parent a50c4e1 commit fa92e01
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 110 deletions.
67 changes: 15 additions & 52 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,32 +132,15 @@ class coro_rpc_client {
* Create client with io_context
* @param io_context asio io_context, async event handler
*/
coro_rpc_client(coro_io::ExecutorWrapper<> &executor, uint32_t client_id = 0)
coro_rpc_client(
coro_io::ExecutorWrapper<> &executor = *coro_io::get_global_executor(),
uint32_t client_id = 0)
: executor(executor.get_asio_executor()),
socket_(executor.get_asio_executor()) {
config_.client_id = client_id;
read_buf_.resize(default_read_buf_size_);
}

/*!
* Create client
*/
coro_rpc_client(uint32_t client_id = 0)
: inner_io_context_(std::make_unique<asio::io_context>()),
executor(inner_io_context_->get_executor()),
socket_(inner_io_context_->get_executor()) {
config_.client_id = client_id;
std::promise<void> promise;
thd_ = std::thread([this, &promise] {
work_ = std::make_unique<asio::io_context::work>(*inner_io_context_);
executor.schedule([&] {
promise.set_value();
});
inner_io_context_->run();
});
promise.get_future().wait();
}

[[nodiscard]] bool init_config(const config &conf) {
config_ = conf;
#ifdef YLT_ENABLE_SSL
Expand Down Expand Up @@ -194,7 +177,7 @@ class coro_rpc_client {
config_.timeout_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(timeout_duration);
reset();
return connect(true);
return connect(is_reconnect_t{true});
}

[[nodiscard]] async_simple::coro::Lazy<std::errc> reconnect(
Expand All @@ -207,7 +190,7 @@ class coro_rpc_client {
config_.timeout_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(timeout_duration);
reset();
return connect(true);
return connect(is_reconnect_t{true});
}
/*!
* Connect server
Expand All @@ -231,7 +214,7 @@ class coro_rpc_client {
return connect();
}
[[nodiscard]] async_simple::coro::Lazy<std::errc> connect(
std::string endpoint,
std::string_view endpoint,
std::chrono::steady_clock::duration timeout_duration =
std::chrono::seconds(5)) {
auto pos = endpoint.find(':');
Expand All @@ -254,10 +237,7 @@ class coro_rpc_client {
}
#endif

~coro_rpc_client() {
close();
stop_inner_io_context();
}
~coro_rpc_client() { close(); }

/*!
* Call RPC function with default timeout (5 second)
Expand Down Expand Up @@ -359,6 +339,12 @@ class coro_rpc_client {
friend class coro_io::client_pool;

private:
// the const char * will convert to bool instead of std::string_view
// use this struct to prevent it.
struct is_reconnect_t {
bool value = false;
};

void reset() {
close_socket();
socket_ = decltype(socket_)(executor.get_asio_executor());
Expand All @@ -368,14 +354,14 @@ class coro_rpc_client {

static bool is_ok(std::errc ec) noexcept { return ec == std::errc{}; }
[[nodiscard]] async_simple::coro::Lazy<std::errc> connect(
bool is_reconnect = false) {
is_reconnect_t is_reconnect = is_reconnect_t{false}) {
#ifdef YLT_ENABLE_SSL
if (!ssl_init_ret_) {
std::cout << "ssl_init_ret_: " << ssl_init_ret_ << std::endl;
co_return std::errc::not_connected;
}
#endif
if (!is_reconnect && has_closed_)
if (!is_reconnect.value && has_closed_)
AS_UNLIKELY {
ELOGV(ERROR,
"a closed client is not allowed connect again, please use "
Expand Down Expand Up @@ -756,26 +742,6 @@ class coro_rpc_client {
has_closed_ = true;
}

void stop_inner_io_context() {
if (thd_.joinable()) {
work_ = nullptr;
if (thd_.get_id() == std::this_thread::get_id()) {
// we are now running in inner_io_context_, so destruction it in
// another thread
std::thread thrd{[ioc = std::move(inner_io_context_),
thd = std::move(thd_)]() mutable {
thd.join();
}};
thrd.detach();
}
else {
thd_.join();
}
}

return;
}

#ifdef UNIT_TEST_INJECT
public:
std::errc sync_connect(const std::string &host, const std::string &port) {
Expand All @@ -790,9 +756,6 @@ class coro_rpc_client {
}
#endif
private:
std::unique_ptr<asio::io_context> inner_io_context_;
std::unique_ptr<asio::io_context::work> work_;
std::thread thd_;
coro_io::ExecutorWrapper<> executor;
asio::ip::tcp::socket socket_;
std::vector<std::byte> read_buf_;
Expand Down
39 changes: 4 additions & 35 deletions include/ylt/thirdparty/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <utility>
#include <ylt/coro_io/coro_file.hpp>
#include <ylt/coro_io/coro_io.hpp>
#include <ylt/coro_io/io_context_pool.hpp>

#include "http_parser.hpp"
#include "response_cv.hpp"
Expand Down Expand Up @@ -181,28 +182,14 @@ class coro_http_client {
std::string domain;
#endif
};
coro_http_client()
: io_ctx_(std::make_unique<asio::io_context>()),
socket_(std::make_shared<socket_t>(io_ctx_->get_executor())),
executor_wrapper_(io_ctx_->get_executor()),
timer_(&executor_wrapper_) {
std::promise<void> promise;
io_thd_ = std::thread([this, &promise] {
work_ = std::make_unique<asio::io_context::work>(*io_ctx_);
executor_wrapper_.schedule([&] {
promise.set_value();
});
io_ctx_->run();
});
promise.get_future().wait();
}

coro_http_client(asio::io_context::executor_type executor)
: socket_(std::make_shared<socket_t>(executor)),
executor_wrapper_(executor),
timer_(&executor_wrapper_) {}

coro_http_client(coro_io::ExecutorWrapper<> *executor)
coro_http_client(
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor())
: coro_http_client(executor->get_asio_executor()) {}

bool init_config(const config &conf) {
Expand Down Expand Up @@ -237,22 +224,7 @@ class coro_http_client {
return true;
}

~coro_http_client() {
async_close();
if (io_thd_.joinable()) {
work_ = nullptr;
if (io_thd_.get_id() == std::this_thread::get_id()) {
std::thread thrd{[io_ctx = std::move(io_ctx_),
io_thd = std::move(io_thd_)]() mutable {
io_thd.join();
}};
thrd.detach();
}
else {
io_thd_.join();
}
}
}
~coro_http_client() { async_close(); }

void async_close() {
if (socket_->has_closed_)
Expand Down Expand Up @@ -1649,12 +1621,9 @@ class coro_http_client {
return has_http_scheme;
}

std::unique_ptr<asio::io_context> io_ctx_;

coro_io::ExecutorWrapper<> executor_wrapper_;
std::unique_ptr<asio::io_context::work> work_;
coro_io::period_timer timer_;
std::thread io_thd_;
std::shared_ptr<socket_t> socket_;
asio::streambuf read_buf_;
simple_buffer body_{};
Expand Down
3 changes: 1 addition & 2 deletions src/coro_http/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@

if("${yaLanTingLibs_SOURCE_DIR}" STREQUAL "${CMAKE_SOURCE_DIR}")
# if is the subproject of yalantinglibs
# do nothing
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/output/examples/coro_http)
else()
# else find installed yalantinglibs
cmake_minimum_required(VERSION 3.15)
Expand Down
3 changes: 1 addition & 2 deletions src/coro_io/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@

if("${yaLanTingLibs_SOURCE_DIR}" STREQUAL "${CMAKE_SOURCE_DIR}")
# if is the subproject of yalantinglibs
# do nothing
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/output/examples)
else()
# else find installed yalantinglibs
cmake_minimum_required(VERSION 3.15)
Expand Down
1 change: 1 addition & 0 deletions src/coro_rpc/examples/base_examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ add_executable(coro_rpc_example_channel channel.cpp)
add_executable(coro_rpc_example_client_pool client_pool.cpp)
add_executable(coro_rpc_example_client_pools client_pools.cpp)
add_executable(coro_rpc_example_client client.cpp)
add_executable(coro_rpc_example_concurrent_clients concurrent_clients.cpp)
add_executable(coro_rpc_example_server server.cpp rpc_service.cpp)

4 changes: 2 additions & 2 deletions src/coro_rpc/examples/base_examples/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ Lazy<void> call_echo(std::shared_ptr<coro_io::channel<coro_rpc_client>> channel,
[](coro_rpc_client &client, std::string_view hostname) -> Lazy<void> {
auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) {
std::cout << "coro_rpc err: \n";
std::cout << "coro_rpc err: \n" << res.error().msg;
co_return;
}
if (res.value() != "Hello world!"sv) {
std::cout << "err echo resp: \n";
std::cout << "err echo resp: \n" << res.value();
co_return;
}
++qps;
Expand Down
4 changes: 2 additions & 2 deletions src/coro_rpc/examples/base_examples/client_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ Lazy<void> call_echo(coro_io::client_pool<coro_rpc_client> &client_pool,
[=](coro_rpc_client &client) -> Lazy<void> {
auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) {
std::cout << "coro_rpc err: \n";
std::cout << "coro_rpc err: \n" << res.error().msg;
co_return;
}
if (res.value() != "Hello world!"sv) {
std::cout << "err echo resp: \n";
std::cout << "err echo resp: \n" << res.value();
co_return;
}
++qps;
Expand Down
4 changes: 2 additions & 2 deletions src/coro_rpc/examples/base_examples/client_pools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ Lazy<void> call_echo(coro_io::client_pools<coro_rpc_client> &client_pools,
[=](coro_rpc_client &client) -> Lazy<void> {
auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) {
std::cout << "coro_rpc err: \n";
std::cout << "coro_rpc err: \n" << res.error().msg;
co_return;
}
if (res.value() != "Hello world!"sv) {
std::cout << "err echo resp: \n";
std::cout << "err echo resp: \n" << res.value();
co_return;
}
++qps;
Expand Down
92 changes: 92 additions & 0 deletions src/coro_rpc/examples/base_examples/concurrent_clients.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2023, Alibaba Group Holding Limited;
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <async_simple/coro/Lazy.h>
#include <async_simple/coro/Sleep.h>

#include <asio/io_context.hpp>
#include <asio/steady_timer.hpp>
#include <atomic>
#include <chrono>
#include <climits>
#include <cstdlib>
#include <system_error>
#include <thread>
#include <ylt/coro_io/client_pool.hpp>
#include <ylt/coro_io/coro_io.hpp>
#include <ylt/coro_rpc/coro_rpc_client.hpp>

#include "ylt/coro_rpc/impl/coro_rpc_client.hpp"
std::string echo(std::string_view sv);
using namespace coro_rpc;
using namespace async_simple::coro;
using namespace std::string_view_literals;
using namespace std::chrono_literals;
std::atomic<uint64_t> qps = 0;

std::atomic<uint64_t> working_echo = 0;
/*!
* \example helloworld/concurrency_clients.main.cpp
* \brief demo for run concurrency clients
*/

Lazy<void> call_echo(int cnt) {
++working_echo;
coro_rpc_client client;
std::errc ec = co_await client.connect("localhost:8801");
for (int i = 0; i < 3 && ec != std::errc{}; ++i) {
co_await coro_io::sleep_for(rand() % 10000 * 1ms);
ec = co_await client.reconnect("localhost:8801");
}
if (ec == std::errc{}) {
for (int i = 0; i < cnt; ++i) {
auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) {
std::cout << "coro_rpc err: \n" << res.error().msg;
co_return;
}
if (res.value() != "Hello world!"sv) {
std::cout << "err echo resp: \n" << res.value();
co_return;
}
++qps;
}
}
else {
std::cout << "connect failed \n";
}
--working_echo;
}

Lazy<void> qps_watcher() {
using namespace std::chrono_literals;
while (working_echo > 0) {
co_await coro_io::sleep_for(1s);
uint64_t cnt = qps.exchange(0);
std::cout << "QPS:" << cnt << " working echo:" << working_echo << std::endl;
cnt = 0;
}
}

int main() {
auto thread_cnt = std::thread::hardware_concurrency();
for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) {
call_echo(100000).start([](auto &&) {
});
}
syncAwait(qps_watcher());
std::cout << "Done!" << std::endl;
return 0;
}
Loading

0 comments on commit fa92e01

Please sign in to comment.