Skip to content

Commit

Permalink
[coro_rpc] simply rpc logic (#653)
Browse files Browse the repository at this point in the history
* [coro_rpc] simply response mode

* use asio::dispatch instead of asio::post in executor

* coroutine rpc funtion start with delay

* [coro_rpc] disable context<void> as lazy rpc function parameter.

* add support for context_info

* support throw exception to return error. add test.

* fix bug

* add example

* add test for async

* [coro_rpc] refact error handle

* [coro_rpc] context support get_rpc_function_name. add log when rpc error happened.

* move rpc_error to better namespace. add support for rpc function throw rpc_error

* remove support for throw errc/err_code

* fix format

* fix format

* add example

* add support for normal function get context

* fix compile

* fix format
  • Loading branch information
poor-circle authored Apr 3, 2024
1 parent 09aefcb commit dfe3544
Show file tree
Hide file tree
Showing 28 changed files with 809 additions and 628 deletions.
8 changes: 4 additions & 4 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class callback_awaitor_base {
template <typename Op>
class callback_awaitor_impl {
public:
callback_awaitor_impl(Derived &awaitor, const Op &op) noexcept
callback_awaitor_impl(Derived &awaitor, Op &op) noexcept
: awaitor(awaitor), op(op) {}
constexpr bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<> handle) noexcept {
Expand All @@ -73,7 +73,7 @@ class callback_awaitor_base {

private:
Derived &awaitor;
const Op &op;
Op &op;
};

public:
Expand Down Expand Up @@ -101,7 +101,7 @@ class callback_awaitor_base {
Derived *obj;
};
template <typename Op>
callback_awaitor_impl<Op> await_resume(const Op &op) noexcept {
callback_awaitor_impl<Op> await_resume(Op &&op) noexcept {
return callback_awaitor_impl<Op>{static_cast<Derived &>(*this), op};
}

Expand Down Expand Up @@ -316,7 +316,7 @@ inline async_simple::coro::Lazy<void> sleep_for(Duration d) {

template <typename R, typename Func, typename Executor>
struct post_helper {
void operator()(auto handler) const {
void operator()(auto handler) {
asio::dispatch(e, [this, handler]() {
try {
if constexpr (std::is_same_v<R, async_simple::Try<void>>) {
Expand Down
8 changes: 4 additions & 4 deletions include/ylt/coro_io/io_context_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include <async_simple/Executor.h>
#include <async_simple/coro/Lazy.h>

#include <asio/dispatch.hpp>
#include <asio/io_context.hpp>
#include <asio/post.hpp>
#include <asio/steady_timer.hpp>
#include <atomic>
#include <future>
Expand Down Expand Up @@ -51,10 +51,10 @@ class ExecutorWrapper : public async_simple::Executor {

virtual bool schedule(Func func) override {
if constexpr (requires(ExecutorImpl e) { e.post(std::move(func)); }) {
executor_.post(std::move(func));
executor_.dispatch(std::move(func));
}
else {
asio::post(executor_, std::move(func));
asio::dispatch(executor_, std::move(func));
}

return true;
Expand All @@ -67,7 +67,7 @@ class ExecutorWrapper : public async_simple::Executor {
executor.post(std::move(func));
}
else {
asio::post(executor, std::move(func));
asio::dispatch(executor, std::move(func));
}
return true;
}
Expand Down
1 change: 0 additions & 1 deletion include/ylt/coro_rpc/coro_rpc_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,4 @@
* limitations under the License.
*/
#pragma once

#include "impl/protocol/coro_rpc_protocol.hpp"
96 changes: 18 additions & 78 deletions include/ylt/coro_rpc/impl/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "coro_connection.hpp"
#include "ylt/coro_rpc/impl/errno.h"
#include "ylt/util/type_traits.h"
#include "ylt/util/utils.hpp"

namespace coro_rpc {
/*!
Expand All @@ -43,14 +44,14 @@ class context_base {
typename rpc_protocol::req_header &get_req_head() { return self_->req_head_; }

bool check_status() {
auto old_flag = self_->has_response_.exchange(true);
if (old_flag != false)
auto old_flag = self_->status_.exchange(context_status::start_response);
if (old_flag != context_status::init)
AS_UNLIKELY {
ELOGV(ERROR, "response message more than one time");
return false;
}

if (has_closed())
if (self_->has_closed())
AS_UNLIKELY {
ELOGV(DEBUG, "response_msg failed: connection has been closed");
return false;
Expand All @@ -67,8 +68,7 @@ class context_base {
context_base(std::shared_ptr<context_info_t<rpc_protocol>> context_info)
: self_(std::move(context_info)) {
if (self_->conn_) {
self_->conn_->set_rpc_call_type(
coro_connection::rpc_call_type::callback_started);
self_->conn_->set_rpc_return_by_callback();
}
};
context_base() = default;
Expand All @@ -79,8 +79,10 @@ class context_base {
std::string_view error_msg) {
if (!check_status())
AS_UNLIKELY { return; };
self_->conn_->template response_error<rpc_protocol>(
error_code, error_msg, self_->req_head_, self_->is_delay_);
ELOGI << "rpc error in function:" << self_->get_rpc_function_name()
<< ". error code:" << error_code.ec << ". message : " << error_msg;
self_->conn_->template response_error<rpc_protocol>(error_code, error_msg,
self_->req_head_);
}
void response_error(coro_rpc::err_code error_code) {
response_error(error_code, error_code.message());
Expand All @@ -98,102 +100,40 @@ class context_base {
*/
template <typename... Args>
void response_msg(Args &&...args) {
if (!check_status())
AS_UNLIKELY { return; };
if constexpr (std::is_same_v<return_msg_type, void>) {
static_assert(sizeof...(args) == 0, "illegal args");
if (!check_status())
AS_UNLIKELY { return; };
std::visit(
[&]<typename serialize_proto>(const serialize_proto &) {
self_->conn_->template response_msg<rpc_protocol>(
serialize_proto::serialize(),
std::move(self_->resp_attachment_), self_->req_head_,
self_->is_delay_);
std::move(self_->resp_attachment_), self_->req_head_);
},
*rpc_protocol::get_serialize_protocol(self_->req_head_));
}
else {
static_assert(
requires { return_msg_type{std::forward<Args>(args)...}; },
"constructed return_msg_type failed by illegal args");

if (!check_status())
AS_UNLIKELY { return; };

return_msg_type ret{std::forward<Args>(args)...};
std::visit(
[&]<typename serialize_proto>(const serialize_proto &) {
self_->conn_->template response_msg<rpc_protocol>(
serialize_proto::serialize(ret),
std::move(self_->resp_attachment_), self_->req_head_,
self_->is_delay_);
std::move(self_->resp_attachment_), self_->req_head_);
},
*rpc_protocol::get_serialize_protocol(self_->req_head_));

// response_handler_(std::move(conn_), std::move(ret));
}
/*finish here*/
self_->status_ = context_status::finish_response;
}

/*!
* Check connection closed or not
*
* @return true if closed, otherwise false
*/
bool has_closed() const { return self_->conn_->has_closed(); }

/*!
* Close connection
*/
void close() { return self_->conn_->async_close(); }

/*!
* Get the unique connection ID
* @return connection id
*/
uint64_t get_connection_id() const noexcept {
return self_->conn_->get_connection_id();
}

/*!
* Set the response_attachment
* @return a ref of response_attachment
*/
void set_response_attachment(std::string attachment) {
set_response_attachment([attachment = std::move(attachment)] {
return std::string_view{attachment};
});
}

/*!
* Set the response_attachment
* @return a ref of response_attachment
*/
void set_response_attachment(std::function<std::string_view()> attachment) {
self_->resp_attachment_ = std::move(attachment);
}

/*!
* Get the request attachment
* @return connection id
*/
std::string_view get_request_attachment() const {
return self_->req_attachment_;
}

/*!
* Release the attachment
* @return connection id
*/
std::string release_request_attachment() {
return std::move(self_->req_attachment_);
}

void set_delay() {
self_->is_delay_ = true;
self_->conn_->set_rpc_call_type(
coro_connection::rpc_call_type::callback_with_delay);
const context_info_t<rpc_protocol> *get_context() const noexcept {
return self_.get();
}
std::any &tag() { return self_->conn_->tag(); }
const std::any &tag() const { return self_->conn_->tag(); }
context_info_t<rpc_protocol> *get_context() noexcept { return self_.get(); }
};

template <typename T>
Expand Down
Loading

0 comments on commit dfe3544

Please sign in to comment.