Skip to content

Commit

Permalink
[coro_io][improve]Improve post (#500)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Nov 21, 2023
1 parent 377fa9f commit a37a8d3
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 55 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ std::vector<char> buffer = struct_pack::serialize(person1);
auto person2 = deserialize<person>(buffer);
```
struct_pack is very faster.
struct_pack is very fast.
![](https://alibaba.github.io/yalantinglibs/assets/struct_pack_bench_serialize.4ffb0ce6.png)
Expand Down
47 changes: 35 additions & 12 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#endif

#include <asio/connect.hpp>
#include <asio/dispatch.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/read.hpp>
#include <asio/read_until.hpp>
Expand Down Expand Up @@ -282,20 +283,42 @@ inline async_simple::coro::Lazy<void> sleep_for(Duration d) {
}
}

template <typename Func>
inline async_simple::coro::Lazy<void> post(
Func func,
coro_io::ExecutorWrapper<> *e = coro_io::get_global_block_executor()) {
callback_awaitor<void> awaitor;

co_return co_await awaitor.await_resume(
[e, func = std::move(func)](auto handler) {
auto executor = e->get_asio_executor();
asio::post(executor, [=, func = std::move(func)]() {
template <typename R, typename Func>
struct post_helper {
void operator()(auto handler) const {
asio::dispatch(e->get_asio_executor(), [this, handler]() {
try {
if constexpr (std::is_same_v<R, async_simple::Try<void>>) {
func();
handler.resume();
});
});
}
else {
auto r = func();
handler.set_value_then_resume(std::move(r));
}
} catch (const std::exception &e) {
R er;
er.setException(std::current_exception());
handler.set_value_then_resume(std::move(er));
}
});
}
coro_io::ExecutorWrapper<> *e;
Func func;
};

template <typename Func>
inline async_simple::coro::Lazy<
async_simple::Try<typename util::function_traits<Func>::return_type>>
post(Func func,
coro_io::ExecutorWrapper<> *e = coro_io::get_global_block_executor()) {
using R =
async_simple::Try<typename util::function_traits<Func>::return_type>;

callback_awaitor<R> awaitor;

post_helper<R, Func> helper{e, std::move(func)};
co_return co_await awaitor.await_resume(helper);
}

template <typename Socket, typename AsioBuffer>
Expand Down
8 changes: 5 additions & 3 deletions include/ylt/coro_rpc/impl/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ struct get_type_t<async_simple::coro::Lazy<T>> {
template <auto func>
inline auto get_return_type() {
using T = decltype(func);
using param_type = function_parameters_t<T>;
using return_type = typename get_type_t<function_return_type_t<T>>::type;
using param_type = util::function_parameters_t<T>;
using return_type =
typename get_type_t<util::function_return_type_t<T>>::type;
if constexpr (std::is_void_v<param_type>) {
if constexpr (std::is_void_v<return_type>) {
return;
Expand All @@ -171,7 +172,8 @@ inline auto get_return_type() {
}
else {
using First = std::tuple_element_t<0, param_type>;
constexpr bool is_conn = is_specialization<First, context_base>::value;
constexpr bool is_conn =
util::is_specialization<First, context_base>::value;

if constexpr (is_conn) {
using U = typename First::return_type;
Expand Down
25 changes: 13 additions & 12 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,41 +486,42 @@ class coro_rpc_client {
template <auto func, typename... Args>
void static_check() {
using Function = decltype(func);
using param_type = function_parameters_t<Function>;
using param_type = util::function_parameters_t<Function>;
if constexpr (!std::is_void_v<param_type>) {
using First = std::tuple_element_t<0, param_type>;
constexpr bool is_conn = requires { typename First::return_type; };

if constexpr (std::is_member_function_pointer_v<Function>) {
using Self = class_type_t<Function>;
using Self = util::class_type_t<Function>;
if constexpr (is_conn) {
static_assert(is_invocable<Function, Self, First, Args...>::value,
"called rpc function and arguments are not match");
static_assert(
util::is_invocable<Function, Self, First, Args...>::value,
"called rpc function and arguments are not match");
}
else {
static_assert(is_invocable<Function, Self, Args...>::value,
static_assert(util::is_invocable<Function, Self, Args...>::value,
"called rpc function and arguments are not match");
}
}
else {
if constexpr (is_conn) {
static_assert(is_invocable<Function, First, Args...>::value,
static_assert(util::is_invocable<Function, First, Args...>::value,
"called rpc function and arguments are not match");
}
else {
static_assert(is_invocable<Function, Args...>::value,
static_assert(util::is_invocable<Function, Args...>::value,
"called rpc function and arguments are not match");
}
}
}
else {
if constexpr (std::is_member_function_pointer_v<Function>) {
using Self = class_type_t<Function>;
static_assert(is_invocable<Function, Self, Args...>::value,
using Self = util::class_type_t<Function>;
static_assert(util::is_invocable<Function, Self, Args...>::value,
"called rpc function and arguments are not match");
}
else {
static_assert(is_invocable<Function, Args...>::value,
static_assert(util::is_invocable<Function, Args...>::value,
"called rpc function and arguments are not match");
}
}
Expand Down Expand Up @@ -661,7 +662,7 @@ class coro_rpc_client {
std::vector<std::byte> buffer;
std::size_t offset = coro_rpc_protocol::REQ_HEAD_LEN;
if constexpr (sizeof...(Args) > 0) {
using arg_types = function_parameters_t<decltype(func)>;
using arg_types = util::function_parameters_t<decltype(func)>;
pack_to<arg_types>(buffer, offset, std::forward<Args>(args)...);
}
else {
Expand Down Expand Up @@ -723,7 +724,7 @@ class coro_rpc_client {
auto get_func_args() {
using First = std::tuple_element_t<0, FuncArgs>;
constexpr bool has_conn_v = requires { typename First::return_type; };
return get_args<has_conn_v, FuncArgs>();
return util::get_args<has_conn_v, FuncArgs>();
}

template <typename... FuncArgs, typename Buffer, typename... Args>
Expand Down
5 changes: 3 additions & 2 deletions include/ylt/coro_rpc/impl/coro_rpc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,13 @@ class coro_rpc_server_base {
*/

template <auto first, auto... functions>
void register_handler(class_type_t<decltype(first)> *self) {
void register_handler(util::class_type_t<decltype(first)> *self) {
router_.template register_handler<first, functions...>(self);
}

template <auto first>
void register_handler(class_type_t<decltype(first)> *self, const auto &key) {
void register_handler(util::class_type_t<decltype(first)> *self,
const auto &key) {
router_.template register_handler<first>(self, key);
}

Expand Down
14 changes: 8 additions & 6 deletions include/ylt/coro_rpc/impl/router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,9 @@ class router {
AS_UNLIKELY { ELOGV(CRITICAL, "null connection!"); }

constexpr auto name = get_func_name<func>();
using return_type = function_return_type_t<decltype(func)>;
if constexpr (is_specialization_v<return_type, async_simple::coro::Lazy>) {
using return_type = util::function_return_type_t<decltype(func)>;
if constexpr (util::is_specialization_v<return_type,
async_simple::coro::Lazy>) {
auto it = coro_handlers_.emplace(
key,
[self](
Expand Down Expand Up @@ -179,10 +180,11 @@ class router {
void regist_one_handler_impl(const route_key &key) {
static_assert(!std::is_member_function_pointer_v<decltype(func)>,
"register member function but lack of the parent object");
using return_type = function_return_type_t<decltype(func)>;
using return_type = util::function_return_type_t<decltype(func)>;

constexpr auto name = get_func_name<func>();
if constexpr (is_specialization_v<return_type, async_simple::coro::Lazy>) {
if constexpr (util::is_specialization_v<return_type,
async_simple::coro::Lazy>) {
auto it = coro_handlers_.emplace(
key,
[](std::string_view data, rpc_context<rpc_protocol> &context_info,
Expand Down Expand Up @@ -350,13 +352,13 @@ class router {
*/

template <auto first, auto... func>
void register_handler(class_type_t<decltype(first)> *self) {
void register_handler(util::class_type_t<decltype(first)> *self) {
regist_one_handler<first>(self);
(regist_one_handler<func>(self), ...);
}

template <auto func>
void register_handler(class_type_t<decltype(func)> *self,
void register_handler(util::class_type_t<decltype(func)> *self,
const route_key &key) {
regist_one_handler_impl<func>(self, key);
}
Expand Down
14 changes: 7 additions & 7 deletions include/ylt/coro_rpc/impl/rpc_execute.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ inline std::optional<std::string> execute(
std::string_view data, rpc_context<rpc_protocol> &context_info,
Self *self = nullptr) {
using T = decltype(func);
using param_type = function_parameters_t<T>;
using return_type = function_return_type_t<T>;
using param_type = util::function_parameters_t<T>;
using return_type = util::function_return_type_t<T>;

if constexpr (!std::is_void_v<param_type>) {
using First = std::tuple_element_t<0, param_type>;
Expand All @@ -69,7 +69,7 @@ inline std::optional<std::string> execute(
constexpr bool has_coro_conn_v =
std::is_convertible_v<context_base<conn_return_type, rpc_protocol>,
First>;
auto args = get_args<has_coro_conn_v, param_type>();
auto args = util::get_args<has_coro_conn_v, param_type>();

bool is_ok = true;
constexpr size_t size = std::tuple_size_v<decltype(args)>;
Expand Down Expand Up @@ -155,9 +155,9 @@ inline async_simple::coro::Lazy<std::optional<std::string>> execute_coro(
std::string_view data, rpc_context<rpc_protocol> &context_info,
Self *self = nullptr) {
using T = decltype(func);
using param_type = function_parameters_t<T>;
using return_type =
typename get_type_t<typename function_return_type_t<T>::ValueType>::type;
using param_type = util::function_parameters_t<T>;
using return_type = typename get_type_t<
typename util::function_return_type_t<T>::ValueType>::type;

if constexpr (!std::is_void_v<param_type>) {
using First = std::tuple_element_t<0, param_type>;
Expand All @@ -170,7 +170,7 @@ inline async_simple::coro::Lazy<std::optional<std::string>> execute_coro(
using conn_return_type = decltype(get_return_type<is_conn, First>());
constexpr bool has_coro_conn_v =
std::is_same_v<context_base<conn_return_type, rpc_protocol>, First>;
auto args = get_args<has_coro_conn_v, param_type>();
auto args = util::get_args<has_coro_conn_v, param_type>();

bool is_ok = true;
constexpr size_t size = std::tuple_size_v<decltype(args)>;
Expand Down
4 changes: 2 additions & 2 deletions include/ylt/util/type_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ using remove_cvref_t = typename remove_cvref<T>::type;
#endif
#endif

namespace coro_rpc {
namespace util {
template <typename Function>
struct function_traits;

Expand Down Expand Up @@ -213,4 +213,4 @@ struct is_specialization<Ref<Args...>, Ref> : std::true_type {};
template <typename Test, template <typename...> class Ref>
inline constexpr bool is_specialization_v = is_specialization<Test, Ref>::value;

} // namespace coro_rpc
} // namespace util
2 changes: 2 additions & 0 deletions src/coro_rpc/tests/test_coro_rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ TEST_CASE("testing client call timeout") {
CHECK_MESSAGE(val.error().code == std::errc::timed_out, val.error().msg);
g_action = inject_action::nothing;
}
#ifdef __GNUC__
SUBCASE("read timeout") {
g_action = {};
coro_rpc_server server(2, 8801);
Expand All @@ -494,6 +495,7 @@ TEST_CASE("testing client call timeout") {
auto val = syncAwait(ret);
CHECK_MESSAGE(val.error().code == std::errc::timed_out, val.error().msg);
}
#endif
g_action = {};
}
std::errc init_acceptor(auto& acceptor_, auto port_) {
Expand Down
10 changes: 5 additions & 5 deletions src/coro_rpc/tests/test_router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ struct RPC_trait<void> {
};
using coro_rpc_protocol = coro_rpc::protocol::coro_rpc_protocol;
template <auto func>
rpc_result<function_return_type_t<decltype(func)>, coro_rpc_protocol>
rpc_result<util::function_return_type_t<decltype(func)>, coro_rpc_protocol>
get_result(const auto &pair) {
auto &&[rpc_errc, buffer] = pair;
using T = function_return_type_t<decltype(func)>;
using return_type =
rpc_result<function_return_type_t<decltype(func)>, coro_rpc_protocol>;
using T = util::function_return_type_t<decltype(func)>;
using return_type = rpc_result<util::function_return_type_t<decltype(func)>,
coro_rpc_protocol>;
rpc_return_type_t<T> ret;
struct_pack::errc ec;
coro_rpc_protocol::rpc_error err;
Expand Down Expand Up @@ -144,7 +144,7 @@ auto test_route(auto ctx, Args &&...args) {
template <auto func, typename... Args>
void test_route_and_check(auto conn, Args &&...args) {
auto pair = test_route<func>(conn, std::forward<Args>(args)...);
using R = function_return_type_t<decltype(func)>;
using R = util::function_return_type_t<decltype(func)>;
check_result<R>(pair, coro_rpc_protocol::RESP_HEAD_LEN);
}
} // namespace test_util
Expand Down
4 changes: 2 additions & 2 deletions src/util/tests/test_meta_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ struct Plus {
};

constexpr void function_traits_test_return_type() {
using namespace coro_rpc;
using namespace util;
static_assert(std::is_same_v<function_return_type_t<int(int)>, int>);
auto f1 = []() {
};
Expand Down Expand Up @@ -208,7 +208,7 @@ constexpr void function_traits_test_return_type() {
}

constexpr void function_traits_test_last_parameters_type() {
using namespace coro_rpc;
using namespace util;
static_assert(std::is_same_v<last_parameters_type_t<int(int&&)>, int>);
auto f1 = [](int&) {
};
Expand Down
8 changes: 5 additions & 3 deletions website/docs/zh/coro_rpc/coro_rpc_doc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class coro_rpc_server {
* @param self 成员函数对应的对象指针
*/
template <auto first, auto... func>
void register_handler(class_type_t<decltype(first)> *self);
void register_handler(util::class_type_t<decltype(first)> *self);
};

/*!
Expand Down Expand Up @@ -229,7 +229,8 @@ class coro_rpc_client {
* @return rpc调用结果
*/
template <auto func, typename... Args>
async_simple::coro::Lazy<rpc_result<function_return_type_t<decltype(func)>>>
async_simple::coro::Lazy<
rpc_result<util::function_return_type_t<decltype(func)>>>
call(Args &&...args);

/*!
Expand All @@ -241,7 +242,8 @@ class coro_rpc_client {
* @return rpc调用结果
*/
template <auto func, typename... Args>
async_simple::coro::Lazy<rpc_result<function_return_type_t<decltype(func)>>>
async_simple::coro::Lazy<
rpc_result<util::function_return_type_t<decltype(func)>>>
call_for(const auto &duration, Args &&...args);

/*!
Expand Down

0 comments on commit a37a8d3

Please sign in to comment.