Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support redis74 hash with ttl #71

Merged
merged 3 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/RDBCli/Callbacks/KeysOnlyCallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void HandleModuleData(byte[] key, ulong opCode, byte[] data)
{
}

public void HSet(byte[] key, byte[] field, byte[] value)
public void HSet(byte[] key, byte[] field, byte[] value, long expiry = 0)
{
}

Expand Down
2 changes: 1 addition & 1 deletion src/RDBCli/Callbacks/MemoryCallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void HandleModuleData(byte[] key, ulong opCode, byte[] data)
{
}

public void HSet(byte[] key, byte[] field, byte[] value)
public void HSet(byte[] key, byte[] field, byte[] value, long expiry = 0)
{
var lenOfElem = ElementLength(field) + ElementLength(value);
if (lenOfElem > _currentRecord.LenOfLargestElem)
Expand Down
25 changes: 22 additions & 3 deletions src/RDBParser/BinaryReaderRDBParser.Base.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,11 @@ public void ReadObject(BinaryReader br, byte[] key, int encType, long expiry, In
{
ReadStream(br, encType);
}
else if (encType == Constant.DataType.HASH_LISTPACK)
else if (encType == Constant.DataType.HASH_LISTPACK
|| encType == Constant.DataType.HASH_LISTPACK_EX
|| encType == Constant.DataType.HASH_LISTPACK_EX_PRE_GA)
{
ReadHashFromListPack(br);
ReadHashFromListPack(br, encType);
}
else if (encType == Constant.DataType.ZSET_LISTPACK)
{
Expand All @@ -125,6 +127,11 @@ public void ReadObject(BinaryReader br, byte[] key, int encType, long expiry, In
{
ReadSetFromListPack(br);
}
else if(encType == Constant.DataType.HASH_METADATA_PRE_GA
|| encType == Constant.DataType.HASH_METADATA)
{
ReadHashMetadata(br, encType);
}
else
{
throw new RDBParserException($"Invalid object type {encType} for {key} ");
Expand Down Expand Up @@ -205,8 +212,15 @@ private void SkipObject(BinaryReader br, int encType)
{
SkipStream(br, encType);
}
else if (encType == Constant.DataType.HASH_LISTPACK)
else if (encType == Constant.DataType.HASH_LISTPACK
|| encType == Constant.DataType.HASH_LISTPACK_EX
|| encType == Constant.DataType.HASH_LISTPACK_EX_PRE_GA)
{
if (encType == Constant.DataType.HASH_LISTPACK_EX)
{
_ = br.ReadUInt64();
}

skip = 1;
}
else if (encType == Constant.DataType.ZSET_LISTPACK)
Expand All @@ -217,6 +231,11 @@ private void SkipObject(BinaryReader br, int encType)
{
skip = 1;
}
else if (encType == Constant.DataType.HASH_METADATA_PRE_GA
|| encType == Constant.DataType.HASH_METADATA)
{
SkipHashMetadata(br, encType);
}
else
{
throw new RDBParserException($"Invalid object type {encType} for {_key} ");
Expand Down
72 changes: 72 additions & 0 deletions src/RDBParser/BinaryReaderRDBParser.HashMetadata.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using System.IO;

namespace RDBParser
{
public partial class BinaryReaderRDBParser : IRDBParser
{
private const ulong EB_EXPIRE_TIME_MAX = 0x0000FFFFFFFFFFFF;
private const ulong EB_EXPIRE_TIME_INVALID = EB_EXPIRE_TIME_MAX + 1;

private void ReadHashMetadata(BinaryReader br, int encType)
{
// https://github.com/redis/redis/blob/c9d29f6a918c335bc1778d9f68e521c1bbb36a0f/src/rdb.c#L2265
ulong minExpire = EB_EXPIRE_TIME_INVALID;
if (encType == Constant.DataType.HASH_METADATA)
{
minExpire = br.ReadUInt64();
}

var len = br.ReadLength();

Info info = new Info();
info.Idle = _idle;
info.Freq = _freq;
info.Encoding = Constant.ObjEncoding.HT;
info.SizeOfValue = (int)len;
_callback.StartHash(_key, (int)len, _expiry, info);

ulong expireAt = 0;
while (len > 0)
{
var ttl = br.ReadLength();

if (encType == Constant.DataType.HASH_METADATA)
{
expireAt = ttl != 0 ? ttl + minExpire - 1 : 0;
}
else
{
expireAt = ttl;
}

var field = br.ReadStr();
var value = br.ReadStr();

_callback.HSet(_key, field, value, (long)expireAt);

len--;
}

_callback.EndHash(_key);
}

private void SkipHashMetadata(BinaryReader br, int encType)
{
// https://github.com/redis/redis/blob/c9d29f6a918c335bc1778d9f68e521c1bbb36a0f/src/rdb.c#L2265
if (encType == Constant.DataType.HASH_METADATA)
{
_ = br.ReadUInt64();
}

var len = br.ReadLength();
while (len > 0)
{
_ = br.ReadLength();
_ = br.ReadStr();
_ = br.ReadStr();

len--;
}
}
}
}
32 changes: 26 additions & 6 deletions src/RDBParser/BinaryReaderRDBParser.ListPack.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,14 @@ public partial class BinaryReaderRDBParser
}
}

private void ReadHashFromListPack(BinaryReader br)
private void ReadHashFromListPack(BinaryReader br, int encType)
{
// If Hash TTLs, Load next/min expiration time before the `encoded`
if (encType == Constant.DataType.HASH_LISTPACK_EX)
{
_ = br.ReadUInt64();
}

// https://github.com/redis/redis/blob/7.0.8/src/rdb.c#L1703
// https://github.com/redis/redis/blob/7.0.8/src/listpack.c#L1282
// <total_bytes><size><entry><entry>..<entry><end>
Expand All @@ -164,24 +170,38 @@ private void ReadHashFromListPack(BinaryReader br)
var bytes = lpGetTotalBytes(rd);
// <size>
var numEle = lpGetNumElements(rd);
if (numEle % 2 != 0) throw new RDBParserException($"Expected even number of elements, but found {numEle} for key {_key}");

var numEntries = (ushort)(numEle / 2);
var isListpackEx = encType == Constant.DataType.HASH_LISTPACK_EX || encType == Constant.DataType.HASH_LISTPACK_EX_PRE_GA;
var tupleLen = isListpackEx ? 3 : 2;

if (numEle % tupleLen != 0) throw new RDBParserException($"Unexpected number of elements, isListpackEx = {isListpackEx}, found {numEle} for key {_key}");
var numEntries = (ushort)(numEle / tupleLen);


Info info = new Info();
info.Idle = _idle;
info.Freq = _freq;
info.Encoding = Constant.ObjEncoding.LISTPACK;
info.Encoding = isListpackEx ? Constant.ObjEncoding.LISTPACK_EX : Constant.ObjEncoding.LISTPACK;
info.SizeOfValue = rawString.Length;
_callback.StartHash(_key, numEntries, _expiry, info);

// <entry>
// simple hash field-value pair
// hash with TTL field-value-ttl pair
for (int i = 0; i < numEntries; i++)
{
// <encode><val><backlen>
var field = ReadListPackEntry(rd);
var value = ReadListPackEntry(rd);
_callback.HSet(_key, field.data, value.data);

long ttl = 0;
if(isListpackEx)
{
var tmp = ReadListPackEntry(rd);
ttl = long.Parse(Encoding.UTF8.GetString(tmp.data));
}

_callback.HSet(_key, field.data, value.data, ttl);
}

var lpEnd = rd.ReadByte();
Expand Down
4 changes: 2 additions & 2 deletions src/RDBParser/Callbacks/DefaultConsoleReaderCallBack.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ public void HandleModuleData(byte[] key, ulong opCode, byte[] data)
Console.WriteLine($"HandleModuleData, Key={GetString(key)}, opCode={opCode}, data={GetString(data)}");
}

public void HSet(byte[] key, byte[] field, byte[] value)
public void HSet(byte[] key, byte[] field, byte[] value, long expiry = 0)
{
Console.WriteLine($"HSet, Key={GetString(key)}, Field={GetString(field)}, Value={GetString(value)}");
Console.WriteLine($"HSet, Key={GetString(key)}, Field={GetString(field)}, Value={GetString(value)}, expire={expiry}");
}

public void RPush(byte[] key, byte[] value)
Expand Down
3 changes: 2 additions & 1 deletion src/RDBParser/Callbacks/IReaderCallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public interface IReaderCallback
/// <param name="key">the redis key for this hash</param>
/// <param name="field">a string</param>
/// <param name="value">the value to store for this field</param>
void HSet(byte[] key, byte[] field, byte[] value);
/// <param name="expiry">a `datetime` object. 0 means the object does not expire, work for >7.4</param>
void HSet(byte[] key, byte[] field, byte[] value, long expiry = 0);

/// <summary>
/// Called when there are no more elements in the hash
Expand Down
2 changes: 1 addition & 1 deletion src/RDBParser/Callbacks/NoOpReaderCallBack.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void HandleModuleData(byte[] key, ulong opCode, byte[] data)
{
}

public void HSet(byte[] key, byte[] field, byte[] value)
public void HSet(byte[] key, byte[] field, byte[] value, long expiry = 0)
{
}

Expand Down
6 changes: 4 additions & 2 deletions src/RDBParser/Constant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ public static class RdbVersion

// Redis 6.2.14 9
// Redis 7.0-rc1~~7.0.15 10
// Redis 7.2-rc1~~7.4.0 11
// Redis 7.2-rc1~~7.2.5 11
// Valkey 7.2.4-rc1~~8.0.0 11
public const int Max = 11;
// Redis 7.4-rc1~~7.4.0 12
public const int Max = 12;
}

public static class MagicCount
Expand Down Expand Up @@ -154,6 +155,7 @@ public static class ObjEncoding
public const string SKIPLIST = "skiplist";
public const string QUICKLIST = "quicklist";
public const string LISTPACK = "listpack";
public const string LISTPACK_EX = "listpack_ex";
public const string LINKEDLIST = "linkedlist";
}
}
Expand Down
58 changes: 58 additions & 0 deletions tests/RDBParserTests/HashTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using RDBParser;
using System.Collections.Generic;
using System.Text;
using Xunit;
using Xunit.Abstractions;
Expand Down Expand Up @@ -161,5 +162,62 @@ public void TestHashWithRedis70ListPack()
Assert.Equal(Encoding.UTF8.GetBytes("pppppppppppppppppppppppppppppp"), hashs[0][key][Encoding.UTF8.GetBytes("1234566")]);
Assert.Equal(Encoding.UTF8.GetBytes("2.60"), hashs[0][key][Encoding.UTF8.GetBytes("abc")]);
}

