Skip to content

Commit

Permalink
[doc] add coro_rpc doc (#681)
Browse files Browse the repository at this point in the history
* add coro_rpc doc

* fix doc

* update en doc

* fix code
  • Loading branch information
poor-circle authored May 24, 2024
1 parent f786c5a commit 5f3bd02
Show file tree
Hide file tree
Showing 22 changed files with 1,525 additions and 309 deletions.
6 changes: 4 additions & 2 deletions include/ylt/coro_rpc/impl/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,12 @@ class context_base {
/*finish here*/
self_->status_ = context_status::finish_response;
}
const context_info_t<rpc_protocol> *get_context() const noexcept {
const context_info_t<rpc_protocol> *get_context_info() const noexcept {
return self_.get();
}
context_info_t<rpc_protocol> *get_context_info() noexcept {
return self_.get();
}
context_info_t<rpc_protocol> *get_context() noexcept { return self_.get(); }
};

template <typename T>
Expand Down
19 changes: 12 additions & 7 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ class client_pool;

namespace coro_rpc {

inline uint64_t get_global_client_id() {
static std::atomic<uint64_t> cid = 0;
return cid.fetch_add(1, std::memory_order::relaxed);
}

#ifdef GENERATE_BENCHMARK_DATA
std::string benchmark_file_path = "./";
#endif
Expand Down Expand Up @@ -105,7 +110,7 @@ struct async_rpc_result_value_t {
async_rpc_result_value_t(T &&result) : result_(std::move(result)) {}
T &result() noexcept { return result_; }
const T &result() const noexcept { return result_; }
std::string_view attachment() const noexcept {
std::string_view get_attachment() const noexcept {
return buffer_.resp_attachment_buf_;
}
resp_body release_buffer() { return std::move(buffer_); }
Expand Down Expand Up @@ -155,12 +160,12 @@ class coro_rpc_client {
const inline static rpc_error connect_error = {errc::io_error,
"client has been closed"};
struct config {
uint32_t client_id = 0;
uint64_t client_id = get_global_client_id();
std::chrono::milliseconds timeout_duration =
std::chrono::milliseconds{5000};
std::string host;
std::string port;
bool enable_tcp_no_delay_ = true;
bool enable_tcp_no_delay = true;
#ifdef YLT_ENABLE_SSL
std::filesystem::path ssl_cert_path;
std::string ssl_domain;
Expand All @@ -172,7 +177,7 @@ class coro_rpc_client {
* @param io_context asio io_context, async event handler
*/
coro_rpc_client(asio::io_context::executor_type executor,
uint32_t client_id = 0)
uint64_t client_id = get_global_client_id())
: control_(std::make_shared<control_t>(executor, false)),
timer_(std::make_unique<coro_io::period_timer>(executor)) {
config_.client_id = client_id;
Expand All @@ -184,7 +189,7 @@ class coro_rpc_client {
*/
coro_rpc_client(
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor(),
uint32_t client_id = 0)
uint64_t client_id = get_global_client_id())
: control_(
std::make_shared<control_t>(executor->get_asio_executor(), false)),
timer_(std::make_unique<coro_io::period_timer>(
Expand Down Expand Up @@ -424,7 +429,7 @@ class coro_rpc_client {
ELOGV(WARN, "client_id %d connect timeout", config_.client_id);
co_return errc::timed_out;
}
if (config_.enable_tcp_no_delay_ == true) {
if (config_.enable_tcp_no_delay == true) {
control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec);
}

Expand Down Expand Up @@ -738,7 +743,7 @@ class coro_rpc_client {
call<func>(std::forward<Args>(args)...));
}
#endif

private:
template <auto func, typename... Args>
async_simple::coro::Lazy<rpc_error> send_request_for_impl(
auto duration, uint32_t &id, coro_io::period_timer &timer,
Expand Down
65 changes: 41 additions & 24 deletions include/ylt/coro_rpc/impl/coro_rpc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <memory>
#include <mutex>
#include <system_error>
#include <thread>
#include <unordered_map>
#include <vector>
#include <ylt/easylog.hpp>
Expand Down Expand Up @@ -68,29 +69,36 @@ class coro_rpc_server_base {
* TODO: add doc
* @param thread_num the number of io_context.
* @param port the server port to listen.
* @param listen address of server
* @param conn_timeout_duration client connection timeout. 0 for no timeout.
* default no timeout.
* @param is_enable_tcp_no_delay is tcp socket allow
*/
coro_rpc_server_base(size_t thread_num, unsigned short port,
coro_rpc_server_base(size_t thread_num = std::thread::hardware_concurrency(),
unsigned short port = 9001,
std::string address = "0.0.0.0",
std::chrono::steady_clock::duration
conn_timeout_duration = std::chrono::seconds(0))
conn_timeout_duration = std::chrono::seconds(0),
bool is_enable_tcp_no_delay = true)
: pool_(thread_num),
acceptor_(pool_.get_executor()->get_asio_executor()),
port_(port),
conn_timeout_duration_(conn_timeout_duration),
flag_{stat::init} {
flag_{stat::init},
is_enable_tcp_no_delay_(is_enable_tcp_no_delay) {
init_address(std::move(address));
}

coro_rpc_server_base(size_t thread_num,
std::string address /* = "0.0.0.0:9001" */,
coro_rpc_server_base(size_t thread_num = std::thread::hardware_concurrency(),
std::string address = "0.0.0.0:9001",
std::chrono::steady_clock::duration
conn_timeout_duration = std::chrono::seconds(0))
conn_timeout_duration = std::chrono::seconds(0),
bool is_enable_tcp_no_delay = true)
: pool_(thread_num),
acceptor_(pool_.get_executor()->get_asio_executor()),
conn_timeout_duration_(conn_timeout_duration),
flag_{stat::init} {
flag_{stat::init},
is_enable_tcp_no_delay_(is_enable_tcp_no_delay) {
init_address(std::move(address));
}

Expand All @@ -99,7 +107,13 @@ class coro_rpc_server_base {
acceptor_(pool_.get_executor()->get_asio_executor()),
port_(config.port),
conn_timeout_duration_(config.conn_timeout_duration),
flag_{stat::init} {
flag_{stat::init},
is_enable_tcp_no_delay_(config.is_enable_tcp_no_delay) {
#ifdef YLT_ENABLE_SSL
if (config.ssl_config) {
init_ssl_context_helper(context_, config.ssl_config.value());
}
#endif
init_address(config.address);
}

Expand All @@ -109,7 +123,7 @@ class coro_rpc_server_base {
}

#ifdef YLT_ENABLE_SSL
void init_ssl_context(const ssl_configure &conf) {
void init_ssl(const ssl_configure &conf) {
use_ssl_ = init_ssl_context_helper(context_, conf);
}
#endif
Expand All @@ -122,19 +136,19 @@ class coro_rpc_server_base {
* @return error code if start failed, otherwise block until server stop.
*/
[[nodiscard]] coro_rpc::err_code start() noexcept {
auto ret = async_start();
if (ret) {
ret.value().wait();
return ret.value().value();
}
else {
return ret.error();
}
return async_start().get();
}

[[nodiscard]] coro_rpc::expected<async_simple::Future<coro_rpc::err_code>,
coro_rpc::err_code>
async_start() noexcept {
private:
async_simple::Future<coro_rpc::err_code> make_error_future(
coro_rpc::err_code &&err) {
async_simple::Promise<coro_rpc::err_code> p;
p.setValue(std::move(err));
return p.getFuture();
}

public:
async_simple::Future<coro_rpc::err_code> async_start() noexcept {
{
std::unique_lock lock(start_mtx_);
if (flag_ != stat::init) {
Expand All @@ -144,8 +158,8 @@ class coro_rpc_server_base {
else if (flag_ == stat::stop) {
ELOGV(INFO, "has stoped");
}
return coro_rpc::unexpected<coro_rpc::err_code>{
coro_rpc::errc::server_has_ran};
return make_error_future(
coro_rpc::err_code{coro_rpc::errc::server_has_ran});
}
errc_ = listen();
if (!errc_) {
Expand Down Expand Up @@ -177,7 +191,7 @@ class coro_rpc_server_base {
return std::move(future);
}
else {
return coro_rpc::unexpected<coro_rpc::err_code>{errc_};
return make_error_future(coro_rpc::err_code{errc_});
}
}

Expand Down Expand Up @@ -387,7 +401,9 @@ class coro_rpc_server_base {

int64_t conn_id = ++conn_id_;
ELOGV(INFO, "new client conn_id %d coming", conn_id);
socket.set_option(asio::ip::tcp::no_delay(true), error);
if (is_enable_tcp_no_delay_) {
socket.set_option(asio::ip::tcp::no_delay(true), error);
}
auto conn = std::make_shared<coro_connection>(executor, std::move(socket),
conn_timeout_duration_);
conn->set_quit_callback(
Expand Down Expand Up @@ -459,6 +475,7 @@ class coro_rpc_server_base {

std::atomic<uint16_t> port_;
std::string address_;
bool is_enable_tcp_no_delay_;
coro_rpc::err_code errc_ = {};
std::chrono::steady_clock::duration conn_timeout_duration_;

Expand Down
15 changes: 9 additions & 6 deletions include/ylt/coro_rpc/impl/default_config/coro_rpc_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,22 @@

namespace coro_rpc {

namespace config {
struct coro_rpc_config_base {
uint16_t port = 8801;
struct config_base {
bool is_enable_tcp_no_delay = true;
uint16_t port = 9001;
unsigned thread_num = std::thread::hardware_concurrency();
std::chrono::steady_clock::duration conn_timeout_duration =
std::chrono::seconds{0};
std::string address = "0.0.0.0";
#ifdef YLT_ENABLE_SSL
std::optional<ssl_configure> ssl_config = std::nullopt;
#endif
};

struct coro_rpc_default_config : public coro_rpc_config_base {
struct config_t : public config_base {
using rpc_protocol = coro_rpc::protocol::coro_rpc_protocol;
using executor_pool_t = coro_io::io_context_pool;
};
} // namespace config

using coro_rpc_server = coro_rpc_server_base<config::coro_rpc_default_config>;
using coro_rpc_server = coro_rpc_server_base<config_t>;
} // namespace coro_rpc
12 changes: 6 additions & 6 deletions src/coro_io/tests/test_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ TEST_CASE("test RR") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
auto hosts =
std::vector<std::string_view>{"127.0.0.1:8801", "localhost:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
Expand Down Expand Up @@ -62,10 +62,10 @@ TEST_CASE("test WRR") {

coro_rpc::coro_rpc_server server1(1, 8801);
auto res = server1.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
coro_rpc::coro_rpc_server server2(1, 8802);
auto res2 = server2.async_start();
REQUIRE_MESSAGE(res2, "server start failed");
REQUIRE_MESSAGE(!res2.hasResult(), "server start failed");

async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
auto hosts =
Expand Down Expand Up @@ -119,7 +119,7 @@ TEST_CASE("test Random") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
auto hosts =
std::vector<std::string_view>{"127.0.0.1:8801", "localhost:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(
Expand Down Expand Up @@ -148,7 +148,7 @@ TEST_CASE("test single host") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
auto hosts = std::vector<std::string_view>{"127.0.0.1:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
for (int i = 0; i < 100; ++i) {
Expand All @@ -168,7 +168,7 @@ TEST_CASE("test send_request config") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 9813);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
auto hosts = std::vector<std::string_view>{"127.0.0.1:9813"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
for (int i = 0; i < 100; ++i) {
Expand Down
10 changes: 5 additions & 5 deletions src/coro_io/tests/test_client_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ TEST_CASE("test client pool") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto is_started = server.async_start();
REQUIRE(is_started);
REQUIRE(is_started.hasResult() == false);
auto pool = coro_io::client_pool<coro_rpc::coro_rpc_client>::create(
"127.0.0.1:8801", {.max_connection = 100,
.idle_timeout = 300ms,
Expand All @@ -114,7 +114,7 @@ TEST_CASE("test idle timeout yield") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto is_started = server.async_start();
REQUIRE(is_started);
REQUIRE(!is_started.hasResult());
auto pool = coro_io::client_pool<coro_rpc::coro_rpc_client>::create(
"127.0.0.1:8801", {.max_connection = 100,
.idle_queue_per_max_clear_count = 1,
Expand Down Expand Up @@ -142,7 +142,7 @@ TEST_CASE("test reconnect") {
async_simple::Promise<async_simple::Unit> p;
coro_io::sleep_for(700ms).start([&server, &p](auto &&) {
auto server_is_started = server.async_start();
REQUIRE(server_is_started);
REQUIRE(!server_is_started.hasResult());
});

auto res = co_await event(100, *pool, cv, lock);
Expand Down Expand Up @@ -177,7 +177,7 @@ TEST_CASE("test reconnect retry wait time exclude reconnect cost time") {
async_simple::Promise<async_simple::Unit> p;
coro_io::sleep_for(350ms).start([&server, &p](auto &&) {
auto server_is_started = server.async_start();
REQUIRE(server_is_started);
REQUIRE(!server_is_started.hasResult());
});
auto res = co_await event<mock_client>(100, *pool, cv, lock);
CHECK(res);
Expand All @@ -196,7 +196,7 @@ TEST_CASE("test collect_free_client") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto is_started = server.async_start();
REQUIRE(is_started);
REQUIRE(!is_started.hasResult());
auto pool = coro_io::client_pool<coro_rpc::coro_rpc_client>::create(
"127.0.0.1:8801", {.max_connection = 100, .idle_timeout = 300ms});

Expand Down
4 changes: 2 additions & 2 deletions src/coro_rpc/benchmark/data_gen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int main() {
coro_rpc::coro_rpc_server server(std::thread::hardware_concurrency(), 0);
register_handlers(server);
auto started = server.async_start();
if (!started) {
if (started.hasResult()) {
ELOGV(ERROR, "server started failed");
return -1;
}
Expand Down Expand Up @@ -118,7 +118,7 @@ int main() {

server.stop();

started->wait();
started.wait();

pool.stop();
thd.join();
Expand Down
2 changes: 1 addition & 1 deletion src/coro_rpc/examples/base_examples/rpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void async_echo_by_callback(
/* rpc function runs in global io thread pool */
coro_io::post([conn, data]() mutable {
/* send work to global non-io thread pool */
auto *ctx = conn.get_context();
auto *ctx = conn.get_context_info();
conn.response_msg(data); /*response here*/
}).start([](auto &&) {
});
Expand Down
2 changes: 1 addition & 1 deletion src/coro_rpc/examples/base_examples/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ int main() {
server2.register_handler<echo>();
// async start server
auto res = server2.async_start();
assert(res.has_value());
assert(!res.hasResult());

// sync start server & sync await server stop
return !server.start();
Expand Down
Loading

0 comments on commit 5f3bd02

Please sign in to comment.