From d7ba809022bc2416cd924db246aed24585d1f605 Mon Sep 17 00:00:00 2001 From: Catcher Wong Date: Tue, 1 Oct 2024 08:08:53 +0800 Subject: [PATCH] Support Redis 72 stream with RDB_TYPE_STREAM_LISTPACKS_3 (#70) * support stream listpack3 21 Signed-off-by: catcherwong * fix bugs and add test for 72 stream Signed-off-by: catcherwong --------- Signed-off-by: catcherwong --- src/RDBParser/BinaryReaderRDBParser.Base.cs | 3 +- src/RDBParser/BinaryReaderRDBParser.Stream.cs | 22 +++++- .../Callbacks/Models/StreamConsumerEntity.cs | 6 +- tests/RDBParserTests/StreamTests.cs | 71 ++++++++++++++++++ tests/dumps/redis_72_stream.rdb | Bin 0 -> 273 bytes tests/dumps/redis_72_stream_for_skip.rdb | Bin 0 -> 273 bytes 6 files changed, 96 insertions(+), 6 deletions(-) create mode 100644 tests/dumps/redis_72_stream.rdb create mode 100644 tests/dumps/redis_72_stream_for_skip.rdb diff --git a/src/RDBParser/BinaryReaderRDBParser.Base.cs b/src/RDBParser/BinaryReaderRDBParser.Base.cs index d7bb1ae..a794314 100644 --- a/src/RDBParser/BinaryReaderRDBParser.Base.cs +++ b/src/RDBParser/BinaryReaderRDBParser.Base.cs @@ -200,7 +200,8 @@ private void SkipObject(BinaryReader br, int encType) SkipModule(br); } else if (encType == Constant.DataType.STREAM_LISTPACKS - || encType == Constant.DataType.STREAM_LISTPACKS_2) + || encType == Constant.DataType.STREAM_LISTPACKS_2 + || encType == Constant.DataType.STREAM_LISTPACKS_3) { SkipStream(br, encType); } diff --git a/src/RDBParser/BinaryReaderRDBParser.Stream.cs b/src/RDBParser/BinaryReaderRDBParser.Stream.cs index e9c7b1f..76ee8b2 100644 --- a/src/RDBParser/BinaryReaderRDBParser.Stream.cs +++ b/src/RDBParser/BinaryReaderRDBParser.Stream.cs @@ -37,7 +37,7 @@ private void ReadStream(BinaryReader br, int encType) var firstEntryId = "0-0"; var maxDeletedEntryId = "0-0"; ulong entriesAdded = items; - if (encType == Constant.DataType.STREAM_LISTPACKS_2) + if (encType >= Constant.DataType.STREAM_LISTPACKS_2) { // the first entry ID var firstMs = br.ReadLength(); @@ -53,18 +53,21 @@ private void ReadStream(BinaryReader br, int encType) entriesAdded = br.ReadLength(); } + // Consumer groups loading var cgroups = br.ReadLength(); var cgroupsData = new List(); while (cgroups > 0) { var cgName = br.ReadStr(); + // ms var l = br.ReadLength(); + // seq var r = br.ReadLength(); var lastCgEntryId = $"{l}-{r}"; // group offset ulong cgOffset = 0; - if (encType == Constant.DataType.STREAM_LISTPACKS_2) + if (encType >= Constant.DataType.STREAM_LISTPACKS_2) { cgOffset = br.ReadLength(); } @@ -96,6 +99,12 @@ private void ReadStream(BinaryReader br, int encType) var cname = br.ReadStr(); var seenTime = br.ReadUInt64(); + var activeTime = seenTime; + if (encType >= Constant.DataType.STREAM_LISTPACKS_3) + { + activeTime = br.ReadUInt64(); + } + // the PEL about entries owned by this specific consumer pelSize = br.ReadLength(); var consumerPendingEntries = new List(); @@ -115,6 +124,7 @@ private void ReadStream(BinaryReader br, int encType) { Name = cname, SeenTime = seenTime, + ActiveTime = activeTime, Pending = consumerPendingEntries }); @@ -162,7 +172,7 @@ private void SkipStream(BinaryReader br, int encType) _ = br.ReadLength(); _ = br.ReadLength(); - if (encType == Constant.DataType.STREAM_LISTPACKS_2) + if (encType >= Constant.DataType.STREAM_LISTPACKS_2) { // the first entry ID _ = br.ReadLength(); @@ -183,7 +193,7 @@ private void SkipStream(BinaryReader br, int encType) _ = br.ReadLength(); _ = br.ReadLength(); - if (encType == Constant.DataType.STREAM_LISTPACKS_2) + if (encType >= Constant.DataType.STREAM_LISTPACKS_2) { _ = br.ReadLength(); } @@ -202,6 +212,10 @@ private void SkipStream(BinaryReader br, int encType) { br.SkipStr(); br.ReadBytes(8); + if (encType >= Constant.DataType.STREAM_LISTPACKS_3) + { + br.ReadBytes(8); + } pending = br.ReadLength(); br.ReadBytes((int)(pending * 16)); diff --git a/src/RDBParser/Callbacks/Models/StreamConsumerEntity.cs b/src/RDBParser/Callbacks/Models/StreamConsumerEntity.cs index 851a642..afe4bf9 100644 --- a/src/RDBParser/Callbacks/Models/StreamConsumerEntity.cs +++ b/src/RDBParser/Callbacks/Models/StreamConsumerEntity.cs @@ -11,10 +11,14 @@ public class StreamConsumerEntity /// public byte[] Name { get; set; } /// - /// Last time this consumer was active. + /// Last time this consumer tried to perform an action (attempted reading/claiming). /// public ulong SeenTime { get; set; } /// + /// Last time this consumer was active (successful reading/claiming). + /// + public ulong ActiveTime { get; set; } + /// /// Consumer specific pending entries list /// public List Pending { get; set; } diff --git a/tests/RDBParserTests/StreamTests.cs b/tests/RDBParserTests/StreamTests.cs index 15a82a6..896d687 100644 --- a/tests/RDBParserTests/StreamTests.cs +++ b/tests/RDBParserTests/StreamTests.cs @@ -153,5 +153,76 @@ public void TestStreamsWithRedis70AndGroup() var streamId = RedisRdbObjectHelper.GetStreamId(pending.Id); Assert.Equal("1526919030474-56", streamId); } + + [Fact] + public void TestStreamsWithRedis72AndGroup() + { + // xadd mystream 1526919030474-55 message 1 + // xadd mystream 1526919030474-56 message 2 + // xadd mystream 1526919030474-57 message 3 + // XGROUP create mystream sg 0-0 + // XREADGROUP group sg c1 count 1 streams mystream > + // XACK mystream sg "1526919030474-55" + // XREADGROUP group sg c1 count 1 streams mystream > + // bgsave + var path = TestHelper.GetRDBPath("redis_72_stream.rdb"); + + var callback = new TestReaderCallback(_output); + var parser = new BinaryReaderRDBParser(callback); + parser.Parse(path); + + var lengths = callback.GetLengths(); + var streamEntities = callback.GetStreamEntities(); + + Assert.Equal(3, lengths[0][Encoding.UTF8.GetBytes("mystream")]); + + var streamEntity = streamEntities[0][Encoding.UTF8.GetBytes("mystream")]; + Assert.Single(streamEntity); + + var se = streamEntity.Single(); + + Assert.Equal("1526919030474-55", se.FirstId); + Assert.Equal("1526919030474-57", se.LastId); + + var sgList = se.CGroups; + Assert.Single(sgList); + + var sg = sgList.Single(); + Assert.Equal(Encoding.UTF8.GetBytes("sg"), sg.Name); + Assert.Equal("1526919030474-56", sg.LastEntryId); + Assert.Single(sg.Consumers); + Assert.Single(sg.Pending); + Assert.Equal((ulong)2, sg.EntriesRead); + + var consumer = sg.Consumers.Single(); + Assert.Equal(Encoding.UTF8.GetBytes("c1"), consumer.Name); + Assert.Single(consumer.Pending); + + var pending = sg.Pending.Single(); + var streamId = RedisRdbObjectHelper.GetStreamId(pending.Id); + Assert.Equal("1526919030474-56", streamId); + } + + [Fact] + public void SkipTestStreamsWithRedis72AndGroup() + { + // xadd mystream 1526919030474-55 message 1 + // xadd mystream 1526919030474-56 message 2 + // xadd mystream 1526919030474-57 message 3 + // XGROUP create mystream sg 0-0 + // XREADGROUP group sg c1 count 1 streams mystream > + // XACK mystream sg "1526919030474-55" + // XREADGROUP group sg c1 count 1 streams mystream > + // bgsave + var path = TestHelper.GetRDBPath("redis_72_stream_for_skip.rdb"); + + var callback = new TestReaderCallback(_output); + var parser = new BinaryReaderRDBParser(callback, new ParserFilter { Types = new System.Collections.Generic.List { "string"} }); + parser.Parse(path); + + var lengths = callback.GetLengths(); + + Assert.Empty(lengths[0]); + } } } \ No newline at end of file diff --git a/tests/dumps/redis_72_stream.rdb b/tests/dumps/redis_72_stream.rdb new file mode 100644 index 0000000000000000000000000000000000000000..e3efc9d3a5b9c18896b07f39b15a7806f8df3815 GIT binary patch literal 273 zcmWG?b@2=~FfcUy#aWb^l3A=^hTn_~q8z!E#U(|liMfmd3=E9P&E;vQ z7$Cs>u)dkN8IUc)z|6?N$jI29n_66)n4ZePz{sQ^;2^}nlEBHpsKDC5$;8CStiZ|e zpSclch9#V3&H&WTRGbbMuwY_@se;;M@vyzkc@jvJktx{_#(~ios{bE-BezrU>nv*k DiZV^! literal 0 HcmV?d00001 diff --git a/tests/dumps/redis_72_stream_for_skip.rdb b/tests/dumps/redis_72_stream_for_skip.rdb new file mode 100644 index 0000000000000000000000000000000000000000..e3efc9d3a5b9c18896b07f39b15a7806f8df3815 GIT binary patch literal 273 zcmWG?b@2=~FfcUy#aWb^l3A=^hTn_~q8z!E#U(|liMfmd3=E9P&E;vQ z7$Cs>u)dkN8IUc)z|6?N$jI29n_66)n4ZePz{sQ^;2^}nlEBHpsKDC5$;8CStiZ|e zpSclch9#V3&H&WTRGbbMuwY_@se;;M@vyzkc@jvJktx{_#(~ios{bE-BezrU>nv*k DiZV^! literal 0 HcmV?d00001