diff --git a/RabbitMQ.Stream.Client/ICrc32.cs b/RabbitMQ.Stream.Client/ICrc32.cs index 62fd2a5a..49c57688 100644 --- a/RabbitMQ.Stream.Client/ICrc32.cs +++ b/RabbitMQ.Stream.Client/ICrc32.cs @@ -6,6 +6,20 @@ namespace RabbitMQ.Stream.Client { + public struct ChunkInfo + { + /// + /// The stream name of the chunk. + /// + public string StreamName { get; init; } + + public ulong Id { get; init; } + + public uint ServerHash { get; init; } + + public uint LocalHash { get; init; } + } + public enum ChunkAction { /// @@ -37,5 +51,7 @@ public interface ICrc32 /// The code here should be safe /// Func FailAction { get; init; } + + Func AsyncFailAction { get; init; } } } diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index f5cc5948..560eea10 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -38,6 +38,16 @@ RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte RabbitMQ.Stream.Client.ChunkAction RabbitMQ.Stream.Client.ChunkAction.Skip = 1 -> RabbitMQ.Stream.Client.ChunkAction RabbitMQ.Stream.Client.ChunkAction.TryToProcess = 0 -> RabbitMQ.Stream.Client.ChunkAction +RabbitMQ.Stream.Client.ChunkInfo +RabbitMQ.Stream.Client.ChunkInfo.ChunkInfo() -> void +RabbitMQ.Stream.Client.ChunkInfo.Id.get -> ulong +RabbitMQ.Stream.Client.ChunkInfo.Id.init -> void +RabbitMQ.Stream.Client.ChunkInfo.LocalHash.get -> uint +RabbitMQ.Stream.Client.ChunkInfo.LocalHash.init -> void +RabbitMQ.Stream.Client.ChunkInfo.ServerHash.get -> uint +RabbitMQ.Stream.Client.ChunkInfo.ServerHash.init -> void +RabbitMQ.Stream.Client.ChunkInfo.StreamName.get -> string +RabbitMQ.Stream.Client.ChunkInfo.StreamName.init -> void RabbitMQ.Stream.Client.Client.ClientId.get -> string RabbitMQ.Stream.Client.Client.ClientId.init -> void RabbitMQ.Stream.Client.Client.Consumers.get -> System.Collections.Generic.IDictionary @@ -186,6 +196,8 @@ RabbitMQ.Stream.Client.IConsumerConfig.FlowControl.set -> void RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void RabbitMQ.Stream.Client.ICrc32 +RabbitMQ.Stream.Client.ICrc32.AsyncFailAction.get -> System.Func +RabbitMQ.Stream.Client.ICrc32.AsyncFailAction.init -> void RabbitMQ.Stream.Client.ICrc32.FailAction.get -> System.Func RabbitMQ.Stream.Client.ICrc32.FailAction.init -> void RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[] @@ -339,6 +351,8 @@ RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.StreamCrc32 +RabbitMQ.Stream.Client.StreamCrc32.AsyncFailAction.get -> System.Func +RabbitMQ.Stream.Client.StreamCrc32.AsyncFailAction.init -> void RabbitMQ.Stream.Client.StreamCrc32.FailAction.get -> System.Func RabbitMQ.Stream.Client.StreamCrc32.FailAction.init -> void RabbitMQ.Stream.Client.StreamCrc32.Hash(byte[] data) -> byte[] diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index c0c28a8e..d81cf56c 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -107,7 +107,6 @@ internal void Validate() } FlowControl ??= new FlowControl(); - } internal bool IsFiltering => ConsumerFilter is { Values.Count: > 0 }; @@ -642,7 +641,14 @@ private async Task Init() // if the user has set the FailAction, we call it // to allow the user to handle the chunk action // if the FailAction is not set, we skip the chunk - chunkAction = _config.Crc32.FailAction?.Invoke(this) ?? ChunkAction.Skip; + chunkAction = _config.Crc32.AsyncFailAction?.Invoke(this, + new ChunkInfo() + { + Id = deliver.Chunk.ChunkId, + ServerHash = deliver.Chunk.Crc, + LocalHash = crcCalculated, + StreamName = _config.Stream + }) ?? ChunkAction.Skip; } } diff --git a/RabbitMQ.Stream.Client/StreamCrc32.cs b/RabbitMQ.Stream.Client/StreamCrc32.cs index b74c896a..04fda860 100644 --- a/RabbitMQ.Stream.Client/StreamCrc32.cs +++ b/RabbitMQ.Stream.Client/StreamCrc32.cs @@ -20,4 +20,5 @@ public byte[] Hash(byte[] data) } public Func FailAction { get; init; } = null; + public Func AsyncFailAction { get; init; } } diff --git a/Tests/Crc32Tests.cs b/Tests/Crc32Tests.cs index c6ebc9c6..ead5bdf3 100644 --- a/Tests/Crc32Tests.cs +++ b/Tests/Crc32Tests.cs @@ -27,6 +27,7 @@ public byte[] Hash(byte[] data) } public Func FailAction { get; init; } + public Func AsyncFailAction { get; init; } } public class Crc32Tests(ITestOutputHelper testOutputHelper)