Skip to content

Commit

Permalink
Add support of the CLUSTER REPLICAS command (#2244)
Browse files Browse the repository at this point in the history
  • Loading branch information
proost committed Apr 13, 2024
1 parent 135dbc3 commit 6aabd80
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 2 deletions.
42 changes: 42 additions & 0 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,48 @@ Status Cluster::GetClusterNodes(std::string *nodes_str) {
return Status::OK();
}

StatusOr<std::string> Cluster::GetReplicas(const std::string &node_id) {
if (version_ < 0) {
return {Status::ClusterDown, errClusterNoInitialized};
}

auto item = nodes_.find(node_id);
if (item == nodes_.end()) {
return {Status::InvalidArgument, errInvalidNodeID};
}

auto node = item->second;
if (node->role != kClusterMaster) {
return {Status::InvalidArgument, errNoMasterNode};
}

auto now = util::GetTimeStampMS();
std::string replicas_desc;
for (const auto &replica_id : node->replicas) {
auto n = nodes_.find(replica_id);
if (n == nodes_.end()) {
continue;
}

auto replica = n->second;

std::string node_str;
// ID, host, port
node_str.append(
fmt::format("{} {}:{}@{} ", replica_id, replica->host, replica->port, replica->port + kClusterPortIncr));

// Flags
node_str.append(fmt::format("slave {} ", node_id));

// Ping sent, pong received, config epoch, link status
node_str.append(fmt::format("{} {} {} connected", now - 1, now, version_));

replicas_desc.append(node_str + "\n");
}

return replicas_desc;
}

std::string Cluster::getNodeIDBySlot(int slot) const {
if (slot < 0 || slot >= kClusterSlots || !slots_nodes_[slot]) return "";
return slots_nodes_[slot]->id;
Expand Down
1 change: 1 addition & 0 deletions src/cluster/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class Cluster {
explicit Cluster(Server *srv, std::vector<std::string> binds, int port);
Status SetClusterNodes(const std::string &nodes_str, int64_t version, bool force);
Status GetClusterNodes(std::string *nodes_str);
StatusOr<std::string> GetReplicas(const std::string &node_id);
Status SetNodeId(const std::string &node_id);
Status SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const std::string &node_id, int64_t version);
Status SetSlotMigrated(int slot, const std::string &ip_port);
Expand Down
12 changes: 11 additions & 1 deletion src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ class CommandCluster : public Commander {
return Status::OK();
}

return {Status::RedisParseErr, "CLUSTER command, CLUSTER INFO|NODES|SLOTS|KEYSLOT|RESET"};
if (subcommand_ == "replicas" && args_.size() == 3) return Status::OK();

return {Status::RedisParseErr, "CLUSTER command, CLUSTER INFO|NODES|SLOTS|KEYSLOT|RESET|REPLICAS"};
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -113,6 +115,14 @@ class CommandCluster : public Commander {
} else {
return {Status::RedisExecErr, s.Msg()};
}
} else if (subcommand_ == "replicas") {
auto node_id = args_[2];
StatusOr<std::string> s = srv->cluster->GetReplicas(node_id);
if (s.IsOK()) {
*output = conn->VerbatimString("txt", s.GetValue());
} else {
return {Status::RedisExecErr, s.Msg()};
}
} else {
return {Status::RedisExecErr, "Invalid cluster command options"};
}
Expand Down
46 changes: 46 additions & 0 deletions tests/cppunit/cluster_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,49 @@ TEST_F(ClusterTest, ClusterParseSlotRanges) {
slots.clear();
}
}

