Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge io_uring branch to presice_signal #44

Open
wants to merge 18 commits into
base: precise_signal
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ option(BUILD_UNIT_TESTS "Whether to build unit tests" OFF)
option(BUILD_BRPC_TOOLS "Whether to build brpc tools" ON)
option(DOWNLOAD_GTEST "Download and build a fresh copy of googletest. Requires Internet access." ON)

option(IO_URING_ENABLED "Enable IO uring based network" OFF)
if (IO_URING_ENABLED)
message("IO_URING_ENABLED: " ${IO_URING_ENABLED})
add_compile_definitions(IO_URING_ENABLED)
endif()

# Enable MACOSX_RPATH. Run "cmake --help-policy CMP0042" for policy details.
if(POLICY CMP0042)
cmake_policy(SET CMP0042 NEW)
Expand Down Expand Up @@ -203,6 +209,15 @@ if(NOT PROTOC_LIB)
message(FATAL_ERROR "Fail to find protoc lib")
endif()

if(IO_URING_ENABLED)
find_path(URING_INCLUDE_PATH NAMES liburing.h)
find_library(URING_LIB NAMES uring)
if ((NOT URING_INCLUDE_PATH) OR (NOT URING_LIB))
message(FATAL_ERROR "Fail to find liburing")
endif()
include_directories(${URING_INCLUDE_PATH})
endif()

