Skip to content

Commit

Permalink
chore: Introduce small buffer in redis parser
Browse files Browse the repository at this point in the history
This is needed to eliminate cases where the parser returns INPUT_PENDING without consuming the whole string, leaving just a few trailing bytes unconsumed.
This should simplify buffer management for the caller: if they pass a string that
does not result in a completely parsed request, at least the whole string is consumed and can be discarded.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Dec 18, 2024
1 parent 76afb2e commit 992fbe8
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 24 deletions.
69 changes: 60 additions & 9 deletions src/facade/redis_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
#include "facade/redis_parser.h"

#include <absl/strings/escaping.h>
#include <absl/strings/numbers.h>

#include "base/logging.h"
Expand All @@ -18,6 +19,9 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
*consumed = 0;
res->clear();

DVLOG(2) << "Parsing: "
<< absl::CHexEscape(string_view{reinterpret_cast<const char*>(str.data()), str.size()});

if (state_ == CMD_COMPLETE_S) {
if (InitStart(str[0], res)) {
// We recognized a non-INLINE state, starting with a special char.
Expand Down Expand Up @@ -76,13 +80,16 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
}

if (resultc.first == INPUT_PENDING) {
DCHECK(str.empty());
StashState(res);
}
return resultc.first;
}

if (resultc.first == OK) {
DCHECK(cached_expr_);
DCHECK_EQ(0, small_len_);

if (res != cached_expr_) {
DCHECK(!stash_.empty());

Expand Down Expand Up @@ -233,15 +240,24 @@ auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed {
const char* s = reinterpret_cast<const char*>(str.data());
const char* pos = reinterpret_cast<const char*>(memchr(s, '\n', str.size()));
if (!pos) {
Result r = INPUT_PENDING;
if (str.size() >= 32) {
if (str.size() + small_len_ < sizeof(small_buf_)) {
memcpy(small_buf_ + small_len_, str.data(), str.size());
small_len_ += str.size();
return {INPUT_PENDING, str.size()};
}
LOG(WARNING) << "Unexpected format " << string_view{s, str.size()};
r = BAD_ARRAYLEN;
return ResultConsumed{BAD_ARRAYLEN, 0};
}
return {r, 0};
}

unsigned consumed = pos - s + 1;
if (small_len_ > 0) {
memcpy(small_buf_ + small_len_, str.data(), consumed);
small_len_ += consumed;
s = small_buf_;
pos = small_buf_ + small_len_ - 1;
small_len_ = 0;
}

if (pos[-1] != '\r') {
return {BAD_ARRAYLEN, consumed};
}
Expand Down Expand Up @@ -322,8 +338,10 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {

unsigned min_len = 2 + int(arg_c_ != '_');

if (str.size() < min_len) {
return {INPUT_PENDING, 0};
if (small_len_ + str.size() < min_len) {
memcpy(small_buf_ + small_len_, str.data(), str.size());
small_len_ += str.size();
return {INPUT_PENDING, str.size()};
}

if (arg_c_ == '$') {
Expand Down Expand Up @@ -354,14 +372,22 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
DCHECK(!server_mode_);
if (arg_c_ == '_') { // Resp3 NIL
// '\r','\n'
if (str[0] != '\r' || str[1] != '\n') {
// '_','\r','\n'
DCHECK_GE(small_len_ + str.size(), 2u);
DCHECK_LT(small_len_, 2);

unsigned consumed = 2 - small_len_;
for (unsigned i = 0; i < consumed; ++i) {
small_buf_[small_len_ + i] = str[i];
}
if (small_buf_[0] != '\r' || small_buf_[1] != '\n') {
return {BAD_STRING, 0};
}

cached_expr_->emplace_back(RespExpr::NIL);
cached_expr_->back().u = Buffer{};
HandleFinishArg();
return {OK, 2};
return {OK, consumed};
}

if (arg_c_ == '*') {
Expand Down Expand Up @@ -425,6 +451,26 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {

uint32_t consumed = 0;

if (small_len_ > 0) {
DCHECK(!is_broken_token_);
DCHECK_EQ(bulk_len_, 0u);

if (bulk_len_ == 0) {
DCHECK_EQ(small_len_, 1);
DCHECK_GE(str.size(), 1u);
if (small_buf_[0] != '\r' || str[0] != '\n') {
return {BAD_STRING, 0};
}
consumed = bulk_len_ + 2;
small_len_ = 0;
HandleFinishArg();

return {OK, 1};
}
}

DCHECK_EQ(small_len_, 0);

if (str.size() >= bulk_len_) {
consumed = bulk_len_;
if (bulk_len_) {
Expand All @@ -446,6 +492,10 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {
}
HandleFinishArg();
return {OK, consumed + 2};
} else if (str.size() == 1) {
small_buf_[0] = str[0];
consumed++;
small_len_ = 1;
}
return {INPUT_PENDING, consumed};
}
Expand Down Expand Up @@ -490,6 +540,7 @@ void RedisParser::HandleFinishArg() {
}
cached_expr_ = parse_stack_.back().second;
}
small_len_ = 0;
}

void RedisParser::ExtendLastString(Buffer str) {
Expand Down
5 changes: 3 additions & 2 deletions src/facade/redis_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ class RedisParser {
* part of str because parser caches the intermediate state internally according to 'consumed'
* result.
*
* Note: A parser does not always guarantee progress, i.e. if a small buffer was passed it may
* return INPUT_PENDING with consumed == 0.
*
*/

Expand Down Expand Up @@ -99,7 +97,9 @@ class RedisParser {
State state_ = CMD_COMPLETE_S;
bool is_broken_token_ = false; // true, if a token (inline or bulk) is broken during the parsing.
bool server_mode_ = true;
uint8_t small_len_ = 0;
char arg_c_ = 0;

uint32_t bulk_len_ = 0;
uint32_t last_stashed_level_ = 0, last_stashed_index_ = 0;
uint32_t max_arr_len_;
Expand All @@ -114,6 +114,7 @@ class RedisParser {

using Blob = std::vector<uint8_t>;
std::vector<Blob> buf_stash_;
char small_buf_[32];
};

} // namespace facade
33 changes: 20 additions & 13 deletions src/facade/redis_parser_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ TEST_F(RedisParserTest, ClientMode) {

ASSERT_EQ(RedisParser::OK, Parse("-ERR foo bar\r\n"));
EXPECT_THAT(args_, ElementsAre(ErrArg("ERR foo")));

ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("_"));
EXPECT_EQ(1, consumed_);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\r"));
EXPECT_EQ(1, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\n"));
EXPECT_EQ(1, consumed_);
EXPECT_THAT(args_, ElementsAre(ArgType(RespExpr::NIL)));
ASSERT_EQ(RedisParser::OK, Parse("*2\r\n_\r\n_\r\n"));
}

TEST_F(RedisParserTest, Hierarchy) {
Expand Down Expand Up @@ -183,13 +192,13 @@ TEST_F(RedisParserTest, LargeBulk) {
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(half));
ASSERT_EQ(512, consumed_);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\r"));
ASSERT_EQ(0, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\r\n"));
ASSERT_EQ(2, consumed_);
ASSERT_EQ(1, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\n"));
EXPECT_EQ(1, consumed_);

string part1 = absl::StrCat(prefix, half);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(part1));
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(half));
EXPECT_EQ(RedisParser::INPUT_PENDING, Parse(part1));
EXPECT_EQ(RedisParser::INPUT_PENDING, Parse(half));
ASSERT_EQ(RedisParser::OK, Parse("\r\n"));

prefix = "*1\r\n$270000000\r\n";
Expand Down Expand Up @@ -245,24 +254,22 @@ TEST_F(RedisParserTest, UsedMemory) {

TEST_F(RedisParserTest, Eol) {
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r"));
EXPECT_EQ(1, consumed_);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("1\r\n$5\r\n"));
EXPECT_EQ(7, consumed_);
EXPECT_EQ(3, consumed_);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\n$5\r\n"));
EXPECT_EQ(5, consumed_);
}

TEST_F(RedisParserTest, BulkSplit) {
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r\n$4\r\nSADD"));
ASSERT_EQ(12, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\r\n"));
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r\n$4\r\nSADD\r"));
ASSERT_EQ(13, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\n"));
}

TEST_F(RedisParserTest, InlineSplit) {
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\n"));
EXPECT_EQ(1, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\nPING\n\n"));
EXPECT_EQ(6, consumed_);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\n"));
EXPECT_EQ(1, consumed_);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("P"));
ASSERT_EQ(RedisParser::OK, Parse("ING\n"));
}
Expand Down

0 comments on commit 992fbe8

Please sign in to comment.