Skip to content

Commit

Permalink
fix conversation
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle committed May 17, 2024
1 parent 8387a82 commit 0f7f35a
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ struct rpc_return_type<void> {
using type = std::monostate;
};

struct rpc_resp_buffer {
struct resp_body {
std::string read_buf_;
std::string resp_attachment_buf_;
};
Expand All @@ -97,27 +97,27 @@ template <typename T>
struct async_rpc_result_value_t {
private:
T result_;
rpc_resp_buffer buffer_;
resp_body buffer_;

public:
async_rpc_result_value_t(T &&result, rpc_resp_buffer &&buffer)
async_rpc_result_value_t(T &&result, resp_body &&buffer)
: result_(std::move(result)), buffer_(std::move(buffer)) {}
async_rpc_result_value_t(T &&result) : result_(std::move(result)) {}
T &result() noexcept { return result_; }
const T &result() const noexcept { return result_; }
std::string_view attachment() const noexcept {
return buffer_.resp_attachment_buf_;
}
rpc_resp_buffer release_buffer() { return std::move(buffer_); }
resp_body release_buffer() { return std::move(buffer_); }
};

template <>
struct async_rpc_result_value_t<void> {
rpc_resp_buffer buffer_;
resp_body buffer_;
std::string_view attachment() const noexcept {
return buffer_.resp_attachment_buf_;
}
rpc_resp_buffer release_buffer() { return std::move(buffer_); }
resp_body release_buffer() { return std::move(buffer_); }
};

template <typename T>
Expand Down Expand Up @@ -586,7 +586,7 @@ class coro_rpc_client {
template <typename T>
static rpc_result<T> handle_response_buffer(std::string_view buffer,
uint8_t rpc_errc,
bool &should_close) {
bool &has_error) {
rpc_return_type_t<T> ret;
struct_pack::err_code ec;
rpc_error err;
Expand All @@ -607,7 +607,7 @@ class coro_rpc_client {
err.val() = rpc_errc;
ec = struct_pack::deserialize_to(err.msg, buffer);
if SP_LIKELY (!ec) {
should_close = true;
has_error = true;
return rpc_result<T>{unexpect_t{}, std::move(err)};
}
}
Expand All @@ -618,7 +618,7 @@ class coro_rpc_client {
}
}
}
should_close = true;
has_error = true;
// deserialize failed.
ELOGV(WARNING, "deserilaize rpc result failed");
err = {errc::invalid_rpc_result, "failed to deserialize rpc return value"};
Expand Down Expand Up @@ -665,7 +665,7 @@ class coro_rpc_client {
}

struct async_rpc_raw_result_value_type {
rpc_resp_buffer buffer_;
resp_body buffer_;
uint8_t errc_;
};

Expand All @@ -680,7 +680,7 @@ class coro_rpc_client {
handler_t(std::unique_ptr<coro_io::period_timer> &&timer,
async_simple::Promise<async_rpc_raw_result> &&promise)
: timer_(std::move(timer)), promise_(std::move(promise)) {}
void operator()(rpc_resp_buffer &&buffer, uint8_t rpc_errc) {
void operator()(resp_body &&buffer, uint8_t rpc_errc) {
timer_->cancel();
promise_.setValue(async_rpc_raw_result{
async_rpc_raw_result_value_type{std::move(buffer), rpc_errc}});
Expand All @@ -701,7 +701,7 @@ class coro_rpc_client {
std::atomic<bool> has_closed_ = false;
coro_io::ExecutorWrapper<> executor_;
std::unordered_map<uint32_t, handler_t> response_handler_table_;
rpc_resp_buffer resp_buffer_;
resp_body resp_buffer_;
asio::ip::tcp::socket socket_;
std::atomic<uint32_t> recving_cnt_ = 0;
control_t(asio::io_context::executor_type executor, bool is_timeout)
Expand Down Expand Up @@ -860,7 +860,7 @@ class coro_rpc_client {
}

template <typename T>
static async_simple::coro::Lazy<async_rpc_result<T>> get_deserializer(
static async_simple::coro::Lazy<async_rpc_result<T>> deserialize_rpc_result(
async_simple::Future<async_rpc_raw_result> future,
std::weak_ptr<control_t> watcher) {
auto ret_ = co_await std::move(future);
Expand All @@ -878,11 +878,11 @@ class coro_rpc_client {
}
}

bool should_close = false;
bool has_error = false;
auto &ret = std::get<0>(ret_);
auto result = handle_response_buffer<T>(ret.buffer_.read_buf_, ret.errc_,
should_close);
if (should_close) {
has_error);
if (has_error) {
if (auto w = watcher.lock(); w) {
close_socket(std::move(w));
}
Expand Down Expand Up @@ -992,7 +992,7 @@ class coro_rpc_client {
#endif
}
guard.release();
co_return get_deserializer<rpc_return_t>(
co_return deserialize_rpc_result<rpc_return_t>(
std::move(future), std::weak_ptr<control_t>{control_});
}
}
Expand Down

0 comments on commit 0f7f35a

Please sign in to comment.