[Fact]
public void TestHashWithRedis74ListPackEx()
{
// hset mykey f1 v1 f2 v2
// hexpire mykey 3000 FIELDS 1 f1
var path = TestHelper.GetRDBPath("redis74_hash_ttl_listpack_ex.rdb");

var callback = new TestReaderCallback(_output);
var parser = new BinaryReaderRDBParser(callback);
parser.Parse(path);

var lengths = callback.GetLengths();
var hashs = callback.GetHashs();
var expires = callback.GetExpiries();

Assert.Equal(2, lengths[0][Encoding.UTF8.GetBytes("mykey")]);
var key = Encoding.UTF8.GetBytes("mykey");

Assert.Equal(Encoding.UTF8.GetBytes("v1"), hashs[0][key][Encoding.UTF8.GetBytes("f1")]);
Assert.Equal(Encoding.UTF8.GetBytes("v2"), hashs[0][key][Encoding.UTF8.GetBytes("f2")]);

var expiryKey = new byte[key.Length + Encoding.UTF8.GetBytes("f1").Length];
key.CopyTo(expiryKey, 0);
Encoding.UTF8.GetBytes("f1").CopyTo(expiryKey, key.Length);
var f1Exp = expires[0][expiryKey];
Assert.True(f1Exp > 0);
}

