diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index ed3930787fa..1244ea4a8fc 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -871,6 +871,8 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons fmt::format("MOVED {} {}:{}", slot, slots_nodes_[slot]->host, slots_nodes_[slot]->port)}; } +// Only HARD mode is meaningful to the Kvrocks cluster, +// so it will force clearing all information after resetting. Status Cluster::Reset() { if (srv_->slot_migrator && srv_->slot_migrator->GetMigratingSlot() != -1) { return {Status::NotOK, "Can't reset cluster while migrating slot"}; @@ -881,6 +883,10 @@ Status Cluster::Reset() { if (!srv_->storage->IsEmptyDB()) { return {Status::NotOK, "Can't reset cluster while database is not empty"}; } + if (srv_->IsSlave()) { + auto s = srv_->RemoveMaster(); + if (!s.IsOK()) return s; + } version_ = -1; size_ = 0; diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc index df0c45efcd0..e371e78f1cf 100644 --- a/src/commands/cmd_cluster.cc +++ b/src/commands/cmd_cluster.cc @@ -32,10 +32,17 @@ class CommandCluster : public Commander { Status Parse(const std::vector &args) override { subcommand_ = util::ToLower(args[1]); - if (args.size() == 2 && - (subcommand_ == "nodes" || subcommand_ == "slots" || subcommand_ == "info" || subcommand_ == "reset")) + if (args.size() == 2 && (subcommand_ == "nodes" || subcommand_ == "slots" || subcommand_ == "info")) return Status::OK(); + // CLUSTER RESET [HARD|SOFT] + if (subcommand_ == "reset" && (args_.size() == 2 || args_.size() == 3)) { + if (args_.size() == 3 && !util::EqualICase(args_[2], "hard") && !util::EqualICase(args_[2], "soft")) { + return {Status::RedisParseErr, errInvalidSyntax}; + } + return Status::OK(); + } + if (subcommand_ == "keyslot" && args_.size() == 3) return Status::OK(); if (subcommand_ == "import") { diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go index a26dc74ad89..fad7d0128f7 100644 --- a/tests/gocase/integration/cluster/cluster_test.go +++ b/tests/gocase/integration/cluster/cluster_test.go @@ -473,33 +473,51 @@ func TestClusterReset(t *testing.T) { id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err()) + srv2 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { srv2.Close() }() + rdb2 := srv2.NewClientWithOption(&redis.Options{PoolSize: 1}) + defer func() { require.NoError(t, rdb2.Close()) }() + id2 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx02" + require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err()) + clusterNodes := fmt.Sprintf("%s %s %d master - 0-8191\n", id0, srv0.Host(), srv0.Port()) - clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383", id1, srv1.Host(), srv1.Port()) + clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383\n", id1, srv1.Host(), srv1.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", id2, srv2.Host(), srv2.Port(), id1) require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) t.Run("cannot reset cluster if the db is not empty", func(t *testing.T) { key := util.SlotTable[0] require.NoError(t, rdb0.Set(ctx, key, "value", 0).Err()) - require.Contains(t, rdb0.Do(ctx, "cluster", "reset").Err(), "Can't reset cluster while database is not empty") + require.Contains(t, rdb0.ClusterResetHard(ctx).Err(), "Can't reset cluster while database is not empty") require.NoError(t, rdb0.Del(ctx, key).Err()) - require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err()) + require.NoError(t, rdb0.ClusterResetSoft(ctx).Err()) require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx", "version").Val()) // reset the cluster topology to avoid breaking other test cases require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) }) + t.Run("replica should become master after reset", func(t *testing.T) { + require.Eventually(t, func() bool { + return util.FindInfoEntry(rdb2, "role") == "slave" + }, 5*time.Second, 50*time.Millisecond) + require.NoError(t, rdb2.ClusterResetHard(ctx).Err()) + require.Equal(t, "master", util.FindInfoEntry(rdb2, "role")) + require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + }) + t.Run("cannot reset cluster if the db is importing the slot", func(t *testing.T) { slotNum := 1 require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", slotNum, 0).Val()) clusterInfo := rdb1.ClusterInfo(ctx).Val() require.Contains(t, clusterInfo, "importing_slot: 1") require.Contains(t, clusterInfo, "import_state: start") - require.Contains(t, rdb1.Do(ctx, "cluster", "reset").Err(), "Can't reset cluster while importing slot") + require.Contains(t, rdb1.ClusterResetHard(ctx).Err(), "Can't reset cluster while importing slot") require.Equal(t, "OK", rdb1.Do(ctx, "cluster", "import", slotNum, 1).Val()) clusterInfo = rdb1.ClusterInfo(ctx).Val() require.Contains(t, clusterInfo, "import_state: success") - require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err()) + require.NoError(t, rdb0.ClusterResetHard(ctx).Err()) require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx", "version").Val()) // reset the cluster topology to avoid breaking other test cases require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) @@ -517,7 +535,7 @@ func TestClusterReset(t *testing.T) { clusterInfo := rdb0.ClusterInfo(ctx).Val() require.Contains(t, clusterInfo, "migrating_slot: 2") require.Contains(t, clusterInfo, "migrating_state: start") - require.Contains(t, rdb0.Do(ctx, "cluster", "reset").Err(), "Can't reset cluster while migrating slot") + require.Contains(t, rdb0.ClusterResetHard(ctx).Err(), "Can't reset cluster while migrating slot") // wait for the migration to finish require.Eventually(t, func() bool { @@ -528,7 +546,7 @@ func TestClusterReset(t *testing.T) { // the keys are removed from the source node right now. require.NoError(t, rdb0.FlushAll(ctx).Err()) - require.NoError(t, rdb0.Do(ctx, "cluster", "reset").Err()) + require.NoError(t, rdb0.ClusterResetHard(ctx).Err()) require.EqualValues(t, "-1", rdb0.Do(ctx, "clusterx", "version").Val()) // reset the cluster topology to avoid breaking other test cases require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())