Skip to content

Commit

Permalink
Fix wrongly trying to rewrite the namespace in cluster mode (#2221)
Browse files Browse the repository at this point in the history
This closes #2214

The namespace mechanism is NOT allowed in cluster mode, so it is
unnecessary to rewrite the namespace while cluster mode is enabled. This
rewrite behavior causes the replication issue
mentioned in #2214 when starting a cluster node.

The root cause is that the server tries to rewrite the namespace
into RocksDB if the option `repl-namespace-enabled` is enabled.
This increases the server's RocksDB sequence number, so replication
starts from the wrong offset. We already check whether the role is a slave
before rewriting, but in cluster mode the replication role is NOT set yet at that time (plain master-replica mode is not affected).

The good news is that it only affects cluster users who have enabled
the option `repl-namespace-enabled`, and I guess almost no one
does this since namespace replication is meaningless in cluster mode.

```
=== RUN   TestClusterReplication/Cluster_replication_should_work_normally_after_restart
    replication_test.go:88: 
        	Error Trace:	/Users/hulk/code/cxx/kvrocks/tests/gocase/integration/replication/replication_test.go:88
        	Error:      	Not equal: 
        	            	expected: "v1"
        	            	actual  : "v0"
```

And it works well after this patch.
  • Loading branch information
git-hulk committed Apr 5, 2024
1 parent bf5ba41 commit 7e1b797
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/server/namespace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ bool Namespace::IsAllowModify() const {

Status Namespace::LoadAndRewrite() {
auto config = storage_->GetConfig();
// Namespace is NOT allowed in the cluster mode, so we don't need to rewrite here.
if (config->cluster_enabled) return Status::OK();

// Load from the configuration file first
tokens_ = config->load_tokens;

Expand Down
3 changes: 3 additions & 0 deletions src/storage/redis_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
namespace redis {

rocksdb::Status PubSub::Publish(const Slice &channel, const Slice &value) {
if (storage_->GetConfig()->IsSlave()) {
return rocksdb::Status::NotSupported("can't publish to db in slave mode");
}
auto batch = storage_->GetWriteBatchBase();
batch->Put(pubsub_cf_handle_, channel, value);
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
Expand Down
3 changes: 3 additions & 0 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,9 @@ ObserverOrUniquePtr<rocksdb::WriteBatchBase> Storage::GetWriteBatchBase() {
}

Status Storage::WriteToPropagateCF(const std::string &key, const std::string &value) {
if (config_->IsSlave()) {
return {Status::NotOK, "cannot write to propagate column family in slave mode"};
}
auto batch = GetWriteBatchBase();
auto cf = GetCFHandle(kPropagateColumnFamilyName);
batch->Put(cf, key, value);
Expand Down
58 changes: 58 additions & 0 deletions tests/gocase/integration/replication/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,64 @@ import (
"github.com/stretchr/testify/require"
)

// TestClusterReplication starts a master and a replica with cluster mode
// enabled, wires them together with `clusterx SETNODES`, and checks that data
// written to the master is readable from the replica, both on the first sync
// and after the replica has been stopped and started again (issue #2214).
func TestClusterReplication(t *testing.T) {
	ctx := context.Background()

	masterSrv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
	defer func() { masterSrv.Close() }()
	masterClient := masterSrv.NewClient()
	defer func() { require.NoError(t, masterClient.Close()) }()
	masterNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
	require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterNodeID).Err())

	replicaSrv := util.StartServer(t, map[string]string{
		"cluster-enabled": "yes",
		// Enable namespace replication to reproduce issue #2214.
		"repl-namespace-enabled": "yes",
	})
	defer func() { replicaSrv.Close() }()
	replicaClient := replicaSrv.NewClient()
	// Allow read-only commands to run on the replica.
	require.NoError(t, replicaClient.ReadOnly(ctx).Err())
	// The closure reads replicaClient when it runs, so it closes whichever
	// client the variable refers to at the end of the test.
	defer func() { require.NoError(t, replicaClient.Close()) }()
	replicaNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
	require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODEID", replicaNodeID).Err())

	// Topology: the master owns slots 0-16383, the replica follows the master.
	clusterNodes := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383", masterNodeID, masterSrv.Port())
	clusterNodes = fmt.Sprintf("%s\n%s 127.0.0.1 %d slave %s", clusterNodes, replicaNodeID, replicaSrv.Port(), masterNodeID)

	require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
	require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())

	t.Run("Cluster replication should work", func(t *testing.T) {
		util.WaitForSync(t, replicaClient)
		require.Equal(t, "slave", util.FindInfoEntry(replicaClient, "role"))
		// Fail fast if a write is rejected instead of surfacing it later as a
		// value mismatch on the replica.
		require.NoError(t, masterClient.Set(ctx, "k0", "v0", 0).Err())
		require.NoError(t, masterClient.LPush(ctx, "k1", "e0", "e1", "e2").Err())
		util.WaitForOffsetSync(t, masterClient, replicaClient)

		require.Equal(t, "v0", replicaClient.Get(ctx, "k0").Val())
		require.Equal(t, []string{"e2", "e1", "e0"}, replicaClient.LRange(ctx, "k1", 0, -1).Val())
	})

	t.Run("Cluster replication should work normally after restart(issue #2214)", func(t *testing.T) {
		// Write to the master while the replica is down so that it has to
		// catch up after being started again.
		replicaSrv.Close()
		require.NoError(t, masterClient.Set(ctx, "k0", "v1", 0).Err())
		require.NoError(t, masterClient.HSet(ctx, "k2", "f0", "v0", "f1", "v1").Err())

		// Start the replica server again.
		replicaSrv.Start()
		// The old client pointed at the stopped server; its close error is
		// deliberately ignored.
		_ = replicaClient.Close()
		replicaClient = replicaSrv.NewClient()
		// Allow read-only commands to run on the replica.
		require.NoError(t, replicaClient.ReadOnly(ctx).Err())

		util.WaitForOffsetSync(t, masterClient, replicaClient)
		require.Equal(t, "v1", replicaClient.Get(ctx, "k0").Val())
		require.Equal(t, map[string]string{"f0": "v0", "f1": "v1"}, replicaClient.HGetAll(ctx, "k2").Val())
	})
}

func TestReplicationWithHostname(t *testing.T) {
srvA := util.StartServer(t, map[string]string{})
defer srvA.Close()
Expand Down
3 changes: 3 additions & 0 deletions tests/gocase/util/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ func (s *KvrocksServer) close(keepDir bool) {

// Restart stops the server by calling close(true) (the argument is close's
// keepDir flag) and then launches it again via Start.
func (s *KvrocksServer) Restart() {
s.close(true)
s.Start()
}

func (s *KvrocksServer) Start() {
b := *binPath
require.NotEmpty(s.t, b, "please set the binary path by `-binPath`")
cmd := exec.Command(b)
Expand Down

0 comments on commit 7e1b797

Please sign in to comment.