Skip to content

Commit

Permalink
Fix should use the minimum compatible RDB version when dumping the pa…
Browse files Browse the repository at this point in the history
…yload (#2252)

Currently, we're using the maximum RDB version(12) when dumping the payload which is not allowed in the old Redis versions(before Redis 7), so it will throw the error:

```
127.0.0.1:6379> RESTORE a 0 "\x00\xc0{\x0c\x00\x83\x94g!\xfaP\xf9\xf0"
(error) ERR DUMP payload version or checksum are wrong
```

This PR changes the payload's RDB version to 6 to make it work with the old Redis versions. And after applying this PR, it works well in Redis 4/6:

```
127.0.0.1:6379> RESTORE a 0 "\x00\xc0{\x06\x00\xde\x0f;a\xf5/[*"
OK
127.0.0.1:6379> get a
"123"

127.0.0.1:6379> RESTORE a 0 "\x00\xc0{\x06\x00\xde\x0f;a\xf5/[*"
OK
127.0.0.1:6379> get a
"123"
```

Fixes #2251
  • Loading branch information
git-hulk committed Apr 16, 2024
1 parent aa7c745 commit 4ecb35b
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 25 deletions.
8 changes: 5 additions & 3 deletions src/storage/rdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -700,9 +700,11 @@ Status RDB::Dump(const std::string &key, const RedisType type) {
* RDB version and CRC are both in little endian.
*/

/* RDB version */
buf[0] = MaxRDBVersion & 0xff;
buf[1] = (MaxRDBVersion >> 8) & 0xff;
// We should choose the minimum RDB version for compatibility consideration.
// For the current DUMP implementation, it was supported since from the Redis 2.6,
// so we choose the RDB version of Redis 2.6 as the minimum version.
buf[0] = MinRDBVersion & 0xff;
buf[1] = (MinRDBVersion >> 8) & 0xff;
s = stream_->Write((const char *)buf, 2);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
Expand Down
4 changes: 4 additions & 0 deletions src/storage/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ constexpr const int QuickListNodeContainerPlain = 1;
constexpr const int QuickListNodeContainerPacked = 2;

constexpr const int MaxRDBVersion = 12; // The current max rdb version supported by redis.
// Min Redis RDB version supported by Kvrocks, we choose 6 because it's the first version
// that supports the DUMP command.
constexpr int MinRDBVersion = 6;

class RdbStream;

using RedisObjValue =
Expand Down
83 changes: 61 additions & 22 deletions tests/gocase/unit/dump/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package dump

import (
"context"
"fmt"
"testing"

"github.com/apache/kvrocks/tests/gocase/util"
Expand All @@ -36,11 +37,20 @@ func TestDump_String(t *testing.T) {
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

require.NoError(t, rdb.Del(ctx, "dump_test_key1", "dump_test_key2").Err())
require.NoError(t, rdb.Set(ctx, "dump_test_key1", "hello,world!", 0).Err())
require.NoError(t, rdb.Set(ctx, "dump_test_key2", "654321", 0).Err())
require.Equal(t, "\x00\x0chello,world!\x0c\x00N~\xe6\xc8\xd38h\x17", rdb.Dump(ctx, "dump_test_key1").Val())
require.Equal(t, "\x00\xc2\xf1\xfb\t\x00\x0c\x00gSN\xfd\xf2y\xa2\x9d", rdb.Dump(ctx, "dump_test_key2").Val())
keyValues := map[string]string{
"test_string_key0": "hello,world!",
"test_string_key1": "654321",
}
for key, value := range keyValues {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Set(ctx, key, value, 0).Err())
serialized, err := rdb.Dump(ctx, key).Result()
require.NoError(t, err)

restoredKey := fmt.Sprintf("restore_%s", key)
require.NoError(t, rdb.RestoreReplace(ctx, restoredKey, 0, serialized).Err())
require.Equal(t, value, rdb.Get(ctx, restoredKey).Val())
}
}

func TestDump_Hash(t *testing.T) {
Expand All @@ -51,9 +61,21 @@ func TestDump_Hash(t *testing.T) {
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

require.NoError(t, rdb.Del(ctx, "dump_test_key1").Err())
require.NoError(t, rdb.HMSet(ctx, "dump_test_key1", "name", "redis tutorial", "description", "redis basic commands for caching", "likes", 20, "visitors", 23000).Err())
require.Equal(t, "\x04\x04\x0bdescription redis basic commands for caching\x05likes\xc0\x14\x04name\x0eredis tutorial\bvisitors\xc1\xd8Y\x0c\x008\x96\xa68b\xebuQ", rdb.Dump(ctx, "dump_test_key1").Val())
key := "test_hash_key"
fields := map[string]string{
"name": "redis tutorial",
"description": "redis basic commands for caching",
"likes": "20",
"visitors": "23000",
}
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.HMSet(ctx, key, fields).Err())
serialized, err := rdb.Dump(ctx, key).Result()
require.NoError(t, err)

restoredKey := fmt.Sprintf("restore_%s", key)
require.NoError(t, rdb.RestoreReplace(ctx, restoredKey, 0, serialized).Err())
require.EqualValues(t, fields, rdb.HGetAll(ctx, restoredKey).Val())
}

func TestDump_ZSet(t *testing.T) {
Expand All @@ -64,10 +86,17 @@ func TestDump_ZSet(t *testing.T) {
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

zMember := []redis.Z{{Member: "kvrocks1", Score: 1}, {Member: "kvrocks2", Score: 2}, {Member: "kvrocks3", Score: 3}}
require.NoError(t, rdb.Del(ctx, "dump_test_key1").Err())
require.NoError(t, rdb.ZAdd(ctx, "dump_test_key1", zMember...).Err())
require.Equal(t, "\x05\x03\bkvrocks3\x00\x00\x00\x00\x00\x00\b@\bkvrocks2\x00\x00\x00\x00\x00\x00\x00@\bkvrocks1\x00\x00\x00\x00\x00\x00\xf0?\x0c\x00L_7\xd3\xd4\xc9\xf4\xe4", rdb.Dump(ctx, "dump_test_key1").Val())
memberScores := []redis.Z{{Member: "kvrocks1", Score: 1}, {Member: "kvrocks2", Score: 2}, {Member: "kvrocks3", Score: 3}}
key := "test_zset_key"
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.ZAdd(ctx, key, memberScores...).Err())
serialized, err := rdb.Dump(ctx, key).Result()
require.NoError(t, err)

restoredKey := fmt.Sprintf("restore_%s", key)
require.NoError(t, rdb.RestoreReplace(ctx, restoredKey, 0, serialized).Err())

require.EqualValues(t, memberScores, rdb.ZRangeWithScores(ctx, restoredKey, 0, -1).Val())
}

func TestDump_List(t *testing.T) {
Expand All @@ -78,11 +107,16 @@ func TestDump_List(t *testing.T) {
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

require.NoError(t, rdb.Del(ctx, "dump_test_key1").Err())
require.NoError(t, rdb.RPush(ctx, "dump_test_key1", "kvrocks1").Err())
require.NoError(t, rdb.RPush(ctx, "dump_test_key1", "kvrocks2").Err())
require.NoError(t, rdb.RPush(ctx, "dump_test_key1", "kvrocks3").Err())
require.Equal(t, "\x12\x03\x01\bkvrocks1\x01\bkvrocks2\x01\bkvrocks3\x0c\x00\xa8\xf9S\x986\x98\xaf\xcd", rdb.Dump(ctx, "dump_test_key1").Val())
elements := []string{"kvrocks1", "kvrocks2", "kvrocks3"}
key := "test_list_key"
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.RPush(ctx, key, elements).Err())
serialized, err := rdb.Dump(ctx, key).Result()
require.NoError(t, err)

restoredKey := fmt.Sprintf("restore_%s", key)
require.NoError(t, rdb.RestoreReplace(ctx, restoredKey, 0, serialized).Err())
require.EqualValues(t, elements, rdb.LRange(ctx, restoredKey, 0, -1).Val())
}

func TestDump_Set(t *testing.T) {
Expand All @@ -93,9 +127,14 @@ func TestDump_Set(t *testing.T) {
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

require.NoError(t, rdb.Del(ctx, "dump_test_key1").Err())
require.NoError(t, rdb.SAdd(ctx, "dump_test_key1", "kvrocks1").Err())
require.NoError(t, rdb.SAdd(ctx, "dump_test_key1", "kvrocks2").Err())
require.NoError(t, rdb.SAdd(ctx, "dump_test_key1", "kvrocks3").Err())
require.Equal(t, "\x02\x03\bkvrocks1\bkvrocks2\bkvrocks3\x0c\x00\xfdP\xc9\x95sS\x87\x18", rdb.Dump(ctx, "dump_test_key1").Val())
members := []string{"kvrocks1", "kvrocks2", "kvrocks3"}
key := "test_set_key"
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.SAdd(ctx, key, members).Err())
serialized, err := rdb.Dump(ctx, key).Result()
require.NoError(t, err)

restoredKey := fmt.Sprintf("restore_%s", key)
require.NoError(t, rdb.RestoreReplace(ctx, restoredKey, 0, serialized).Err())
require.ElementsMatch(t, members, rdb.SMembers(ctx, restoredKey).Val())
}

0 comments on commit 4ecb35b

Please sign in to comment.