diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 1244ea4a8fc..74aec5d0b8a 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -846,12 +846,13 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons return Status::OK(); // I'm serving this slot } - if (myself_ && myself_->importing_slot == slot && conn->IsImporting()) { + if (myself_ && myself_->importing_slot == slot && + (conn->IsImporting() || conn->IsFlagEnabled(redis::Connection::kAsking))) { // While data migrating, the topology of the destination node has not been changed. // The destination node has to serve the requests from the migrating slot, // although the slot is not belong to itself. Therefore, we record the importing slot // and mark the importing connection to accept the importing data. - return Status::OK(); // I'm serving the importing connection + return Status::OK(); // I'm serving the importing connection or asking connection } if (myself_ && imported_slots_.count(slot)) { diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc index e371e78f1cf..8ada4e8eec0 100644 --- a/src/commands/cmd_cluster.cc +++ b/src/commands/cmd_cluster.cc @@ -338,9 +338,19 @@ class CommandReadWrite : public Commander { } }; +class CommandAsking : public Commander { + public: + Status Execute(Server *srv, Connection *conn, std::string *output) override { + conn->EnableFlag(redis::Connection::kAsking); + *output = redis::SimpleString("OK"); + return Status::OK(); + } +}; + REDIS_REGISTER_COMMANDS(MakeCmdAttr("cluster", -2, "cluster no-script", 0, 0, 0, GenerateClusterFlag), MakeCmdAttr("clusterx", -2, "cluster no-script", 0, 0, 0, GenerateClusterFlag), MakeCmdAttr("readonly", 1, "cluster no-multi", 0, 0, 0), - MakeCmdAttr("readwrite", 1, "cluster no-multi", 0, 0, 0), ) + MakeCmdAttr("readwrite", 1, "cluster no-multi", 0, 0, 0), + MakeCmdAttr("asking", 1, "cluster", 0, 0, 0), ) } // namespace redis diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc index a470f225996..4a84f0a7dc7 100644 --- a/src/server/redis_connection.cc +++ b/src/server/redis_connection.cc @@ -220,6 +220,7 @@ std::string Connection::GetFlags() const { if (IsFlagEnabled(kSlave)) flags.append("S"); if (IsFlagEnabled(kCloseAfterReply)) flags.append("c"); if (IsFlagEnabled(kMonitor)) flags.append("M"); + if (IsFlagEnabled(kAsking)) flags.append("A"); if (!subscribe_channels_.empty() || !subscribe_patterns_.empty()) flags.append("P"); if (flags.empty()) flags = "N"; return flags; @@ -504,6 +505,11 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { } } + // reset the ASKING flag after executing the next query + if (IsFlagEnabled(kAsking)) { + DisableFlag(kAsking); + } + // We don't execute commands, but queue them, ant then execute in EXEC command if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdMulti)) { multi_cmds_.emplace_back(cmd_tokens); diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h index c206a1583bc..f3015fcfd6c 100644 --- a/src/server/redis_connection.h +++ b/src/server/redis_connection.h @@ -46,6 +46,7 @@ class Connection : public EvbufCallbackBase { kCloseAsync = 1 << 7, kMultiExec = 1 << 8, kReadOnly = 1 << 9, + kAsking = 1 << 10, }; explicit Connection(bufferevent *bev, Worker *owner); diff --git a/tests/gocase/integration/slotimport/slotimport_test.go b/tests/gocase/integration/slotimport/slotimport_test.go index 1d427b807fd..a3566cabfe6 100644 --- a/tests/gocase/integration/slotimport/slotimport_test.go +++ b/tests/gocase/integration/slotimport/slotimport_test.go @@ -172,3 +172,44 @@ func TestImportedServer(t *testing.T) { require.Zero(t, rdbB.Exists(ctx, slotKey).Val()) }) } + +func TestServiceImportingSlot(t *testing.T) { + ctx := context.Background() + + mockID0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + mockSrv0Host := "127.0.0.1" + mockSrv0Port := 6666 + + srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { srv1.Close() }() + rdb1 := srv1.NewClient() + defer func() { require.NoError(t, rdb1.Close()) }() + id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-8191\n", mockID0, mockSrv0Host, mockSrv0Port) + clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383\n", id1, srv1.Host(), srv1.Port()) + require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + slotNum := 1 + require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", slotNum, 0).Val()) + + // create a new client that is not importing + cli := srv1.NewClient() + slotKey := util.SlotTable[slotNum] + + t.Run("IMPORT - query a key in importing slot without asking", func(t *testing.T) { + util.ErrorRegexp(t, cli.Type(ctx, slotKey).Err(), fmt.Sprintf("MOVED %d.*%d.*", slotNum, mockSrv0Port)) + }) + + t.Run("IMPORT - query a key in importing slot after asking", func(t *testing.T) { + require.Equal(t, "OK", cli.Do(ctx, "asking").Val()) + require.NoError(t, cli.Type(ctx, slotKey).Err()) + }) + + t.Run("IMPORT - asking flag will be reset after executing", func(t *testing.T) { + require.Equal(t, "OK", cli.Do(ctx, "asking").Val()) + require.NoError(t, cli.Type(ctx, slotKey).Err()) + util.ErrorRegexp(t, cli.Type(ctx, slotKey).Err(), fmt.Sprintf("MOVED %d.*%d.*", slotNum, mockSrv0Port)) + }) +}