Skip to content

Commit

Permalink
add example
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle committed Mar 29, 2024
1 parent a2e2c17 commit 02fcffc
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 111 deletions.
8 changes: 4 additions & 4 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class callback_awaitor_base {
template <typename Op>
class callback_awaitor_impl {
public:
callback_awaitor_impl(Derived &awaitor, const Op &op) noexcept
callback_awaitor_impl(Derived &awaitor, Op &op) noexcept
: awaitor(awaitor), op(op) {}
constexpr bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<> handle) noexcept {
Expand All @@ -73,7 +73,7 @@ class callback_awaitor_base {

private:
Derived &awaitor;
const Op &op;
Op &op;
};

public:
Expand Down Expand Up @@ -101,7 +101,7 @@ class callback_awaitor_base {
Derived *obj;
};
template <typename Op>
callback_awaitor_impl<Op> await_resume(const Op &op) noexcept {
callback_awaitor_impl<Op> await_resume(Op &&op) noexcept {
return callback_awaitor_impl<Op>{static_cast<Derived &>(*this), op};
}

Expand Down Expand Up @@ -316,7 +316,7 @@ inline async_simple::coro::Lazy<void> sleep_for(Duration d) {

template <typename R, typename Func, typename Executor>
struct post_helper {
void operator()(auto handler) const {
void operator()(auto handler) {
asio::dispatch(e, [this, handler]() {
try {
if constexpr (std::is_same_v<R, async_simple::Try<void>>) {
Expand Down
1 change: 1 addition & 0 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,7 @@ class coro_rpc_client {
}
}
else {

ec = struct_pack::deserialize_to(err, buffer);
if SP_LIKELY (!ec) {
ELOGV(WARNING,"deserilaize rpc result failed");
Expand Down
6 changes: 5 additions & 1 deletion include/ylt/coro_rpc/impl/expected.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ using unexpected = tl::unexpected<T>;
using unexpect_t = tl::unexpect_t;
#endif

template <typename T, typename rpc_protocol>
namespace protocol {
struct coro_rpc_protocol;
}

template <typename T, typename rpc_protocol = coro_rpc::protocol::coro_rpc_protocol>
using rpc_result = expected<T, typename rpc_protocol::rpc_error>;

} // namespace coro_rpc
39 changes: 18 additions & 21 deletions src/coro_rpc/examples/base_examples/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,40 +32,37 @@ Lazy<void> show_rpc_call() {
[[maybe_unused]] auto ec = co_await client.connect("127.0.0.1", "8801");
assert(!ec);

auto ret = co_await client.call<hello_world>();
assert(ret.value() == "hello_world"s);
auto ret = co_await client.call<echo>("hello");
assert(ret.value() == "hello");

ret = co_await client.call<coroutine_echo>("42");
assert(ret.value()=="42");

ret = co_await client.call<async_echo_by_callback>("hi");
assert(ret.value()=="hi");

ret = co_await client.call<async_echo_by_coroutine>("hey");
assert(ret.value()=="hey");

client.set_req_attachment("This is attachment.");
auto ret_void = co_await client.call<echo_with_attachment>();
assert(client.get_resp_attachment() == "This is attachment.");

client.set_req_attachment("This is attachment2.");
ret_void = co_await client.call<echo_with_attachment2>();
assert(client.get_resp_attachment() == "This is attachment2.");

auto ret_int = co_await client.call<A_add_B>(12, 30);
assert(ret_int.value() == 42);

ret = co_await client.call<coro_echo>("coro_echo");
assert(ret.value() == "coro_echo"s);

ret = co_await client.call<hello_with_delay>("hello_with_delay"s);
assert(ret.value() == "hello_with_delay"s);

ret = co_await client.call<nested_echo>("nested_echo"s);
assert(ret.value() == "nested_echo"s);

ret = co_await client.call<&HelloService::hello>();
assert(ret.value() == "HelloService::hello"s);

ret = co_await client.call<&HelloService::hello_with_delay>(
"HelloService::hello_with_delay"s);
assert(ret.value() == "HelloService::hello_with_delay"s);
// TODO: fix return error
// ret_void = co_await client.call<return_error_by_context>();

// assert(ret.error().code.val() == 404);
// assert(ret.error().msg == "404 Not Found.");

ret = co_await client.call<return_error>();
// ret_void = co_await client.call<return_error_by_exception>();

assert(ret.error().code.val() == 404);
assert(ret.error().msg == "404 Not Found.");
// assert(ret.error().code.val() == 404);

ret = co_await client.call<rpc_with_state_by_tag>();
assert(ret.value() == "1");
Expand Down
117 changes: 52 additions & 65 deletions src/coro_rpc/examples/base_examples/rpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,96 +21,83 @@
#include <ylt/coro_rpc/coro_rpc_client.hpp>
#include <ylt/easylog.hpp>

#include "async_simple/coro/Lazy.h"
#include "async_simple/coro/Sleep.h"
#include "ylt/coro_io/client_pool.hpp"
#include "ylt/coro_io/coro_io.hpp"
#include "ylt/coro_rpc/impl/coro_rpc_client.hpp"
#include "ylt/coro_rpc/impl/errno.h"
#include "ylt/coro_rpc/impl/expected.hpp"
#include "ylt/coro_rpc/impl/protocol/coro_rpc_protocol.hpp"

using namespace coro_rpc;
using namespace async_simple::coro;
using namespace std::chrono_literals;

std::string hello_world() {
ELOGV(INFO, "call helloworld");
return "hello_world";
std::string_view echo(std::string_view data) {
ELOGV(INFO, "call echo");
return data;
}

bool return_bool_hello_world() { return true; }

int A_add_B(int a, int b) {
ELOGV(INFO, "call A+B");
return a + b;
}

void echo_with_attachment(coro_rpc::context<void> conn) {
ELOGV(INFO, "call echo_with_attachment");
std::string str = conn.get_context()->release_request_attachment();
conn.get_context()->set_response_attachment(std::move(str));
conn.response_msg();
Lazy<std::string_view> coroutine_echo(std::string_view data) {
ELOGV(INFO, "call coroutine_echo");
co_await coro_io::sleep_for(1s);
co_return data;
}

void echo_with_attachment2(coro_rpc::context<void> conn) {
ELOGV(INFO, "call echo_with_attachment2");
std::string_view str = conn.get_context()->get_request_attachment();
// The live time of attachment is same as coro_rpc::context
conn.get_context()->set_response_attachment([str, conn] {
return str;
});
conn.response_msg();
void async_echo_by_callback(coro_rpc::context<std::string_view/*rpc response data here*/> conn, std::string_view/*rpc request data here*/ data) {
ELOGV(INFO, "call async_echo_by_callback");
/* rpc function runs in global io thread pool */
coro_io::post([conn,data] () mutable {
/* send work to global non-io thread pool */
auto *ctx = conn.get_context();
conn.response_msg(data); /*response here*/
}).start([](auto&&){});
}

std::string echo(std::string_view sv) { return std::string{sv}; }

async_simple::coro::Lazy<std::string> coro_echo(std::string_view sv) {
ELOGV(INFO, "call coro_echo");
co_await coro_io::sleep_for(std::chrono::milliseconds(100));
ELOGV(INFO, "after sleep for a while");
co_return std::string{sv};
Lazy<std::string_view> async_echo_by_coroutine(std::string_view sv) {
ELOGV(INFO, "call async_echo_by_coroutine");
co_await coro_io::sleep_for(std::chrono::milliseconds(100)); //sleeping
co_return sv;
}

void hello_with_delay(context</*response type:*/ std::string> conn,
std::string hello) {
ELOGV(INFO, "call HelloServer hello_with_delay");
// create a new thread
std::thread([conn = std::move(conn), hello = std::move(hello)]() mutable {
// do some heavy work in this thread that won't block the io-thread,
std::cout << "running heavy work..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds{1});
// Remember response before connection destruction! Or the connect will
// be closed.
conn.response_msg(hello);
}).detach();
Lazy<void> echo_with_attachment() {
ELOGV(INFO, "call echo_with_attachment");
auto ctx=co_await coro_rpc::get_context();
ctx->set_response_attachment(ctx->get_request_attachment()); /*zero-copy by string_view*/
}

async_simple::coro::Lazy<std::string> nested_echo(std::string_view sv) {
Lazy<std::string_view> nested_echo(std::string_view sv) {
ELOGV(INFO, "start nested echo");
coro_rpc::coro_rpc_client client(co_await coro_io::get_current_executor());
[[maybe_unused]] auto ec = co_await client.connect("127.0.0.1", "8802");
assert(!ec);
/*get a client by global client pool*/
auto client=coro_io::g_clients_pool<coro_rpc::coro_rpc_client>().at("127.0.0.1:8802");
assert(client!=nullptr);
ELOGV(INFO, "connect another server");
auto ret = co_await client.call<echo>(sv);
assert(ret.value() == sv);
ELOGV(INFO, "get echo result from another server");
co_return std::string{sv};
auto ret = co_await client->send_request([sv](coro_rpc_client & client) {
return client.call<echo>(sv);
});
co_return ret.value().value();
}

std::string HelloService::hello() {
std::string_view HelloService::hello() {
ELOGV(INFO, "call HelloServer::hello");
return "HelloService::hello";
}

void HelloService::hello_with_delay(
coro_rpc::context</*response type:*/ std::string> conn, std::string hello) {
ELOGV(INFO, "call HelloServer::hello_with_delay");
std::thread([conn = std::move(conn), hello = std::move(hello)]() mutable {
conn.response_msg("HelloService::hello_with_delay");
}).detach();
return;
void return_error_by_context(coro_rpc::context<void> conn) {
conn.response_error(coro_rpc::err_code{404}, "404 Not Found.");
}

void return_error(coro_rpc::context<std::string> conn) {
conn.response_error(coro_rpc::err_code{404}, "404 Not Found.");
void return_error_by_exception() {
throw coro_rpc::err_code{404};
}
void rpc_with_state_by_tag(coro_rpc::context<std::string> conn) {
if (!conn.get_context()->tag().has_value()) {
conn.get_context()->tag() = uint64_t{0};

Lazy<std::string> rpc_with_state_by_tag() {
auto *ctx = co_await coro_rpc::get_context();
if (!ctx->tag().has_value()) {
ctx->tag() = std::uint64_t{0};
}
auto &cnter = std::any_cast<uint64_t &>(conn.get_context()->tag());
auto &cnter = std::any_cast<uint64_t &>(ctx->tag());
ELOGV(INFO, "call count: %d", ++cnter);
conn.response_msg(std::to_string(cnter));
co_return std::to_string(cnter);
}
24 changes: 10 additions & 14 deletions src/coro_rpc/examples/base_examples/rpc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,17 @@
#include <string_view>
#include <ylt/coro_rpc/coro_rpc_context.hpp>

std::string hello_world();
bool return_bool_hello_world();
int A_add_B(int a, int b);
void hello_with_delay(coro_rpc::context<std::string> conn, std::string hello);
std::string echo(std::string_view sv);
void echo_with_attachment(coro_rpc::context<void> conn);
void echo_with_attachment2(coro_rpc::context<void> conn);
void return_error(coro_rpc::context<std::string> conn);
void rpc_with_state_by_tag(coro_rpc::context<std::string> conn);
async_simple::coro::Lazy<std::string> coro_echo(std::string_view sv);
async_simple::coro::Lazy<std::string> nested_echo(std::string_view sv);
std::string_view echo(std::string_view data);
async_simple::coro::Lazy<std::string_view> coroutine_echo(std::string_view data);
void async_echo_by_callback(coro_rpc::context<std::string_view/*rpc response data here*/> conn, std::string_view/*rpc request data here*/ data);
async_simple::coro::Lazy<std::string_view> async_echo_by_coroutine(std::string_view sv);
async_simple::coro::Lazy<void> echo_with_attachment();
async_simple::coro::Lazy<std::string_view> nested_echo(std::string_view sv);
void return_error_by_context(coro_rpc::context<void> conn);
void return_error_by_exception();
class HelloService {
public:
std::string hello();
void hello_with_delay(coro_rpc::context<std::string> conn, std::string hello);
std::string_view hello();
};

async_simple::coro::Lazy<std::string> rpc_with_state_by_tag();
#endif // CORO_RPC_RPC_API_HPP
8 changes: 2 additions & 6 deletions src/coro_rpc/examples/base_examples/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,14 @@ int main() {

coro_rpc_server server2{/*thread=*/1, /*port=*/8802};

server.register_handler<return_bool_hello_world>();

// regist normal function for rpc
server.register_handler<hello_world, A_add_B, hello_with_delay, echo,
nested_echo, coro_echo, echo_with_attachment,
echo_with_attachment2, return_error,
rpc_with_state_by_tag>();
server.register_handler<echo,coroutine_echo,async_echo_by_callback,async_echo_by_coroutine,echo_with_attachment,nested_echo,return_error_by_context,return_error_by_exception,rpc_with_state_by_tag>();

// regist member function for rpc
HelloService hello_service;
server
.register_handler<&HelloService::hello, &HelloService::hello_with_delay>(
.register_handler<&HelloService::hello>(
&hello_service);

server2.register_handler<echo>();
Expand Down

0 comments on commit 02fcffc

Please sign in to comment.