[Fact]
public void TestHashWithRedis74HashMetadata()
{
// config set hash-max-listpack-entries 0
// hset mykey f1 v1 f2 v2
// hexpire mykey 3000 FIELDS 1 f1
var path = TestHelper.GetRDBPath("redis74_hash_ttl_hash_metadata.rdb");

var callback = new TestReaderCallback(_output);
var parser = new BinaryReaderRDBParser(callback);
parser.Parse(path);

var lengths = callback.GetLengths();
var hashs = callback.GetHashs();
var expires = callback.GetExpiries();

Assert.Equal(2, lengths[0][Encoding.UTF8.GetBytes("mykey")]);
var key = Encoding.UTF8.GetBytes("mykey");

Assert.Equal(Encoding.UTF8.GetBytes("v1"), hashs[0][key][Encoding.UTF8.GetBytes("f1")]);
Assert.Equal(Encoding.UTF8.GetBytes("v2"), hashs[0][key][Encoding.UTF8.GetBytes("f2")]);

var expiryKey = new byte[key.Length + Encoding.UTF8.GetBytes("f1").Length];
key.CopyTo(expiryKey, 0);
Encoding.UTF8.GetBytes("f1").CopyTo(expiryKey, key.Length);
var f1Exp = expires[0][expiryKey];
Assert.True(f1Exp > 0);
}
}
}
11 changes: 10 additions & 1 deletion tests/RDBParserTests/TestReaderCallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,24 @@ public void HandleModuleData(byte[] key, ulong opCode, byte[] data)
//throw new System.NotImplementedException();
}

public void HSet(byte[] key, byte[] field, byte[] value)
public void HSet(byte[] key, byte[] field, byte[] value, long expiry = 0)
{
if (!_hashs[_database].ContainsKey(key))
throw new System.Exception("0");

_output.WriteLine(Encoding.UTF8.GetString(key));
_output.WriteLine(Encoding.UTF8.GetString(field));
_output.WriteLine(Encoding.UTF8.GetString(value));
_output.WriteLine(expiry.ToString());
_hashs[_database][key][field] = value;

if(expiry > 0)
{
var expiryKey = new byte[key.Length + field.Length];
key.CopyTo(expiryKey, 0);
field.CopyTo(expiryKey, key.Length);
_expiries[_database][expiryKey] = expiry;
}
}

public void RPush(byte[] key, byte[] value)
Expand Down
Binary file added tests/dumps/redis74_hash_ttl_hash_metadata.rdb
Binary file not shown.
Binary file added tests/dumps/redis74_hash_ttl_listpack_ex.rdb
Binary file not shown.