Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coro_rpc] simply rpc logic #653

Merged
merged 20 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class callback_awaitor_base {
template <typename Op>
class callback_awaitor_impl {
public:
callback_awaitor_impl(Derived &awaitor, const Op &op) noexcept
callback_awaitor_impl(Derived &awaitor, Op &op) noexcept
: awaitor(awaitor), op(op) {}
constexpr bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<> handle) noexcept {
Expand All @@ -73,7 +73,7 @@ class callback_awaitor_base {

private:
Derived &awaitor;
const Op &op;
Op &op;
};

public:
Expand Down Expand Up @@ -101,7 +101,7 @@ class callback_awaitor_base {
Derived *obj;
};
template <typename Op>
callback_awaitor_impl<Op> await_resume(const Op &op) noexcept {
callback_awaitor_impl<Op> await_resume(Op &&op) noexcept {
return callback_awaitor_impl<Op>{static_cast<Derived &>(*this), op};
}

Expand Down Expand Up @@ -316,7 +316,7 @@ inline async_simple::coro::Lazy<void> sleep_for(Duration d) {

template <typename R, typename Func, typename Executor>
struct post_helper {
void operator()(auto handler) const {
void operator()(auto handler) {
asio::dispatch(e, [this, handler]() {
try {
if constexpr (std::is_same_v<R, async_simple::Try<void>>) {
Expand Down
8 changes: 4 additions & 4 deletions include/ylt/coro_io/io_context_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include <async_simple/Executor.h>
#include <async_simple/coro/Lazy.h>

#include <asio/dispatch.hpp>
#include <asio/io_context.hpp>
#include <asio/post.hpp>
#include <asio/steady_timer.hpp>
#include <atomic>
#include <future>
Expand Down Expand Up @@ -51,10 +51,10 @@ class ExecutorWrapper : public async_simple::Executor {

virtual bool schedule(Func func) override {
if constexpr (requires(ExecutorImpl e) { e.post(std::move(func)); }) {
executor_.post(std::move(func));
executor_.dispatch(std::move(func));
}
else {
asio::post(executor_, std::move(func));
asio::dispatch(executor_, std::move(func));
}

return true;
Expand All @@ -67,7 +67,7 @@ class ExecutorWrapper : public async_simple::Executor {
executor.post(std::move(func));
}
else {
asio::post(executor, std::move(func));
asio::dispatch(executor, std::move(func));
}
return true;
}
Expand Down
1 change: 0 additions & 1 deletion include/ylt/coro_rpc/coro_rpc_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,4 @@
* limitations under the License.
*/
#pragma once

#include "impl/protocol/coro_rpc_protocol.hpp"
96 changes: 18 additions & 78 deletions include/ylt/coro_rpc/impl/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "coro_connection.hpp"
#include "ylt/coro_rpc/impl/errno.h"
#include "ylt/util/type_traits.h"
#include "ylt/util/utils.hpp"

namespace coro_rpc {
/*!
Expand All @@ -43,14 +44,14 @@ class context_base {
typename rpc_protocol::req_header &get_req_head() { return self_->req_head_; }

bool check_status() {
auto old_flag = self_->has_response_.exchange(true);
if (old_flag != false)
auto old_flag = self_->status_.exchange(context_status::start_response);
if (old_flag != context_status::init)
AS_UNLIKELY {
ELOGV(ERROR, "response message more than one time");
return false;
}

if (has_closed())
if (self_->has_closed())
AS_UNLIKELY {
ELOGV(DEBUG, "response_msg failed: connection has been closed");
return false;
Expand All @@ -67,8 +68,7 @@ class context_base {
context_base(std::shared_ptr<context_info_t<rpc_protocol>> context_info)
: self_(std::move(context_info)) {
if (self_->conn_) {
self_->conn_->set_rpc_call_type(
coro_connection::rpc_call_type::callback_started);
self_->conn_->set_rpc_return_by_callback();
}
};
context_base() = default;
Expand All @@ -79,8 +79,10 @@ class context_base {
std::string_view error_msg) {
if (!check_status())
AS_UNLIKELY { return; };
self_->conn_->template response_error<rpc_protocol>(
error_code, error_msg, self_->req_head_, self_->is_delay_);
ELOGI << "rpc error in function:" << self_->get_rpc_function_name()
<< ". error code:" << error_code.ec << ". message : " << error_msg;
self_->conn_->template response_error<rpc_protocol>(error_code, error_msg,
self_->req_head_);
}
void response_error(coro_rpc::err_code error_code) {
response_error(error_code, error_code.message());
Expand All @@ -98,102 +100,40 @@ class context_base {
*/
template <typename... Args>
void response_msg(Args &&...args) {
if (!check_status())
AS_UNLIKELY { return; };
if constexpr (std::is_same_v<return_msg_type, void>) {
static_assert(sizeof...(args) == 0, "illegal args");
if (!check_status())
AS_UNLIKELY { return; };
std::visit(
[&]<typename serialize_proto>(const serialize_proto &) {
self_->conn_->template response_msg<rpc_protocol>(
serialize_proto::serialize(),
std::move(self_->resp_attachment_), self_->req_head_,
self_->is_delay_);
std::move(self_->resp_attachment_), self_->req_head_);
},
*rpc_protocol::get_serialize_protocol(self_->req_head_));
}
else {
static_assert(
requires { return_msg_type{std::forward<Args>(args)...}; },
"constructed return_msg_type failed by illegal args");

if (!check_status())
AS_UNLIKELY { return; };

return_msg_type ret{std::forward<Args>(args)...};
std::visit(
[&]<typename serialize_proto>(const serialize_proto &) {
self_->conn_->template response_msg<rpc_protocol>(
serialize_proto::serialize(ret),
std::move(self_->resp_attachment_), self_->req_head_,
self_->is_delay_);
std::move(self_->resp_attachment_), self_->req_head_);
},
*rpc_protocol::get_serialize_protocol(self_->req_head_));

// response_handler_(std::move(conn_), std::move(ret));
}
/*finish here*/
self_->status_ = context_status::finish_response;
}

/*!
* Check connection closed or not
*
* @return true if closed, otherwise false
*/
bool has_closed() const { return self_->conn_->has_closed(); }

/*!
* Close connection
*/
void close() { return self_->conn_->async_close(); }

/*!
* Get the unique connection ID
* @return connection id
*/
uint64_t get_connection_id() const noexcept {
return self_->conn_->get_connection_id();
}

/*!
* Set the response_attachment
* @return a ref of response_attachment
*/
void set_response_attachment(std::string attachment) {
set_response_attachment([attachment = std::move(attachment)] {
return std::string_view{attachment};
});
}

/*!
* Set the response_attachment
* @return a ref of response_attachment
*/
void set_response_attachment(std::function<std::string_view()> attachment) {
self_->resp_attachment_ = std::move(attachment);
}

/*!
* Get the request attachment
* @return connection id
*/
std::string_view get_request_attachment() const {
return self_->req_attachment_;
}

/*!
* Release the attachment
* @return connection id
*/
std::string release_request_attachment() {
return std::move(self_->req_attachment_);
}

void set_delay() {
self_->is_delay_ = true;
self_->conn_->set_rpc_call_type(
coro_connection::rpc_call_type::callback_with_delay);
const context_info_t<rpc_protocol> *get_context() const noexcept {
return self_.get();
}
std::any &tag() { return self_->conn_->tag(); }
const std::any &tag() const { return self_->conn_->tag(); }
context_info_t<rpc_protocol> *get_context() noexcept { return self_.get(); }
};

template <typename T>
Expand Down
Loading
Loading