Skip to content

Commit

Permalink
[coro_rpc] remove rpc_head's type check (qicosmos#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle authored Feb 8, 2023
1 parent 31a9510 commit d974e36
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 63 deletions.
29 changes: 12 additions & 17 deletions include/coro_rpc/coro_rpc/coro_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
template <typename server_config, typename Socket>
async_simple::coro::Lazy<void> start_impl(
internal::router<server_config> &router, Socket &socket) noexcept {
char head_[REQ_HEAD_LEN];
req_header req_head;
while (true) {
reset_timer();
auto ret = co_await asio_util::async_read(
socket, asio::buffer(head_, REQ_HEAD_LEN));
socket, asio::buffer((char *)&req_head, REQ_HEAD_LEN));
cancel_timer();
// `co_await async_read` uses asio::async_read underlying.
// If eof occurred, the bytes_transferred of `co_await async_read` must
Expand All @@ -131,15 +131,6 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
co_return;
}
assert(ret.second == REQ_HEAD_LEN);
req_header req_head{};
auto errc = struct_pack::deserialize_to(req_head, head_, REQ_HEAD_LEN);
if (errc != struct_pack::errc::ok) [[unlikely]] {
ELOGV(ERROR, "%s, %s",
std::make_error_code(std::errc::protocol_error).message().data(),
"deserialize error");
close();
co_return;
}

#ifdef UNIT_TEST_INJECT
client_id_ = req_head.seq_num;
Expand Down Expand Up @@ -209,7 +200,9 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
resp_head.err_code = static_cast<uint8_t>(err);
resp_head.length = body_buf.size();

auto header_buf = struct_pack::serialize<std::string>(resp_head);
std::string header_buf;
header_buf.resize(RESP_HEAD_LEN);
std::memcpy(header_buf.data(), &resp_head, RESP_HEAD_LEN);

#ifdef UNIT_TEST_INJECT
if (g_action == inject_action::close_socket_after_send_length) {
Expand Down Expand Up @@ -258,13 +251,15 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
}

auto body_buf = struct_pack::serialize<std::string>(ret);
resp_header resp_header{};
resp_header resp_head{};

resp_header.magic = magic_number;
resp_header.err_code = 0;
resp_header.length = body_buf.size();
resp_head.magic = magic_number;
resp_head.err_code = 0;
resp_head.length = body_buf.size();

auto header_buf = struct_pack::serialize<std::string>(resp_header);
std::string header_buf;
header_buf.resize(RESP_HEAD_LEN);
std::memcpy(header_buf.data(), &resp_head, RESP_HEAD_LEN);