TEST_F(ClusterTest, GetReplicas) {
auto config = storage_->GetConfig();
// don't start workers
config->workers = 0;
Server server(storage_.get(), config);
// we don't need the server resource, so just stop it once it's started
server.Stop();
server.Join();

const std::string nodes =
"7dbee3d628f04cc5d763b36e92b10533e627a1d0 127.0.0.1 6480 slave 159dde1194ebf5bfc5a293dff839c3d1476f2a49\n"
"159dde1194ebf5bfc5a293dff839c3d1476f2a49 127.0.0.1 6479 master - 8192-16383\n"
"bb2e5b3c5282086df51eff6b3e35519aede96fa6 127.0.0.1 6379 master - 0-8191";

Cluster cluster(&server, {"127.0.0.1"}, 6379);
Status s = cluster.SetClusterNodes(nodes, 2, false);
ASSERT_TRUE(s.IsOK());

auto with_replica = cluster.GetReplicas("159dde1194ebf5bfc5a293dff839c3d1476f2a49");
ASSERT_TRUE(s.IsOK());

std::vector<std::string> replicas = util::Split(with_replica.GetValue(), "\n");
for (const auto &replica : replicas) {
std::vector<std::string> replica_fields = util::Split(replica, " ");

ASSERT_TRUE(replica_fields.size() == 8);
ASSERT_TRUE(replica_fields[0] == "7dbee3d628f04cc5d763b36e92b10533e627a1d0");
ASSERT_TRUE(replica_fields[1] == "127.0.0.1:6480@16480");
ASSERT_TRUE(replica_fields[2] == "slave");
ASSERT_TRUE(replica_fields[3] == "159dde1194ebf5bfc5a293dff839c3d1476f2a49");
ASSERT_TRUE(replica_fields[7] == "connected");
}

auto without_replica = cluster.GetReplicas("bb2e5b3c5282086df51eff6b3e35519aede96fa6");
ASSERT_TRUE(without_replica.IsOK());
ASSERT_EQ(without_replica.GetValue(), "");

auto replica_node = cluster.GetReplicas("7dbee3d628f04cc5d763b36e92b10533e627a1d0");
ASSERT_FALSE(replica_node.IsOK());
ASSERT_EQ(replica_node.Msg(), "The node isn't a master");

auto unknown_node = cluster.GetReplicas("1234567890");
ASSERT_FALSE(unknown_node.IsOK());
ASSERT_EQ(unknown_node.Msg(), "Invalid cluster node id");
}
58 changes: 57 additions & 1 deletion tests/gocase/integration/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
"testing"
"time"

"github.com/apache/kvrocks/tests/gocase/util"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"

"github.com/apache/kvrocks/tests/gocase/util"
)

func TestDisableCluster(t *testing.T) {
Expand Down Expand Up @@ -131,6 +132,61 @@ func TestClusterNodes(t *testing.T) {
})
}

func TestClusterReplicas(t *testing.T) {
srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

nodes := ""

master1ID := "bb2e5b3c5282086df51eff6b3e35519aede96fa6"
master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID, srv.Host(), srv.Port())
nodes += master1Node + "\n"

master2ID := "159dde1194ebf5bfc5a293dff839c3d1476f2a49"
master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, srv.Host(), srv.Port())
nodes += master2Node + "\n"

replica2ID := "7dbee3d628f04cc5d763b36e92b10533e627a1d0"
replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID, srv.Host(), srv.Port(), master2ID)
nodes += replica2Node

require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err())
require.EqualValues(t, "2", rdb.Do(ctx, "clusterx", "version").Val())

t.Run("with replicas", func(t *testing.T) {
replicas, err := rdb.Do(ctx, "cluster", "replicas", "159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text()
require.NoError(t, err)
fields := strings.Split(replicas, " ")
require.Len(t, fields, 8)
require.Equal(t, fmt.Sprintf("%s@%d", srv.HostPort(), srv.Port()+10000), fields[1])
require.Equal(t, "slave", fields[2])
require.Equal(t, master2ID, fields[3])
require.Equal(t, "connected\n", fields[7])
})

t.Run("without replicas", func(t *testing.T) {
replicas, err := rdb.Do(ctx, "cluster", "replicas", "bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text()
require.NoError(t, err)
require.Empty(t, replicas)
})

t.Run("send command to replica", func(t *testing.T) {
err := rdb.Do(ctx, "cluster", "replicas", "7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err()
require.Error(t, err)
require.Contains(t, err.Error(), "The node isn't a master")
})

t.Run("unknown node", func(t *testing.T) {
err := rdb.Do(ctx, "cluster", "replicas", "unknown").Err()
require.Error(t, err)
require.Contains(t, err.Error(), "Invalid cluster node id")
})
}

func TestClusterDumpAndLoadClusterNodesInfo(t *testing.T) {
srv1 := util.StartServer(t, map[string]string{
"bind": "0.0.0.0",
Expand Down

0 comments on commit 6aabd80

Please sign in to comment.