From 6aabd8050a7d2cb26dee78906d448514f79fff26 Mon Sep 17 00:00:00 2001 From: Hyeonho Kim Date: Sat, 13 Apr 2024 18:39:10 +0900 Subject: [PATCH] Add support of the CLUSTER REPLICAS command (#2244) --- src/cluster/cluster.cc | 42 ++++++++++++++ src/cluster/cluster.h | 1 + src/commands/cmd_cluster.cc | 12 +++- tests/cppunit/cluster_test.cc | 46 +++++++++++++++ .../integration/cluster/cluster_test.go | 58 ++++++++++++++++++- 5 files changed, 157 insertions(+), 2 deletions(-) diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 8e16270cbf2..72a7f15a338 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -469,6 +469,48 @@ Status Cluster::GetClusterNodes(std::string *nodes_str) { return Status::OK(); } +StatusOr Cluster::GetReplicas(const std::string &node_id) { + if (version_ < 0) { + return {Status::ClusterDown, errClusterNoInitialized}; + } + + auto item = nodes_.find(node_id); + if (item == nodes_.end()) { + return {Status::InvalidArgument, errInvalidNodeID}; + } + + auto node = item->second; + if (node->role != kClusterMaster) { + return {Status::InvalidArgument, errNoMasterNode}; + } + + auto now = util::GetTimeStampMS(); + std::string replicas_desc; + for (const auto &replica_id : node->replicas) { + auto n = nodes_.find(replica_id); + if (n == nodes_.end()) { + continue; + } + + auto replica = n->second; + + std::string node_str; + // ID, host, port + node_str.append( + fmt::format("{} {}:{}@{} ", replica_id, replica->host, replica->port, replica->port + kClusterPortIncr)); + + // Flags + node_str.append(fmt::format("slave {} ", node_id)); + + // Ping sent, pong received, config epoch, link status + node_str.append(fmt::format("{} {} {} connected", now - 1, now, version_)); + + replicas_desc.append(node_str + "\n"); + } + + return replicas_desc; +} + std::string Cluster::getNodeIDBySlot(int slot) const { if (slot < 0 || slot >= kClusterSlots || !slots_nodes_[slot]) return ""; return slots_nodes_[slot]->id; diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h index 79e0d38e99e..c98ea668082 100644 --- a/src/cluster/cluster.h +++ b/src/cluster/cluster.h @@ -71,6 +71,7 @@ class Cluster { explicit Cluster(Server *srv, std::vector binds, int port); Status SetClusterNodes(const std::string &nodes_str, int64_t version, bool force); Status GetClusterNodes(std::string *nodes_str); + StatusOr GetReplicas(const std::string &node_id); Status SetNodeId(const std::string &node_id); Status SetSlotRanges(const std::vector &slot_ranges, const std::string &node_id, int64_t version); Status SetSlotMigrated(int slot, const std::string &ip_port); diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc index 5831fa50dc6..04382ac1710 100644 --- a/src/commands/cmd_cluster.cc +++ b/src/commands/cmd_cluster.cc @@ -49,7 +49,9 @@ class CommandCluster : public Commander { return Status::OK(); } - return {Status::RedisParseErr, "CLUSTER command, CLUSTER INFO|NODES|SLOTS|KEYSLOT|RESET"}; + if (subcommand_ == "replicas" && args_.size() == 3) return Status::OK(); + + return {Status::RedisParseErr, "CLUSTER command, CLUSTER INFO|NODES|SLOTS|KEYSLOT|RESET|REPLICAS"}; } Status Execute(Server *srv, Connection *conn, std::string *output) override { @@ -113,6 +115,14 @@ class CommandCluster : public Commander { } else { return {Status::RedisExecErr, s.Msg()}; } + } else if (subcommand_ == "replicas") { + auto node_id = args_[2]; + StatusOr s = srv->cluster->GetReplicas(node_id); + if (s.IsOK()) { + *output = conn->VerbatimString("txt", s.GetValue()); + } else { + return {Status::RedisExecErr, s.Msg()}; + } } else { return {Status::RedisExecErr, "Invalid cluster command options"}; } diff --git a/tests/cppunit/cluster_test.cc b/tests/cppunit/cluster_test.cc index 810a1ca197b..e70c3136f62 100644 --- a/tests/cppunit/cluster_test.cc +++ b/tests/cppunit/cluster_test.cc @@ -340,3 +340,49 @@ TEST_F(ClusterTest, ClusterParseSlotRanges) { slots.clear(); } } + +TEST_F(ClusterTest, GetReplicas) { + auto config = storage_->GetConfig(); + // don't start workers + config->workers = 0; + Server server(storage_.get(), config); + // we don't need the server resource, so just stop it once it's started + server.Stop(); + server.Join(); + + const std::string nodes = + "7dbee3d628f04cc5d763b36e92b10533e627a1d0 127.0.0.1 6480 slave 159dde1194ebf5bfc5a293dff839c3d1476f2a49\n" + "159dde1194ebf5bfc5a293dff839c3d1476f2a49 127.0.0.1 6479 master - 8192-16383\n" + "bb2e5b3c5282086df51eff6b3e35519aede96fa6 127.0.0.1 6379 master - 0-8191"; + + Cluster cluster(&server, {"127.0.0.1"}, 6379); + Status s = cluster.SetClusterNodes(nodes, 2, false); + ASSERT_TRUE(s.IsOK()); + + auto with_replica = cluster.GetReplicas("159dde1194ebf5bfc5a293dff839c3d1476f2a49"); + ASSERT_TRUE(s.IsOK()); + + std::vector replicas = util::Split(with_replica.GetValue(), "\n"); + for (const auto &replica : replicas) { + std::vector replica_fields = util::Split(replica, " "); + + ASSERT_TRUE(replica_fields.size() == 8); + ASSERT_TRUE(replica_fields[0] == "7dbee3d628f04cc5d763b36e92b10533e627a1d0"); + ASSERT_TRUE(replica_fields[1] == "127.0.0.1:6480@16480"); + ASSERT_TRUE(replica_fields[2] == "slave"); + ASSERT_TRUE(replica_fields[3] == "159dde1194ebf5bfc5a293dff839c3d1476f2a49"); + ASSERT_TRUE(replica_fields[7] == "connected"); + } + + auto without_replica = cluster.GetReplicas("bb2e5b3c5282086df51eff6b3e35519aede96fa6"); + ASSERT_TRUE(without_replica.IsOK()); + ASSERT_EQ(without_replica.GetValue(), ""); + + auto replica_node = cluster.GetReplicas("7dbee3d628f04cc5d763b36e92b10533e627a1d0"); + ASSERT_FALSE(replica_node.IsOK()); + ASSERT_EQ(replica_node.Msg(), "The node isn't a master"); + + auto unknown_node = cluster.GetReplicas("1234567890"); + ASSERT_FALSE(unknown_node.IsOK()); + ASSERT_EQ(unknown_node.Msg(), "Invalid cluster node id"); +} diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go index 3c4e27fe6a1..a26dc74ad89 100644 --- a/tests/gocase/integration/cluster/cluster_test.go +++ b/tests/gocase/integration/cluster/cluster_test.go @@ -26,9 +26,10 @@ import ( "testing" "time" - "github.com/apache/kvrocks/tests/gocase/util" "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" + + "github.com/apache/kvrocks/tests/gocase/util" ) func TestDisableCluster(t *testing.T) { @@ -131,6 +132,61 @@ func TestClusterNodes(t *testing.T) { }) } +func TestClusterReplicas(t *testing.T) { + srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer srv.Close() + + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + nodes := "" + + master1ID := "bb2e5b3c5282086df51eff6b3e35519aede96fa6" + master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID, srv.Host(), srv.Port()) + nodes += master1Node + "\n" + + master2ID := "159dde1194ebf5bfc5a293dff839c3d1476f2a49" + master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, srv.Host(), srv.Port()) + nodes += master2Node + "\n" + + replica2ID := "7dbee3d628f04cc5d763b36e92b10533e627a1d0" + replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID, srv.Host(), srv.Port(), master2ID) + nodes += replica2Node + + require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err()) + require.EqualValues(t, "2", rdb.Do(ctx, "clusterx", "version").Val()) + + t.Run("with replicas", func(t *testing.T) { + replicas, err := rdb.Do(ctx, "cluster", "replicas", "159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text() + require.NoError(t, err) + fields := strings.Split(replicas, " ") + require.Len(t, fields, 8) + require.Equal(t, fmt.Sprintf("%s@%d", srv.HostPort(), srv.Port()+10000), fields[1]) + require.Equal(t, "slave", fields[2]) + require.Equal(t, master2ID, fields[3]) + require.Equal(t, "connected\n", fields[7]) + }) + + t.Run("without replicas", func(t *testing.T) { + replicas, err := rdb.Do(ctx, "cluster", "replicas", "bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text() + require.NoError(t, err) + require.Empty(t, replicas) + }) + + t.Run("send command to replica", func(t *testing.T) { + err := rdb.Do(ctx, "cluster", "replicas", "7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err() + require.Error(t, err) + require.Contains(t, err.Error(), "The node isn't a master") + }) + + t.Run("unknown node", func(t *testing.T) { + err := rdb.Do(ctx, "cluster", "replicas", "unknown").Err() + require.Error(t, err) + require.Contains(t, err.Error(), "Invalid cluster node id") + }) +} + func TestClusterDumpAndLoadClusterNodesInfo(t *testing.T) { srv1 := util.StartServer(t, map[string]string{ "bind": "0.0.0.0",