diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index d398ecbf8f98..0b481ae8c33a 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -290,7 +290,7 @@ void MCReplyBuilder::SendRaw(std::string_view str) { void RedisReplyBuilderBase::SendNull() { ReplyScope scope(this); - resp3_ ? WritePieces(kNullStringR3) : WritePieces(kNullStringR2); + IsResp3() ? WritePieces(kNullStringR3) : WritePieces(kNullStringR2); } void RedisReplyBuilderBase::SendSimpleString(std::string_view str) { @@ -323,7 +323,7 @@ void RedisReplyBuilderBase::SendDouble(double val) { static_assert(ABSL_ARRAYSIZE(buf) < kMaxInlineSize, "Write temporary string from buf inline"); string_view val_str = FormatDouble(val, buf, ABSL_ARRAYSIZE(buf)); - if (!resp3_) + if (!IsResp3()) return SendBulkString(val_str); ReplyScope scope(this); @@ -422,7 +422,7 @@ void RedisReplyBuilder::SendScoredArray(ScoredArray arr, bool with_scores) { void RedisReplyBuilder::SendLabeledScoredArray(std::string_view arr_label, ScoredArray arr) { ReplyScope scope(this); - + StartArray(2); SendBulkString(arr_label); @@ -432,7 +432,6 @@ void RedisReplyBuilder::SendLabeledScoredArray(std::string_view arr_label, Score SendBulkString(str); SendDouble(score); } - } void RedisReplyBuilder::SendStored() { diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h index 8c2bf7ee38c6..aba1ef41534f 100644 --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -22,6 +22,8 @@ enum class ReplyMode { FULL // All replies are recorded }; +enum class RespVersion { kResp2, kResp3 }; + // Base class for all reply builders. Offer a simple high level interface for controlling output // modes and sending basic response types. class SinkReplyBuilder { @@ -258,15 +260,19 @@ class RedisReplyBuilderBase : public SinkReplyBuilder { static std::string SerializeCommand(std::string_view command); bool IsResp3() const { - return resp3_; + return resp_ == RespVersion::kResp3; + } + + void SetRespVersion(RespVersion resp_version) { + resp_ = resp_version; } - void SetResp3(bool resp3) { - resp3_ = resp3; + RespVersion GetRespVersion() { + return resp_; } private: - bool resp3_ = false; + RespVersion resp_ = RespVersion::kResp2; }; // Non essential redis reply builder functions implemented on top of the base resp protocol diff --git a/src/facade/reply_builder_test.cc b/src/facade/reply_builder_test.cc index 8560bcde5ee0..f1a3c5b49b25 100644 --- a/src/facade/reply_builder_test.cc +++ b/src/facade/reply_builder_test.cc @@ -699,14 +699,14 @@ TEST_F(RedisReplyBuilderTest, BatchMode) { } TEST_F(RedisReplyBuilderTest, Resp3Double) { - builder_->SetResp3(true); + builder_->SetRespVersion(RespVersion::kResp3); builder_->SendDouble(5.5); ASSERT_TRUE(NoErrors()); ASSERT_EQ(str(), ",5.5\r\n"); } TEST_F(RedisReplyBuilderTest, Resp3NullString) { - builder_->SetResp3(true); + builder_->SetRespVersion(RespVersion::kResp3); builder_->SendNull(); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "_\r\n"); @@ -715,13 +715,13 @@ TEST_F(RedisReplyBuilderTest, Resp3NullString) { TEST_F(RedisReplyBuilderTest, SendStringArrayAsMap) { const std::vector map_array{"k1", "v1", "k2", "v2"}; - builder_->SetResp3(false); + builder_->SetRespVersion(RespVersion::kResp2); builder_->SendBulkStrArr(map_array, builder_->MAP); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "*4\r\n$2\r\nk1\r\n$2\r\nv1\r\n$2\r\nk2\r\n$2\r\nv2\r\n") << "SendStringArrayAsMap Resp2 Failed."; - builder_->SetResp3(true); + builder_->SetRespVersion(RespVersion::kResp3); builder_->SendBulkStrArr(map_array, builder_->MAP); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "%2\r\n$2\r\nk1\r\n$2\r\nv1\r\n$2\r\nk2\r\n$2\r\nv2\r\n") @@ -731,13 +731,13 @@ TEST_F(RedisReplyBuilderTest, SendStringArrayAsMap) { TEST_F(RedisReplyBuilderTest, SendStringArrayAsSet) { const std::vector set_array{"e1", "e2", "e3"}; - builder_->SetResp3(false); + builder_->SetRespVersion(RespVersion::kResp2); builder_->SendBulkStrArr(set_array, builder_->SET); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "*3\r\n$2\r\ne1\r\n$2\r\ne2\r\n$2\r\ne3\r\n") << "SendStringArrayAsSet Resp2 Failed."; - builder_->SetResp3(true); + builder_->SetRespVersion(RespVersion::kResp3); builder_->SendBulkStrArr(set_array, builder_->SET); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "~3\r\n$2\r\ne1\r\n$2\r\ne2\r\n$2\r\ne3\r\n") @@ -748,26 +748,26 @@ TEST_F(RedisReplyBuilderTest, SendScoredArray) { const std::vector> scored_array{ {"e1", 1.1}, {"e2", 2.2}, {"e3", 3.3}}; - builder_->SetResp3(false); + builder_->SetRespVersion(RespVersion::kResp2); builder_->SendScoredArray(scored_array, false); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "*3\r\n$2\r\ne1\r\n$2\r\ne2\r\n$2\r\ne3\r\n") << "Resp2 WITHOUT scores failed."; - builder_->SetResp3(true); + builder_->SetRespVersion(RespVersion::kResp3); builder_->SendScoredArray(scored_array, false); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "*3\r\n$2\r\ne1\r\n$2\r\ne2\r\n$2\r\ne3\r\n") << "Resp3 WITHOUT scores failed."; - builder_->SetResp3(false); + builder_->SetRespVersion(RespVersion::kResp2); builder_->SendScoredArray(scored_array, true); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "*6\r\n$2\r\ne1\r\n$3\r\n1.1\r\n$2\r\ne2\r\n$3\r\n2.2\r\n$2\r\ne3\r\n$3\r\n3.3\r\n") << "Resp3 WITHSCORES failed."; - builder_->SetResp3(true); + builder_->SetRespVersion(RespVersion::kResp3); builder_->SendScoredArray(scored_array, true); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), @@ -779,7 +779,7 @@ TEST_F(RedisReplyBuilderTest, SendLabeledScoredArray) { const std::vector> scored_array{ {"e1", 1.1}, {"e2", 2.2}, {"e3", 3.3}}; - builder_->SetResp3(false); + builder_->SetRespVersion(RespVersion::kResp2); builder_->SendLabeledScoredArray("foobar", scored_array); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), @@ -787,7 +787,7 @@ TEST_F(RedisReplyBuilderTest, SendLabeledScoredArray) { "2\r\n*2\r\n$2\r\ne3\r\n$3\r\n3.3\r\n") << "Resp3 failed.\n"; - builder_->SetResp3(true); + builder_->SetRespVersion(RespVersion::kResp3); builder_->SendLabeledScoredArray("foobar", scored_array); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), @@ -850,8 +850,8 @@ TEST_F(RedisReplyBuilderTest, BasicCapture) { big_arr_cb, }; - crb.SetResp3(true); - builder_->SetResp3(true); + crb.SetRespVersion(RespVersion::kResp3); + builder_->SetRespVersion(RespVersion::kResp3); // Run generator functions on both a regular redis builder // and the capturing builder with its capture applied. @@ -864,7 +864,7 @@ TEST_F(RedisReplyBuilderTest, BasicCapture) { EXPECT_EQ(expected, actual); } - builder_->SetResp3(false); + builder_->SetRespVersion(RespVersion::kResp2); } TEST_F(RedisReplyBuilderTest, FormatDouble) { @@ -889,17 +889,17 @@ TEST_F(RedisReplyBuilderTest, VerbatimString) { // test resp3 std::string str = "A simple string!"; - builder_->SetResp3(true); + builder_->SetRespVersion(RespVersion::kResp3); builder_->SendVerbatimString(str, RedisReplyBuilder::VerbatimFormat::TXT); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "=20\r\ntxt:A simple string!\r\n") << "Resp3 VerbatimString TXT failed."; - builder_->SetResp3(true); + builder_->SetRespVersion(RespVersion::kResp3); builder_->SendVerbatimString(str, RedisReplyBuilder::VerbatimFormat::MARKDOWN); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "=20\r\nmkd:A simple string!\r\n") << "Resp3 VerbatimString TXT failed."; - builder_->SetResp3(false); + builder_->SetRespVersion(RespVersion::kResp2); builder_->SendVerbatimString(str); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "$16\r\nA simple string!\r\n") << "Resp3 VerbatimString TXT failed."; diff --git a/src/facade/reply_capture.h b/src/facade/reply_capture.h index 1ffa4fd7cfce..7712992350e1 100644 --- a/src/facade/reply_capture.h +++ b/src/facade/reply_capture.h @@ -43,8 +43,9 @@ class CapturingReplyBuilder : public RedisReplyBuilder { struct SimpleString : public std::string {}; // SendSimpleString struct BulkString : public std::string {}; // SendBulkString - CapturingReplyBuilder(ReplyMode mode = ReplyMode::FULL) + CapturingReplyBuilder(ReplyMode mode = ReplyMode::FULL, RespVersion resp_v = RespVersion::kResp2) : RedisReplyBuilder{nullptr}, reply_mode_{mode}, stack_{}, current_{} { + SetRespVersion(resp_v); } using Payload = std::variantshard_id()]; DCHECK(!sinfo.cmds.empty()); auto* local_tx = sinfo.local_tx.get(); - facade::CapturingReplyBuilder crb; + facade::CapturingReplyBuilder crb(ReplyMode::FULL, resp_v); ConnectionContext local_cntx{cntx_, local_tx}; if (cntx_->conn()) { local_cntx.skip_acl_validation = cntx_->conn()->IsPrivileged(); @@ -244,14 +245,15 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { cntx_->cid = base_cid_; auto cb = [this](ShardId sid) { return !sharded_[sid].cmds.empty(); }; tx->PrepareSquashedMultiHop(base_cid_, cb); - tx->ScheduleSingleHop([this](auto* tx, auto* es) { return SquashedHopCb(tx, es); }); + tx->ScheduleSingleHop( + [this, rb](auto* tx, auto* es) { return SquashedHopCb(tx, es, rb->GetRespVersion()); }); } else { #if 1 fb2::BlockingCounter bc(num_shards); DVLOG(1) << "Squashing " << num_shards << " " << tx->DebugId(); - auto cb = [this, tx, bc]() mutable { - this->SquashedHopCb(tx, EngineShard::tlocal()); + auto cb = [this, tx, bc, rb]() mutable { + this->SquashedHopCb(tx, EngineShard::tlocal(), rb->GetRespVersion()); bc->Dec(); }; @@ -261,8 +263,9 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { } bc->Wait(); #else - shard_set->RunBlockingInParallel([this, tx](auto* es) { SquashedHopCb(tx, es); }, - [this](auto sid) { return !sharded_[sid].cmds.empty(); }); + shard_set->RunBlockingInParallel( + [this, tx, rb](auto* es) { SquashedHopCb(tx, es, rb->GetRespVersion()); }, + [this](auto sid) { return !sharded_[sid].cmds.empty(); }); #endif } diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index ccea7f7a2c71..e269cc52256e 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -61,7 +61,8 @@ class MultiCommandSquasher { bool ExecuteStandalone(facade::RedisReplyBuilder* rb, StoredCmd* cmd); // Callback that runs on shards during squashed hop. - facade::OpStatus SquashedHopCb(Transaction* parent_tx, EngineShard* es); + facade::OpStatus SquashedHopCb(Transaction* parent_tx, EngineShard* es, + facade::RespVersion resp_v); // Execute all currently squashed commands. Return false if aborting on error. bool ExecuteSquashed(facade::RedisReplyBuilder* rb); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 83c06f5a532a..9efcb2b73874 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -2696,10 +2696,10 @@ void ServerFamily::Hello(CmdArgList args, const CommandContext& cmd_cntx) { int proto_version = 2; if (is_resp3) { proto_version = 3; - rb->SetResp3(true); + rb->SetRespVersion(RespVersion::kResp3); } else { // Issuing hello 2 again is valid and should switch back to RESP2 - rb->SetResp3(false); + rb->SetRespVersion(RespVersion::kResp2); } SinkReplyBuilder::ReplyAggregator agg(rb);