Skip to content

Commit

Permalink
Add support of the ASKING command (#2273)
Browse files Browse the repository at this point in the history
  • Loading branch information
jjz921024 committed Apr 28, 2024
1 parent 83c0c0e commit 3ce7c50
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 3 deletions.
5 changes: 3 additions & 2 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -846,12 +846,13 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
return Status::OK(); // I'm serving this slot
}

if (myself_ && myself_->importing_slot == slot && conn->IsImporting()) {
if (myself_ && myself_->importing_slot == slot &&
(conn->IsImporting() || conn->IsFlagEnabled(redis::Connection::kAsking))) {
// While data migrating, the topology of the destination node has not been changed.
// The destination node has to serve the requests from the migrating slot,
// although the slot is not belong to itself. Therefore, we record the importing slot
// and mark the importing connection to accept the importing data.
return Status::OK(); // I'm serving the importing connection
return Status::OK(); // I'm serving the importing connection or asking connection
}

if (myself_ && imported_slots_.count(slot)) {
Expand Down
12 changes: 11 additions & 1 deletion src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,19 @@ class CommandReadWrite : public Commander {
}
};

class CommandAsking : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
conn->EnableFlag(redis::Connection::kAsking);
*output = redis::SimpleString("OK");
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandCluster>("cluster", -2, "cluster no-script", 0, 0, 0, GenerateClusterFlag),
MakeCmdAttr<CommandClusterX>("clusterx", -2, "cluster no-script", 0, 0, 0, GenerateClusterFlag),
MakeCmdAttr<CommandReadOnly>("readonly", 1, "cluster no-multi", 0, 0, 0),
MakeCmdAttr<CommandReadWrite>("readwrite", 1, "cluster no-multi", 0, 0, 0), )
MakeCmdAttr<CommandReadWrite>("readwrite", 1, "cluster no-multi", 0, 0, 0),
MakeCmdAttr<CommandAsking>("asking", 1, "cluster", 0, 0, 0), )

} // namespace redis
6 changes: 6 additions & 0 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ std::string Connection::GetFlags() const {
if (IsFlagEnabled(kSlave)) flags.append("S");
if (IsFlagEnabled(kCloseAfterReply)) flags.append("c");
if (IsFlagEnabled(kMonitor)) flags.append("M");
if (IsFlagEnabled(kAsking)) flags.append("A");
if (!subscribe_channels_.empty() || !subscribe_patterns_.empty()) flags.append("P");
if (flags.empty()) flags = "N";
return flags;
Expand Down Expand Up @@ -504,6 +505,11 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
}
}

// reset the ASKING flag after executing the next query
if (IsFlagEnabled(kAsking)) {
DisableFlag(kAsking);
}

// We don't execute commands, but queue them, ant then execute in EXEC command
if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdMulti)) {
multi_cmds_.emplace_back(cmd_tokens);
Expand Down
1 change: 1 addition & 0 deletions src/server/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class Connection : public EvbufCallbackBase<Connection> {
kCloseAsync = 1 << 7,
kMultiExec = 1 << 8,
kReadOnly = 1 << 9,
kAsking = 1 << 10,
};

explicit Connection(bufferevent *bev, Worker *owner);
Expand Down
41 changes: 41 additions & 0 deletions tests/gocase/integration/slotimport/slotimport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,44 @@ func TestImportedServer(t *testing.T) {
require.Zero(t, rdbB.Exists(ctx, slotKey).Val())
})
}

func TestServiceImportingSlot(t *testing.T) {
ctx := context.Background()

mockID0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
mockSrv0Host := "127.0.0.1"
mockSrv0Port := 6666

srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer func() { srv1.Close() }()
rdb1 := srv1.NewClient()
defer func() { require.NoError(t, rdb1.Close()) }()
id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())

clusterNodes := fmt.Sprintf("%s %s %d master - 0-8191\n", mockID0, mockSrv0Host, mockSrv0Port)
clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383\n", id1, srv1.Host(), srv1.Port())
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())

slotNum := 1
require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", slotNum, 0).Val())

// create a new client that is not importing
cli := srv1.NewClient()
slotKey := util.SlotTable[slotNum]

t.Run("IMPORT - query a key in importing slot without asking", func(t *testing.T) {
util.ErrorRegexp(t, cli.Type(ctx, slotKey).Err(), fmt.Sprintf("MOVED %d.*%d.*", slotNum, mockSrv0Port))
})

t.Run("IMPORT - query a key in importing slot after asking", func(t *testing.T) {
require.Equal(t, "OK", cli.Do(ctx, "asking").Val())
require.NoError(t, cli.Type(ctx, slotKey).Err())
})

t.Run("IMPORT - asking flag will be reset after executing", func(t *testing.T) {
require.Equal(t, "OK", cli.Do(ctx, "asking").Val())
require.NoError(t, cli.Type(ctx, slotKey).Err())
util.ErrorRegexp(t, cli.Type(ctx, slotKey).Err(), fmt.Sprintf("MOVED %d.*%d.*", slotNum, mockSrv0Port))
})
}

0 comments on commit 3ce7c50

Please sign in to comment.