Skip to content

Commit

Permalink
Fix the replica should remove the master link when receiving the CLUSTER RESET command (#2259)
Browse files Browse the repository at this point in the history

This PR also allows using SOFT|HARD in the CLUSTER RESET command,
but both have the same meaning in the Kvrocks cluster, since keeping
the cluster's current epoch (version) is meaningless to it.
  • Loading branch information
git-hulk committed Apr 20, 2024
1 parent 1b18b79 commit 3dc16f3
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 9 deletions.
6 changes: 6 additions & 0 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,8 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
fmt::format("MOVED {} {}:{}", slot, slots_nodes_[slot]->host, slots_nodes_[slot]->port)};
}

// Only HARD mode is meaningful to the Kvrocks cluster,
// so it will force clearing all information after resetting.
Status Cluster::Reset() {
if (srv_->slot_migrator && srv_->slot_migrator->GetMigratingSlot() != -1) {
return {Status::NotOK, "Can't reset cluster while migrating slot"};
Expand All @@ -881,6 +883,10 @@ Status Cluster::Reset() {
if (!srv_->storage->IsEmptyDB()) {
return {Status::NotOK, "Can't reset cluster while database is not empty"};
}
if (srv_->IsSlave()) {
auto s = srv_->RemoveMaster();
if (!s.IsOK()) return s;
}

version_ = -1;
size_ = 0;
Expand Down
11 changes: 9 additions & 2 deletions src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,17 @@ class CommandCluster : public Commander {
Status Parse(const std::vector<std::string> &args) override {
subcommand_ = util::ToLower(args[1]);

if (args.size() == 2 &&
(subcommand_ == "nodes" || subcommand_ == "slots" || subcommand_ == "info" || subcommand_ == "reset"))
if (args.size() == 2 && (subcommand_ == "nodes" || subcommand_ == "slots" || subcommand_ == "info"))
return Status::OK();

// CLUSTER RESET [HARD|SOFT]
if (subcommand_ == "reset" && (args_.size() == 2 || args_.size() == 3)) {
if (args_.size() == 3 && !util::EqualICase(args_[2], "hard") && !util::EqualICase(args_[2], "soft")) {
return {Status::RedisParseErr, errInvalidSyntax};
}
return Status::OK();
}

if (subcommand_ == "keyslot" && args_.size() == 3) return Status::OK();

if (subcommand_ == "import") {
Expand Down
32 changes: 25 additions & 7 deletions tests/gocase/integration/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,33 +473,51 @@ func TestClusterReset(t *testing.T) {
id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())

srv2 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer func() { srv2.Close() }()
rdb2 := srv2.NewClientWithOption(&redis.Options{PoolSize: 1})
defer func() { require.NoError(t, rdb2.Close()) }()
id2 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx02"
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())

clusterNodes := fmt.Sprintf("%s %s %d master - 0-8191\n", id0, srv0.Host(), srv0.Port())
clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383", id1, srv1.Host(), srv1.Port())
clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383\n", id1, srv1.Host(), srv1.Port())
clusterNodes += fmt.Sprintf("%s %s %d slave %s", id2, srv2.Host(), srv2.Port(), id1)
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())

t.Run("cannot reset cluster if the db is not empty", func(t *testing.T) {
key := util.SlotTable[0]
require.NoError(t, rdb0.Set(ctx, key, "value", 0).Err())
require.Contains(t, rdb0.Do(ctx, "cluster", "reset").Err(), "Can't reset cluster while database is not empty")
require.Contains(t, rdb0.ClusterResetHard(ctx).Err(), "Can't reset cluster while database is not empty")
require.NoError(t, rdb0.Del(ctx, key).Err())
require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())
require.NoError(t, rdb0.ClusterResetSoft(ctx).Err())
require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx", "version").Val())
// reset the cluster topology to avoid breaking other test cases
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
})

t.Run("replica should become master after reset", func(t *testing.T) {
require.Eventually(t, func() bool {
return util.FindInfoEntry(rdb2, "role") == "slave"
}, 5*time.Second, 50*time.Millisecond)
require.NoError(t, rdb2.ClusterResetHard(ctx).Err())
require.Equal(t, "master", util.FindInfoEntry(rdb2, "role"))
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
})

t.Run("cannot reset cluster if the db is importing the slot", func(t *testing.T) {
slotNum := 1
require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", slotNum, 0).Val())
clusterInfo := rdb1.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "importing_slot: 1")
require.Contains(t, clusterInfo, "import_state: start")
require.Contains(t, rdb1.Do(ctx, "cluster", "reset").Err(), "Can't reset cluster while importing slot")
require.Contains(t, rdb1.ClusterResetHard(ctx).Err(), "Can't reset cluster while importing slot")
require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", slotNum, 1).Val())
clusterInfo = rdb1.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "import_state: success")
require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())
require.NoError(t, rdb0.ClusterResetHard(ctx).Err())
require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx", "version").Val())
// reset the cluster topology to avoid breaking other test cases
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
Expand All @@ -517,7 +535,7 @@ func TestClusterReset(t *testing.T) {
clusterInfo := rdb0.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "migrating_slot: 2")
require.Contains(t, clusterInfo, "migrating_state: start")
require.Contains(t, rdb0.Do(ctx, "cluster", "reset").Err(), "Can't reset cluster while migrating slot")
require.Contains(t, rdb0.ClusterResetHard(ctx).Err(), "Can't reset cluster while migrating slot")

// wait for the migration to finish
require.Eventually(t, func() bool {
Expand All @@ -528,7 +546,7 @@ func TestClusterReset(t *testing.T) {
// the keys are removed from the source node right now.
require.NoError(t, rdb0.FlushAll(ctx).Err())

require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err())
require.NoError(t, rdb0.ClusterResetHard(ctx).Err())
require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx", "version").Val())
// reset the cluster topology to avoid breaking other test cases
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
Expand Down

0 comments on commit 3dc16f3

Please sign in to comment.