if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
set(OPENSSL_ROOT_DIR
"/usr/local/opt/openssl" # Homebrew installed OpenSSL
Expand Down Expand Up @@ -251,6 +266,10 @@ if(WITH_SNAPPY)
set(BRPC_PRIVATE_LIBS "${BRPC_PRIVATE_LIBS} -lsnappy")
endif()

if(IO_URING_ENABLED)
set(DYNAMIC_LIB ${DYNAMIC_LIB} ${URING_LIB})
endif()

if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
set(DYNAMIC_LIB ${DYNAMIC_LIB} rt)
set(BRPC_PRIVATE_LIBS "${BRPC_PRIVATE_LIBS} -lrt")
Expand Down
8 changes: 8 additions & 0 deletions src/brpc/acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,17 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
#else
{
#endif
#ifndef IO_URING_ENABLED
options.on_edge_triggered_events = InputMessenger::OnNewMessages;
#else
options.on_edge_triggered_events = InputMessenger::OnNewMessagesFromRing;
#endif
}
options.use_rdma = am->_use_rdma;
#ifdef IO_URING_ENABLED
options.bound_gid_ = acception->recv_num_;
acception->recv_num_++;
#endif
if (Socket::Create(options, &socket_id) != 0) {
LOG(ERROR) << "Fail to create Socket";
continue;
Expand Down
3 changes: 3 additions & 0 deletions src/brpc/event_dispatcher_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
#endif

#include "bthread/task_control.h"
#include "bthread/task_group.h"
#include <unordered_map>

extern "C" {
extern void bthread_flush();
extern bthread::TaskControl* bthread_get_task_control();
};

namespace bthread {
Expand Down
54 changes: 54 additions & 0 deletions src/brpc/input_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "brpc/protocol.h" // ListProtocols
#include "brpc/rdma/rdma_endpoint.h"
#include "brpc/input_messenger.h"
#include "brpc/socket.h"


namespace brpc {
Expand Down Expand Up @@ -393,6 +394,59 @@ void InputMessenger::OnNewMessages(Socket* m) {
m->SetEOF();
}
}
#ifdef IO_URING_ENABLED
void InputMessenger::OnNewMessagesFromRing(Socket *m) {
// Notes:
// - If the socket has only one message, the message will be parsed and
// processed in this bthread. nova-pbrpc and http works in this way.
// - If the socket has several messages, all messages will be parsed (
// meaning cutting from butil::IOBuf. serializing from protobuf is part of
// "process") in this bthread. All messages except the last one will be
// processed in separate bthreads. To minimize the overhead, scheduling
// is batched(notice the BTHREAD_NOSIGNAL and bthread_flush).
// - Verify will always be called in this bthread at most once and before
// any process.
InputMessenger *messenger = static_cast<InputMessenger *>(m->user());

// Notice that all *return* no matter successful or not will run last
// message, even if the socket is about to be closed. This should be
// OK in most cases.
InputMessageClosure last_msg;
bool read_eof = false;
const int64_t received_us = butil::cpuwide_time_us();
const int64_t base_realtime = butil::gettimeofday_us() - received_us;

const ssize_t nr = m->inbound_nw_;
if (nr <= 0) {
if (0 == nr) {
// Set `read_eof' flag and proceed to feed EOF into `Protocol'
// (implied by m->_read_buf.empty), which may produce a new
// `InputMessageBase' under some protocols such as HTTP
LOG_IF(WARNING, FLAGS_log_connection_close)
<< *m << " was closed by remote side";
read_eof = true;
} else {
int err_code = -nr;
if (err_code != EAGAIN && err_code != EINTR && err_code != ENOBUFS) {
PLOG(WARNING) << "Fail to read from " << *m;
m->SetFailed(err_code, "Fail to read from %s: %s",
m->description().c_str(), berror(err_code));
}
return;
}
}

if (m->_rdma_state == Socket::RDMA_OFF &&
messenger->ProcessNewMessage(m, nr, read_eof, received_us, base_realtime,
last_msg) < 0) {
return;
}

if (read_eof) {
m->SetEOF();
}
}
#endif

InputMessenger::InputMessenger(size_t capacity)
: _handlers(NULL)
Expand Down
5 changes: 4 additions & 1 deletion src/brpc/input_messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,13 @@ friend class rdma::RdmaEndpoint;
// Channel nor Server.
int AddNonProtocolHandler(const InputMessageHandler& handler);

protected:
public:
// Load data from m->fd() into m->read_buf, cut off new messages and
// call callbacks.
static void OnNewMessages(Socket* m);
#ifdef IO_URING_ENABLED
static void OnNewMessagesFromRing(Socket *m);
#endif

private:
class InputMessageClosure {
Expand Down
33 changes: 32 additions & 1 deletion src/brpc/policy/redis_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "brpc/redis_command.h"
#include "brpc/policy/redis_protocol.h"
#include "bthread/task_group.h"
#include "bthread/ring_write_buf_pool.h"

namespace bthread {
extern BAIDU_THREAD_LOCAL TaskGroup *tls_task_group;
Expand Down Expand Up @@ -176,6 +177,10 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
cur_group->update_ext_proc_(1);
cur_task->SetBoundGroup(cur_group);
}
#ifdef IO_URING_ENABLED
auto [ring_buf, ring_buf_idx] = cur_group->GetRingWriteBuf();
appender.set_ring_buffer(ring_buf, RingWriteBufferPool::buf_length);
#endif

err = ctx->parser.Consume(*source, &current_args, &ctx->arena);
if (err != PARSE_OK) {
Expand All @@ -201,12 +206,38 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
}

cur_task->SetBoundGroup(NULL);

butil::IOBuf sendbuf;
appender.move_to(sendbuf);
#ifdef IO_URING_ENABLED
uint32_t ring_buf_size = appender.ring_buffer_size();
if (ring_buf_size > 0) {
CHECK(sendbuf.empty());
socket->SetFixedWriteLen(ring_buf_size);
int ret = cur_group->SocketFixedWrite(socket, ring_buf_idx);
if (ret != 0) {
// If the fixed buffer write is not submitted,
// falls back to the old socket write.
sendbuf.append(ring_buf, ring_buf_size);
cur_group->RecycleRingWriteBuf(ring_buf_idx);
} else {
// The fixed buffer write is submitted successfully. The ring buffer
// will be recycled after the IO uring finishes the write request.
}
} else if (ring_buf != nullptr) {
cur_group->RecycleRingWriteBuf(ring_buf_idx);
}

if (ring_buf_size == 0) {
DLOG(INFO) << "Redis socket write not using fixed buffer.";
}
#endif

if (!sendbuf.empty()) {
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
#ifdef IO_URING_ENABLED
wopt.write_through_ring = true;
#endif
LOG_IF(WARNING, socket->Write(&sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
}
Expand Down
97 changes: 97 additions & 0 deletions src/brpc/redis_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,103 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
return PARSE_OK;
}

// ParseError RedisCommandParser::Consume(std::string_view buf,
// size_t &consume_offset,
// std::vector<butil::StringPiece>* args,
// butil::Arena* arena) {
// // const char* pfc = (const char*)buf.fetch1();
// const char* pfc = buf.empty() ? nullptr : buf.data();
// if (pfc == NULL) {
// return PARSE_ERROR_NOT_ENOUGH_DATA;
// }
// // '*' stands for array "*<size>\r\n<sub-reply1><sub-reply2>..."
// if (!_parsing_array && *pfc != '*') {
// return PARSE_ERROR_TRY_OTHERS;
// }
// // '$' stands for bulk string "$<length>\r\n<string>\r\n"
// if (_parsing_array && *pfc != '$') {
// return PARSE_ERROR_ABSOLUTELY_WRONG;
// }
// char intbuf[32]; // enough for fc + 64-bit decimal + \r\n
// // const size_t ncopied = buf.copy_to(intbuf, sizeof(intbuf) - 1);
// const size_t ncopied = std::min(buf.size(), sizeof(intbuf) - 1);
// memcpy(intbuf, buf.data(), ncopied);
// intbuf[ncopied] = '\0';
// const size_t crlf_pos = butil::StringPiece(intbuf, ncopied).find("\r\n");
// if (crlf_pos == butil::StringPiece::npos) { // not enough data
// return PARSE_ERROR_NOT_ENOUGH_DATA;
// }
// char* endptr = NULL;
// int64_t value = strtoll(intbuf + 1/*skip fc*/, &endptr, 10);
// if (endptr != intbuf + crlf_pos) {
// LOG(ERROR) << '`' << intbuf + 1 << "' is not a valid 64-bit decimal";
// return PARSE_ERROR_ABSOLUTELY_WRONG;
// }
// if (value < 0) {
// LOG(ERROR) << "Invalid len=" << value << " in redis command";
// return PARSE_ERROR_ABSOLUTELY_WRONG;
// }
// if (!_parsing_array) {
// // buf.pop_front(crlf_pos + 2/*CRLF*/);
// size_t step = crlf_pos + 2/*CRLF*/;
// consume_offset += step;
// buf = buf.substr(step);
// _parsing_array = true;
// _length = value;
// _index = 0;
// _args.resize(value);
// return Consume(buf, consume_offset, args, arena);
// }
// CHECK(_index < _length) << "a complete command has been parsed. "
// "impl of RedisCommandParser::Parse is buggy";
// const int64_t len = value; // `value' is length of the string
// if (len < 0) {
// LOG(ERROR) << "string in command is nil!";
// return PARSE_ERROR_ABSOLUTELY_WRONG;
// }
// if (len > (int64_t)std::numeric_limits<uint32_t>::max()) {
// LOG(ERROR) << "string in command is too long! max length=2^32-1,"
// " actually=" << len;
// return PARSE_ERROR_ABSOLUTELY_WRONG;
// }
// if (buf.size() < crlf_pos + 2 + (size_t)len + 2/*CRLF*/) {
// return PARSE_ERROR_NOT_ENOUGH_DATA;
// }
// // buf.pop_front(crlf_pos + 2/*CRLF*/);
// size_t step = crlf_pos + 2/*CRLF*/;
// consume_offset += step;
// buf = buf.substr(step);
// char* d = (char*)arena->allocate((len/8 + 1) * 8);
// // buf.cutn(d, len);
// size_t cutn = std::min((size_t)len, buf.size());
// memcpy(d, buf.data(), cutn);
// consume_offset += cutn;
// buf = buf.substr(cutn);
// d[len] = '\0';
// _args[_index].set(d, len);
// if (_index == 0) {
// // convert it to lowercase when it is command name
// for (int i = 0; i < len; ++i) {
// d[i] = ::tolower(d[i]);
// }
// }
// char crlf[2];
// // buf.cutn(crlf, sizeof(crlf));
// crlf[0] = buf[0];
// crlf[1] = buf[1];
// buf = buf.substr(2);
// if (crlf[0] != '\r' || crlf[1] != '\n') {
// LOG(ERROR) << "string in command is not ended with CRLF";
// return PARSE_ERROR_ABSOLUTELY_WRONG;
// }
// if (++_index < _length) {
// return Consume(buf, consume_offset, args, arena);
// }
// args->swap(_args);
// Reset();
// return PARSE_OK;
// }

void RedisCommandParser::Reset() {
_parsing_array = false;
_length = 0;
Expand Down
3 changes: 3 additions & 0 deletions src/brpc/redis_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class RedisCommandParser {
// in `arena'.
ParseError Consume(butil::IOBuf& buf, std::vector<butil::StringPiece>* args,
butil::Arena* arena);
// ParseError Consume(std::string_view buf, size_t &consume_offset,
// std::vector<butil::StringPiece> *args,
// butil::Arena *arena);
size_t ParsedArgsSize();

private:
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
#include "brpc/builtin/common.h" // GetProgramName
#include "brpc/details/tcmalloc_extension.h"
#include "brpc/rdma/rdma_helper.h"
#include "bthread/task_control.h"

inline std::ostream& operator<<(std::ostream& os, const timeval& tm) {
const char old_fill = os.fill();
Expand All @@ -85,6 +86,7 @@ inline std::ostream& operator<<(std::ostream& os, const timeval& tm) {

extern "C" {
void* bthread_get_assigned_data();
bthread::TaskControl* bthread_get_task_control();
}

namespace brpc {
Expand Down
Loading
Loading