response(std::move(header_buf), std::move(body_buf), shared_from_this())
.via(&executor_)
Expand Down
22 changes: 6 additions & 16 deletions include/coro_rpc/coro_rpc/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,20 +461,10 @@ class coro_rpc_client {
co_return r;
}
#endif
char head[RESP_HEAD_LEN];
ret = co_await asio_util::async_read(socket,
asio::buffer(head, RESP_HEAD_LEN));
resp_header header;
ret = co_await asio_util::async_read(
socket, asio::buffer((char *)&header, RESP_HEAD_LEN));
if (!ret.first) {
resp_header header{};
auto errc = struct_pack::deserialize_to(header, head, RESP_HEAD_LEN);
if (errc != struct_pack::errc::ok) [[unlikely]] {
ELOGV(ERROR, "deserialize rpc header failed");
close();
r = rpc_result<R>{
unexpect_t{},
rpc_error{std::errc::io_error, struct_pack::error_message(errc)}};
co_return r;
}
uint32_t body_len = header.length;
if (body_len > read_buf_.size()) {
read_buf_.resize(body_len);
Expand All @@ -486,7 +476,7 @@ class coro_rpc_client {
std::ofstream file(
benchmark_file_path + std::string{get_func_name<func>()} + ".out",
std::ofstream::binary | std::ofstream::out);
file << std::string_view{std::begin(head), RESP_HEAD_LEN};
file << std::string_view{(char *)&header, RESP_HEAD_LEN};
file << std::string_view{(char *)read_buf_.data(), body_len};
file.close();
#endif
Expand Down Expand Up @@ -537,7 +527,8 @@ class coro_rpc_client {
}
std::memcpy(buffer.data() + REQ_HEAD_LEN, &id, FUNCTION_ID_LEN);

req_header header{magic_number};
auto &header = *(req_header *)buffer.data();
header.magic = magic_number;
#ifdef UNIT_TEST_INJECT
header.seq_num = client_id_;
if (g_action == inject_action::client_send_bad_magic_num) {
Expand All @@ -552,7 +543,6 @@ class coro_rpc_client {
#ifdef UNIT_TEST_INJECT
}
#endif
struct_pack::serialize_to((char *)buffer.data(), REQ_HEAD_LEN, header);
return buffer;
}

Expand Down
38 changes: 16 additions & 22 deletions include/coro_rpc/coro_rpc/rpc_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ namespace coro_rpc {
*
* memory layout
* ```
* ┌────────┬───────────┬────────────────┬──────────┬─────────┬─────────┐
* │ magic │ version │ serialize_type │ msg_type │ seq_num │ length │
* ├────────┼───────────┼────────────────┼──────────┼─────────┼─────────┤
* │ 1 │ 1 │ 1 │ 1 │ 4 │ 4 │
* └────────┴───────────┴────────────────┴──────────┴─────────┴─────────┘
* ┌────────┬───────────┬────────────────┬──────────┬─────────┬─────────┐─────────┐
* │ magic │ version │ serialize_type │ msg_type │ seq_num │ length │reserved
* │
* ├────────┼───────────┼────────────────┼──────────┼─────────┼─────────┤─────────┤
* │ 1 │ 1 │ 1 │ 1 │ 4 │ 4 │ 4 │
* └────────┴───────────┴────────────────┴──────────┴─────────┴─────────┘─────────┘
* ```
*/
struct req_header {
Expand All @@ -49,27 +50,20 @@ struct req_header {
uint8_t msg_type; //!< message type
uint32_t seq_num; //!< sequence number
uint32_t length; //!< length of RPC body
uint32_t reserved; //!< reserved field
};

struct resp_header {
uint8_t magic; //!< magic number
uint8_t version; //!< rpc protocol version
uint8_t err_code; //!< message type
uint8_t msg_type; //!< message type
uint32_t seq_num; //!< sequence number
uint32_t length; //!< length of RPC body
uint8_t magic; //!< magic number
uint8_t version; //!< rpc protocol version
uint8_t err_code; //!< message type
uint8_t msg_type; //!< message type
uint32_t seq_num; //!< sequence number
uint32_t length; //!< length of RPC body
uint32_t reserved; //!< reserved field
};
} // namespace coro_rpc
namespace struct_pack {
template <>
constexpr inline auto enable_type_info<coro_rpc::req_header> =
type_info_config::disable;

template <>
constexpr inline auto enable_type_info<coro_rpc::resp_header> =
type_info_config::disable;

}; // namespace struct_pack
namespace coro_rpc {
#if __cpp_lib_expected >= 202202L && __cplusplus > 202002L
template <class T, class E>
Expand All @@ -90,10 +84,10 @@ using unexpected = tl::unexpected<T>;
using unexpect_t = tl::unexpect_t;
#endif

constexpr auto REQ_HEAD_LEN = struct_pack::get_needed_size(req_header{});
constexpr auto REQ_HEAD_LEN = sizeof(resp_header{});
static_assert(REQ_HEAD_LEN == 16);

constexpr auto RESP_HEAD_LEN = struct_pack::get_needed_size(resp_header{});
constexpr auto RESP_HEAD_LEN = sizeof(resp_header{});
static_assert(RESP_HEAD_LEN == 16);

constexpr int FUNCTION_ID_LEN = 4;
Expand Down
13 changes: 5 additions & 8 deletions src/coro_rpc/tests/test_coro_rpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "ServerTester.hpp"
#include "async_simple/coro/Lazy.h"
#include "coro_rpc/coro_rpc/rpc_protocol.h"
#include "doctest.h"
#include "rpc_api.hpp"
#include "struct_pack/struct_pack.hpp"
Expand Down Expand Up @@ -308,12 +309,11 @@ TEST_CASE("test server write queue") {
std::vector<std::byte> buffer;
buffer.resize(offset);
std::memcpy(buffer.data() + REQ_HEAD_LEN, &id, FUNCTION_ID_LEN);
req_header header{magic_number};
auto &header = *(req_header *)buffer.data();
header.magic = magic_number;
header.seq_num = g_client_id++;
ELOGV(INFO, "client_id %d begin to connect %d", header.seq_num, 8820);
header.length = buffer.size() - REQ_HEAD_LEN;
constexpr auto size = struct_pack::get_needed_size(header);
struct_pack::serialize_to((char *)buffer.data(), size, header);
asio::io_context io_context;
std::thread thd([&io_context]() {
asio::io_context::work work(io_context);
Expand All @@ -330,16 +330,13 @@ TEST_CASE("test server write queue") {
CHECK(err.second == buffer.size());
}
for (int i = 0; i < 10; ++i) {
std::byte resp_len_buf[RESP_HEAD_LEN];
req_header header;
std::monostate r;
auto buf = struct_pack::serialize<std::string>(r);
std::string buffer_read;
buffer_read.resize(buf.size());
read(socket, asio::buffer(resp_len_buf, RESP_HEAD_LEN));
read(socket, asio::buffer((char *)&header, RESP_HEAD_LEN));

req_header header{};
[[maybe_unused]] auto errc = struct_pack::deserialize_to(
header, (const char *)resp_len_buf, RESP_HEAD_LEN);
uint32_t body_len = header.length;
CHECK(body_len == buf.size());
read(socket, asio::buffer(buffer_read, body_len));
Expand Down

0 comments on commit d974e36

Please sign in to comment.