From 809711e9e86a71959d03279bdbcc281d07010af0 Mon Sep 17 00:00:00 2001 From: Bradley Wadsworth Date: Tue, 30 Jul 2024 11:58:15 +0100 Subject: [PATCH 1/6] Add ManualAssignPartitionOffsets to consumer configuration --- .../IConsumerConfigurationBuilder.cs | 8 ++ .../Configuration/TopicPartitionOffsets.cs | 16 +++ .../Configuration/ConsumerConfiguration.cs | 4 + .../ConsumerConfigurationBuilder.cs | 8 ++ .../Configuration/IConsumerConfiguration.cs | 5 + src/KafkaFlow/Consumers/Consumer.cs | 22 +++- .../ConsumerTest.cs | 60 +++++++++++ .../Core/Bootstrapper.cs | 25 +++++ .../Core/Handlers/MessageStorage.cs | 55 ++++++++++ .../Handlers/OffsetTrackerMessageHandler.cs | 14 +++ .../Core/Messages/OffsetTrackerMessage.cs | 9 ++ .../Core/Producers/OffsetTrackerProducer.cs | 6 ++ .../Core/ServiceProviderHelper.cs | 100 ++++++++++++++++++ 13 files changed, 330 insertions(+), 2 deletions(-) create mode 100644 src/KafkaFlow.Abstractions/Configuration/TopicPartitionOffsets.cs create mode 100644 tests/KafkaFlow.IntegrationTests/Core/Handlers/OffsetTrackerMessageHandler.cs create mode 100644 tests/KafkaFlow.IntegrationTests/Core/Messages/OffsetTrackerMessage.cs create mode 100644 tests/KafkaFlow.IntegrationTests/Core/Producers/OffsetTrackerProducer.cs create mode 100644 tests/KafkaFlow.IntegrationTests/Core/ServiceProviderHelper.cs diff --git a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs index d223feb90..416a729a3 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs @@ -30,6 +30,14 @@ public interface IConsumerConfigurationBuilder /// IConsumerConfigurationBuilder ManualAssignPartitions(string topicName, IEnumerable partitions); + /// + /// Explicitly defines the topic, partitions and offsets that will be used to read the messages + /// + /// Topic name + /// The partition offset dictionary [Partition ID, Offset] + /// + IConsumerConfigurationBuilder ManualAssignPartitionOffsets(string topicName, IDictionary partitionOffsets); + /// /// Sets the topics that will be used to read the messages, the partitions will be automatically assigned /// diff --git a/src/KafkaFlow.Abstractions/Configuration/TopicPartitionOffsets.cs b/src/KafkaFlow.Abstractions/Configuration/TopicPartitionOffsets.cs new file mode 100644 index 000000000..5a3a17756 --- /dev/null +++ b/src/KafkaFlow.Abstractions/Configuration/TopicPartitionOffsets.cs @@ -0,0 +1,16 @@ +using System.Collections.Generic; + +namespace KafkaFlow.Configuration; + +public class TopicPartitionOffsets +{ + public TopicPartitionOffsets(string name, IDictionary partitionOffsets) + { + this.Name = name; + this.PartitionOffsets = partitionOffsets; + } + + public string Name { get; } + + public IDictionary PartitionOffsets { get; } +} diff --git a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs index 67eb2570b..0359999d0 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs @@ -12,6 +12,7 @@ public ConsumerConfiguration( Confluent.Kafka.ConsumerConfig consumerConfig, IReadOnlyList topics, IReadOnlyList manualAssignPartitions, + IReadOnlyList manualAssignPartitionOffsets, string consumerName, ClusterConfiguration clusterConfiguration, bool managementDisabled, @@ -48,6 +49,7 @@ public ConsumerConfiguration( this.AutoCommitInterval = autoCommitInterval; this.Topics = topics ?? throw new ArgumentNullException(nameof(topics)); this.ManualAssignPartitions = manualAssignPartitions ?? throw new ArgumentNullException(nameof(manualAssignPartitions)); + this.ManualAssignPartitionOffsets = manualAssignPartitionOffsets ?? throw new ArgumentNullException(nameof(manualAssignPartitionOffsets)); this.ConsumerName = consumerName ?? Guid.NewGuid().ToString(); this.ClusterConfiguration = clusterConfiguration; this.ManagementDisabled = managementDisabled; @@ -76,6 +78,8 @@ public ConsumerConfiguration( public IReadOnlyList ManualAssignPartitions { get; } + public IReadOnlyList ManualAssignPartitionOffsets { get; } + public string ConsumerName { get; } public ClusterConfiguration ClusterConfiguration { get; } diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index 304fe2d34..cc2a849d2 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -11,6 +11,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild { private readonly List _topics = new(); private readonly List _topicsPartitions = new(); + private readonly List _topicsPartitionOffsets = new(); private readonly List> _statisticsHandlers = new(); private readonly List _pendingOffsetsStatisticsHandlers = new(); @@ -60,6 +61,12 @@ public IConsumerConfigurationBuilder ManualAssignPartitions(string topicName, IE return this; } + public IConsumerConfigurationBuilder ManualAssignPartitionOffsets(string topicName, IDictionary partitionOffsets) + { + _topicsPartitionOffsets.Add(new TopicPartitionOffsets(topicName, partitionOffsets)); + return this; + } + public IConsumerConfigurationBuilder WithConsumerConfig(Confluent.Kafka.ConsumerConfig config) { _consumerConfig = config; @@ -259,6 +266,7 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) consumerConfigCopy, _topics, _topicsPartitions, + _topicsPartitionOffsets, _name, clusterConfiguration, _disableManagement, diff --git a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs index e8cd02ab4..57142c933 100644 --- a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs +++ b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs @@ -29,6 +29,11 @@ public interface IConsumerConfiguration /// IReadOnlyList ManualAssignPartitions { get; } + /// + /// Gets the topic partition offsets to manually assign + /// + IReadOnlyList ManualAssignPartitionOffsets { get; } + /// /// Gets the consumer name /// diff --git a/src/KafkaFlow/Consumers/Consumer.cs b/src/KafkaFlow/Consumers/Consumer.cs index 3fef05e5f..66a2f9246 100644 --- a/src/KafkaFlow/Consumers/Consumer.cs +++ b/src/KafkaFlow/Consumers/Consumer.cs @@ -273,11 +273,16 @@ private void EnsureConsumer() if (this.Configuration.ManualAssignPartitions.Any()) { - this.ManualAssign(this.Configuration.ManualAssignPartitions); + this.ManualAssignPartitions(this.Configuration.ManualAssignPartitions); + } + + if (this.Configuration.ManualAssignPartitionOffsets.Any()) + { + this.ManualAssignPartitionOffsets(this.Configuration.ManualAssignPartitionOffsets); } } - private void ManualAssign(IEnumerable topics) + private void ManualAssignPartitions(IEnumerable topics) { var partitions = topics .SelectMany( @@ -289,6 +294,19 @@ private void ManualAssign(IEnumerable topics) this.FirePartitionsAssignedHandlers(_consumer, partitions); } + private void ManualAssignPartitionOffsets(IEnumerable topics) + { + var partitionOffsets = topics + .SelectMany( + topic => topic.PartitionOffsets.Select( + partitionOffset => new Confluent.Kafka.TopicPartitionOffset( + topic.Name, new Partition(partitionOffset.Key), new Offset(partitionOffset.Value)))) + .ToList(); + + _consumer.Assign(partitionOffsets); + this.FirePartitionsAssignedHandlers(_consumer, partitionOffsets.Select(x => x.TopicPartition).ToList()); + } + private void FirePartitionsAssignedHandlers( IConsumer consumer, List partitions) diff --git a/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs b/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs index 43c47600a..d7befe81a 100644 --- a/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs +++ b/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs @@ -1,7 +1,10 @@ using System; +using System.Collections.Generic; using System.Linq; +using System.Text; using System.Threading.Tasks; using AutoFixture; +using Confluent.Kafka; using global::Microsoft.Extensions.DependencyInjection; using global::Microsoft.VisualStudio.TestTools.UnitTesting; using KafkaFlow.Consumers; @@ -9,6 +12,7 @@ using KafkaFlow.IntegrationTests.Core.Handlers; using KafkaFlow.IntegrationTests.Core.Messages; using KafkaFlow.IntegrationTests.Core.Producers; +using KafkaFlow.Serializer; namespace KafkaFlow.IntegrationTests; @@ -157,4 +161,60 @@ public void AddConsumer_WithSharedConsumerConfig_ConsumersAreConfiguratedIndepen Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGroupId))); Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGzipGroupId))); } + + [TestMethod] + public async Task ManualAssignPartitionOffsetsTest() + { + // Arrange + var producer = _provider.GetRequiredService>(); + var messages = _fixture + .Build() + .Without(m => m.Offset) + .CreateMany(10).ToList(); + + messages.ForEach(m => producer.Produce(m.Id.ToString(), m, null, DeliveryHandler)); + + foreach (var message in messages) + { + await MessageStorage.AssertMessageAsync(message); + } + + var endOffset = MessageStorage.GetOffsetTrack(); + MessageStorage.Clear(); + + // Act + var serviceProvider = await new ServiceProviderHelper().GetServiceProviderAsync( + consumerConfig => + { + consumerConfig.ManualAssignPartitionOffsets(Bootstrapper.OffsetTrackerTopicName, new Dictionary { { 0, endOffset - 4 } }) + .WithGroupId("ManualAssignPartitionOffsetsTest") + .AddMiddlewares( + middlewares => middlewares + .AddDeserializer() + .AddTypedHandlers( + handlers => handlers.AddHandler())); + }, null); + + // Assert + for (var i = 0; i < 5; i++) + { + await MessageStorage.AssertOffsetTrackerMessageAsync(messages[i], false); + } + + for (var i = 5; i < 10; i++) + { + await MessageStorage.AssertOffsetTrackerMessageAsync(messages[i]); + } + + await serviceProvider.CreateKafkaBus().StopAsync(); + + return; + + void DeliveryHandler(DeliveryReport report) + { + var key = Encoding.UTF8.GetString(report.Message.Key); + var message = messages.First(m => m.Id.ToString() == key); + message.Offset = report.Offset; + } + } } diff --git a/tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs b/tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs index 5e317cc8e..730db3743 100644 --- a/tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs +++ b/tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs @@ -30,6 +30,7 @@ internal static class Bootstrapper internal const string AvroGroupId = "consumer-avro"; internal const string JsonGroupId = "consumer-json"; internal const string NullGroupId = "consumer-null"; + internal const string OffsetTrackerGroupId = "consumer-offset-tracker"; private const string ProtobufTopicName = "test-protobuf"; @@ -43,6 +44,7 @@ internal static class Bootstrapper private const string AvroTopicName = "test-avro"; private const string NullTopicName = "test-null"; private const string DefaultParamsTopicName = "test-default-params"; + internal const string OffsetTrackerTopicName = "test-offset-tracker"; private static readonly Lazy s_lazyProvider = new(SetupProvider); @@ -203,6 +205,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .CreateTopicIfNotExists(ProtobufGzipTopicName, 2, 1) .CreateTopicIfNotExists(ProtobufGzipTopicName2, 2, 1) .CreateTopicIfNotExists(NullTopicName, 1, 1) + .CreateTopicIfNotExists(OffsetTrackerTopicName, 1, 1) .CreateTopicIfNotExists(DefaultParamsTopicName) .AddConsumer( consumer => consumer @@ -270,6 +273,22 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithHandlerLifetime(InstanceLifetime.Singleton) .AddHandler() ))) + .AddConsumer( + consumer => consumer + .Topic(OffsetTrackerTopicName) + .WithGroupId(OffsetTrackerGroupId) + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddDeserializer() + .AddTypedHandlers( + handlers => + handlers + .WithHandlerLifetime(InstanceLifetime.Singleton) + .AddHandler() + ))) .AddConsumer( consumer => consumer .Topics(GzipTopicName) @@ -325,6 +344,12 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .AddMiddlewares( middlewares => middlewares .AddSerializer())) + .AddProducer( + producer => producer + .DefaultTopic(OffsetTrackerTopicName) + .AddMiddlewares( + middlewares => middlewares + .AddSerializer())) .AddProducer( producer => producer .DefaultTopic(JsonGzipTopicName) diff --git a/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs b/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs index 40fc634e7..91c898dae 100644 --- a/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs +++ b/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs @@ -18,6 +18,8 @@ internal static class MessageStorage private static readonly ConcurrentBag<(long, int)> s_versions = new(); private static readonly ConcurrentBag s_byteMessages = new(); private static readonly ConcurrentBag s_nullMessages = new(); + private static readonly ConcurrentBag s_offsetTrackerMessages = new(); + private static long s_offsetTrack; public static void Add(ITestMessage message) { @@ -34,6 +36,12 @@ public static void Add(TestProtoMessage message) { s_protoMessages.Add(message); } + + public static void Add(OffsetTrackerMessage message) + { + s_offsetTrackerMessages.Add(message); + s_offsetTrack = Math.Max(message.Offset, s_offsetTrack); + } public static void Add(byte[] message) { @@ -139,6 +147,51 @@ public static async Task AssertNullMessageAsync() await Task.Delay(100).ConfigureAwait(false); } } + + public static async Task AssertMessageAsync(OffsetTrackerMessage message) + { + var start = DateTime.Now; + + while (!s_offsetTrackerMessages.Any(x => x.Id == message.Id && x.Offset == message.Offset)) + { + if (DateTime.Now.Subtract(start).Seconds > TimeoutSec) + { + Assert.Fail("Message (OffsetTrackerMessage) not received"); + return; + } + + await Task.Delay(100).ConfigureAwait(false); + } + } + + public static async Task AssertOffsetTrackerMessageAsync(OffsetTrackerMessage message, bool assertInStore = true) + { + var start = DateTime.Now; + + while (!s_offsetTrackerMessages.Any(x => x.Id == message.Id && x.Offset == message.Offset)) + { + if (DateTime.Now.Subtract(start).Seconds > TimeoutSec) + { + if (assertInStore) + { + Assert.Fail("Message (OffsetTrackerMessage) not received"); + } + return; + } + + await Task.Delay(100).ConfigureAwait(false); + } + + if (!assertInStore) + { + Assert.Fail("Message (OffsetTrackerMessage) received when it should not have been."); + } + } + + public static long GetOffsetTrack() + { + return s_offsetTrack; + } public static List<(long ticks, int version)> GetVersions() { @@ -151,5 +204,7 @@ public static void Clear() s_testMessages.Clear(); s_byteMessages.Clear(); s_protoMessages.Clear(); + s_offsetTrackerMessages.Clear(); + s_offsetTrack = 0; } } diff --git a/tests/KafkaFlow.IntegrationTests/Core/Handlers/OffsetTrackerMessageHandler.cs b/tests/KafkaFlow.IntegrationTests/Core/Handlers/OffsetTrackerMessageHandler.cs new file mode 100644 index 000000000..4d514b4df --- /dev/null +++ b/tests/KafkaFlow.IntegrationTests/Core/Handlers/OffsetTrackerMessageHandler.cs @@ -0,0 +1,14 @@ +using System.Threading.Tasks; +using KafkaFlow.IntegrationTests.Core.Messages; + +namespace KafkaFlow.IntegrationTests.Core.Handlers; + +internal class OffsetTrackerMessageHandler : IMessageHandler +{ + public Task Handle(IMessageContext context, OffsetTrackerMessage message) + { + message.Offset = context.ConsumerContext.Offset; + MessageStorage.Add(message); + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/tests/KafkaFlow.IntegrationTests/Core/Messages/OffsetTrackerMessage.cs b/tests/KafkaFlow.IntegrationTests/Core/Messages/OffsetTrackerMessage.cs new file mode 100644 index 000000000..1bb456bb7 --- /dev/null +++ b/tests/KafkaFlow.IntegrationTests/Core/Messages/OffsetTrackerMessage.cs @@ -0,0 +1,9 @@ +using System; + +namespace KafkaFlow.IntegrationTests.Core.Messages; + +internal class OffsetTrackerMessage +{ + public Guid Id { get; set; } + public long Offset { get; set; } +} \ No newline at end of file diff --git a/tests/KafkaFlow.IntegrationTests/Core/Producers/OffsetTrackerProducer.cs b/tests/KafkaFlow.IntegrationTests/Core/Producers/OffsetTrackerProducer.cs new file mode 100644 index 000000000..033fb7a26 --- /dev/null +++ b/tests/KafkaFlow.IntegrationTests/Core/Producers/OffsetTrackerProducer.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.IntegrationTests.Core.Producers; + +public class OffsetTrackerProducer +{ + +} \ No newline at end of file diff --git a/tests/KafkaFlow.IntegrationTests/Core/ServiceProviderHelper.cs b/tests/KafkaFlow.IntegrationTests/Core/ServiceProviderHelper.cs new file mode 100644 index 000000000..4b4732933 --- /dev/null +++ b/tests/KafkaFlow.IntegrationTests/Core/ServiceProviderHelper.cs @@ -0,0 +1,100 @@ +using System; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using KafkaFlow.Configuration; +using KafkaFlow.IntegrationTests.Core.Producers; +using KafkaFlow.Serializer; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Polly; + +namespace KafkaFlow.IntegrationTests.Core; + +public class ServiceProviderHelper +{ + private bool _isPartitionAssigned; + + public async Task GetServiceProviderAsync( + Action consumerConfiguration, + Action producerConfiguration, + Action builderConfiguration = null) + { + if (consumerConfiguration == null && producerConfiguration == null) + { + throw new ArgumentException("At least one of the configurations must be provided"); + } + + var clusterBuilderAction = (HostBuilderContext context, IClusterConfigurationBuilder cluster) => + { + cluster.WithBrokers(context.Configuration.GetValue("Kafka:Brokers").Split(';')); + + if (consumerConfiguration != null) + { + cluster.AddConsumer(builder => + { + consumerConfiguration(builder); + builder + .WithBufferSize(100) + .WithWorkersCount(10) + .WithPartitionsAssignedHandler((_, _) => + { + _isPartitionAssigned = true; + }) + .AddMiddlewares(middlewares => middlewares.AddDeserializer()); + }); + } + + if (producerConfiguration != null) + { + cluster.AddProducer(producerConfiguration); + } + }; + + clusterBuilderAction += (_, cluster) => + { + builderConfiguration?.Invoke(cluster); + }; + + var builder = Host + .CreateDefaultBuilder() + .ConfigureAppConfiguration( + (_, config) => + { + config + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile( + "conf/appsettings.json", + false, + true) + .AddEnvironmentVariables(); + }) + .ConfigureServices((context, services) => + services.AddKafka( + kafka => kafka + .UseLogHandler() + .AddCluster(cluster => { clusterBuilderAction.Invoke(context, cluster); }))) + .UseDefaultServiceProvider( + (_, options) => + { + options.ValidateScopes = true; + options.ValidateOnBuild = true; + }); + + var host = builder.Build(); + var bus = host.Services.CreateKafkaBus(); + await bus.StartAsync(); + + await WaitForPartitionAssignmentAsync(); + + return host.Services; + } + + private async Task WaitForPartitionAssignmentAsync() + { + await Policy + .HandleResult(isAvailable => !isAvailable) + .WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2)))) + .ExecuteAsync(() => Task.FromResult(_isPartitionAssigned)); + } +} \ No newline at end of file From 08d897b1cc8350d57b56bd6494ba99feb3acab93 Mon Sep 17 00:00:00 2001 From: Bradley Wadsworth Date: Tue, 30 Jul 2024 13:55:02 +0100 Subject: [PATCH 2/6] Refactor to reduce code duplication in integration tests --- .../ConsumerTest.cs | 10 ++- .../Core/ServiceProviderHelper.cs | 52 ++++++----- .../GlobalEventsTest.cs | 89 ++++--------------- 3 files changed, 53 insertions(+), 98 deletions(-) diff --git a/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs b/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs index d7befe81a..ffe5f33da 100644 --- a/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs +++ b/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs @@ -183,11 +183,15 @@ public async Task ManualAssignPartitionOffsetsTest() MessageStorage.Clear(); // Act - var serviceProvider = await new ServiceProviderHelper().GetServiceProviderAsync( + var serviceProviderHelper = new ServiceProviderHelper(); + + await serviceProviderHelper.GetServiceProviderAsync( consumerConfig => { consumerConfig.ManualAssignPartitionOffsets(Bootstrapper.OffsetTrackerTopicName, new Dictionary { { 0, endOffset - 4 } }) .WithGroupId("ManualAssignPartitionOffsetsTest") + .WithBufferSize(100) + .WithWorkersCount(10) .AddMiddlewares( middlewares => middlewares .AddDeserializer() @@ -205,8 +209,8 @@ public async Task ManualAssignPartitionOffsetsTest() { await MessageStorage.AssertOffsetTrackerMessageAsync(messages[i]); } - - await serviceProvider.CreateKafkaBus().StopAsync(); + + await serviceProviderHelper.StopBusAsync(); return; diff --git a/tests/KafkaFlow.IntegrationTests/Core/ServiceProviderHelper.cs b/tests/KafkaFlow.IntegrationTests/Core/ServiceProviderHelper.cs index 4b4732933..7d96a282f 100644 --- a/tests/KafkaFlow.IntegrationTests/Core/ServiceProviderHelper.cs +++ b/tests/KafkaFlow.IntegrationTests/Core/ServiceProviderHelper.cs @@ -4,7 +4,6 @@ using System.Threading.Tasks; using KafkaFlow.Configuration; using KafkaFlow.IntegrationTests.Core.Producers; -using KafkaFlow.Serializer; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Polly; @@ -14,47 +13,39 @@ namespace KafkaFlow.IntegrationTests.Core; public class ServiceProviderHelper { private bool _isPartitionAssigned; + private IKafkaBus _bus; public async Task GetServiceProviderAsync( Action consumerConfiguration, Action producerConfiguration, - Action builderConfiguration = null) + Action builderConfiguration = null, + Action configureGlobalEvents = null) { if (consumerConfiguration == null && producerConfiguration == null) { throw new ArgumentException("At least one of the configurations must be provided"); } - + var clusterBuilderAction = (HostBuilderContext context, IClusterConfigurationBuilder cluster) => { cluster.WithBrokers(context.Configuration.GetValue("Kafka:Brokers").Split(';')); - + if (consumerConfiguration != null) { cluster.AddConsumer(builder => { consumerConfiguration(builder); - builder - .WithBufferSize(100) - .WithWorkersCount(10) - .WithPartitionsAssignedHandler((_, _) => - { - _isPartitionAssigned = true; - }) - .AddMiddlewares(middlewares => middlewares.AddDeserializer()); + builder.WithPartitionsAssignedHandler((_, _) => { _isPartitionAssigned = true; }); }); } - + if (producerConfiguration != null) { - cluster.AddProducer(producerConfiguration); + cluster.AddProducer(producerConfiguration); } }; - clusterBuilderAction += (_, cluster) => - { - builderConfiguration?.Invoke(cluster); - }; + clusterBuilderAction += (_, cluster) => { builderConfiguration?.Invoke(cluster); }; var builder = Host .CreateDefaultBuilder() @@ -71,9 +62,17 @@ public async Task GetServiceProviderAsync( }) .ConfigureServices((context, services) => services.AddKafka( - kafka => kafka - .UseLogHandler() - .AddCluster(cluster => { clusterBuilderAction.Invoke(context, cluster); }))) + kafka => + { + kafka + .UseLogHandler() + .AddCluster(cluster => { clusterBuilderAction.Invoke(context, cluster); }); + + if (configureGlobalEvents != null) + { + kafka.SubscribeGlobalEvents(configureGlobalEvents); + } + })) .UseDefaultServiceProvider( (_, options) => { @@ -82,14 +81,19 @@ public async Task GetServiceProviderAsync( }); var host = builder.Build(); - var bus = host.Services.CreateKafkaBus(); - await bus.StartAsync(); - + _bus = host.Services.CreateKafkaBus(); + await _bus.StartAsync(); + await WaitForPartitionAssignmentAsync(); return host.Services; } + public async Task StopBusAsync() + { + await _bus.StopAsync(); + } + private async Task WaitForPartitionAssignmentAsync() { await Policy diff --git a/tests/KafkaFlow.IntegrationTests/GlobalEventsTest.cs b/tests/KafkaFlow.IntegrationTests/GlobalEventsTest.cs index 366a456cb..0f2ddf19e 100644 --- a/tests/KafkaFlow.IntegrationTests/GlobalEventsTest.cs +++ b/tests/KafkaFlow.IntegrationTests/GlobalEventsTest.cs @@ -1,6 +1,5 @@ using System; using System.IO; -using System.Linq; using System.Threading.Tasks; using AutoFixture; using Confluent.Kafka; @@ -12,11 +11,8 @@ using KafkaFlow.IntegrationTests.Core.Middlewares; using KafkaFlow.IntegrationTests.Core.Producers; using KafkaFlow.Serializer; -using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; using Microsoft.VisualStudio.TestTools.UnitTesting; -using Polly; namespace KafkaFlow.IntegrationTests; @@ -25,8 +21,6 @@ public class GlobalEventsTest { private readonly Fixture _fixture = new(); private string _topic; - private bool _isPartitionAssigned; - private IKafkaBus _bus; [TestInitialize] public void Setup() @@ -66,6 +60,7 @@ public async Task OnStopping_RegisterMultipleOnStoppingCallbacks_AllAreCalled() var countOnStopping = 0; // Act + var serviceProviderHelper = new ServiceProviderHelper(); await this.GetServiceProviderAsync( observers => { }, this.ConfigureConsumer, @@ -74,9 +69,10 @@ await this.GetServiceProviderAsync( { cluster.OnStopping(_ => countOnStopping++); cluster.OnStopping(_ => countOnStopping++); - }); + }, + serviceProviderHelper); - await _bus?.StopAsync(); + await serviceProviderHelper.StopBusAsync(); // Assert Assert.AreEqual(ExpectedOnStoppingCount, countOnStopping); @@ -270,11 +266,7 @@ private void ConfigureConsumer(IConsumerConfigurationBuilder consumerConfigur .AddMiddlewares( middlewares => middlewares .AddDeserializer() - .Add()) - .WithPartitionsAssignedHandler((_, _) => - { - _isPartitionAssigned = true; - }); + .Add()); } private void ConfigureProducer(IProducerConfigurationBuilder producerConfigurationBuilder) @@ -289,65 +281,20 @@ private async Task GetServiceProviderAsync( Action configureGlobalEvents, Action consumerConfiguration, Action producerConfiguration, - Action builderConfiguration = null) + Action builderConfiguration = null, + ServiceProviderHelper serviceProviderHelper = null) { - _isPartitionAssigned = false; - - var clusterBuilderAction = (HostBuilderContext context, IClusterConfigurationBuilder cluster) => - { - cluster - .WithBrokers(context.Configuration.GetValue("Kafka:Brokers").Split(';')) - .CreateTopicIfNotExists(_topic, 1, 1) - .AddProducer(producerConfiguration) - .AddConsumer(consumerConfiguration); - }; - - clusterBuilderAction += (_, cluster) => - { - builderConfiguration?.Invoke(cluster); - }; - - var builder = Host - .CreateDefaultBuilder() - .ConfigureAppConfiguration( - (_, config) => - { - config - .SetBasePath(Directory.GetCurrentDirectory()) - .AddJsonFile( - "conf/appsettings.json", - false, - true) - .AddEnvironmentVariables(); - }) - .ConfigureServices((context, services) => - services.AddKafka( - kafka => kafka - .UseLogHandler() - .AddCluster(cluster => { clusterBuilderAction.Invoke(context, cluster); }) - .SubscribeGlobalEvents(configureGlobalEvents))) - .UseDefaultServiceProvider( - (_, options) => - { - options.ValidateScopes = true; - options.ValidateOnBuild = true; - }); - - var host = builder.Build(); - _bus = host.Services.CreateKafkaBus(); - _bus.StartAsync().GetAwaiter().GetResult(); - - await this.WaitForPartitionAssignmentAsync(); - - return host.Services; - } - - private async Task WaitForPartitionAssignmentAsync() - { - await Policy - .HandleResult(isAvailable => !isAvailable) - .WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2)))) - .ExecuteAsync(() => Task.FromResult(_isPartitionAssigned)); + serviceProviderHelper ??= new ServiceProviderHelper(); + return await serviceProviderHelper.GetServiceProviderAsync( + consumerConfiguration, + producerConfiguration, + cluster => + { + cluster.CreateTopicIfNotExists(_topic, 1, 1); + builderConfiguration?.Invoke(cluster); + }, + configureGlobalEvents + ); } private class TriggerErrorMessageMiddleware : IMessageMiddleware From f89a552122c489adbfb6e75d513969535db0c642 Mon Sep 17 00:00:00 2001 From: Bradley Wadsworth Date: Tue, 30 Jul 2024 13:57:58 +0100 Subject: [PATCH 3/6] Convert to file-scoped namespace --- .../Configuration/SaslOauthbearerMethod.cs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs b/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs index 3b4177c05..9b0ee6e37 100644 --- a/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs +++ b/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs @@ -1,12 +1,11 @@ -namespace KafkaFlow.Configuration +namespace KafkaFlow.Configuration; + +/// SaslOauthbearerMethod enum values +public enum SaslOauthbearerMethod { - /// SaslOauthbearerMethod enum values - public enum SaslOauthbearerMethod - { - /// Default - Default, + /// Default + Default, - /// Oidc - Oidc, - } + /// Oidc + Oidc, } From b1b6dcdeab06fd81fcd3b72d0a997c46c4b380c7 Mon Sep 17 00:00:00 2001 From: Bradley Wadsworth Date: Tue, 30 Jul 2024 14:31:39 +0100 Subject: [PATCH 4/6] Update docs for ManualAssignPartitionOffsets --- .../docs/guides/consumers/add-consumers.md | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/website/docs/guides/consumers/add-consumers.md b/website/docs/guides/consumers/add-consumers.md index eb8c1333b..5cf11971e 100644 --- a/website/docs/guides/consumers/add-consumers.md +++ b/website/docs/guides/consumers/add-consumers.md @@ -79,6 +79,27 @@ services.AddKafka(kafka => kafka ); ``` +## Manual Partition Offset Assignment + +The client application can specify the offsets to start consuming from per partition for topics manually using the `ManualAssignPartitionOffsets()` method. + + +```csharp +using KafkaFlow; +using KafkaFlow.Serializer; +using Microsoft.Extensions.DependencyInjection; + +services.AddKafka(kafka => kafka + .AddCluster(cluster => cluster + .WithBrokers(new[] { "localhost:9092" }) + .AddConsumer(consumer => consumer + .ManualAssignPartitionOffsets("topic-name", new Dictionary { { 0, 100 }, { 1, 120 } }) + ... + ) + ) +); +``` + ## Offset Strategy You can configure the Offset Strategy for a consumer group in case the Consumer Group has no offset stored in Kafka. From 130775abab91bcb3b87daef8a779576bc9ac55ae Mon Sep 17 00:00:00 2001 From: Bradley Wadsworth Date: Tue, 30 Jul 2024 14:57:42 +0100 Subject: [PATCH 5/6] Move local function out to private method --- .../KafkaFlow.IntegrationTests/ConsumerTest.cs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs b/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs index ffe5f33da..4352df585 100644 --- a/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs +++ b/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs @@ -172,7 +172,7 @@ public async Task ManualAssignPartitionOffsetsTest() .Without(m => m.Offset) .CreateMany(10).ToList(); - messages.ForEach(m => producer.Produce(m.Id.ToString(), m, null, DeliveryHandler)); + messages.ForEach(m => producer.Produce(m.Id.ToString(), m, null, report => DeliveryHandler(report, messages))); foreach (var message in messages) { @@ -211,14 +211,12 @@ await serviceProviderHelper.GetServiceProviderAsync( } await serviceProviderHelper.StopBusAsync(); - - return; - - void DeliveryHandler(DeliveryReport report) - { - var key = Encoding.UTF8.GetString(report.Message.Key); - var message = messages.First(m => m.Id.ToString() == key); - message.Offset = report.Offset; - } + } + + private static void DeliveryHandler(DeliveryReport report, List messages) + { + var key = Encoding.UTF8.GetString(report.Message.Key); + var message = messages.First(m => m.Id.ToString() == key); + message.Offset = report.Offset; } } From 72e0027d8e1f19ecf24cbe93d63617dc1dd8124f Mon Sep 17 00:00:00 2001 From: Bradley Wadsworth Date: Thu, 29 Aug 2024 17:58:27 +0000 Subject: [PATCH 6/6] Minor refactoring --- .../KafkaFlow.IntegrationTests/ConsumerTest.cs | 6 +++--- .../Core/Handlers/MessageStorage.cs | 18 +----------------- 2 files changed, 4 insertions(+), 20 deletions(-) diff --git a/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs b/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs index 4352df585..adb5c9db5 100644 --- a/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs +++ b/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs @@ -176,7 +176,7 @@ public async Task ManualAssignPartitionOffsetsTest() foreach (var message in messages) { - await MessageStorage.AssertMessageAsync(message); + await MessageStorage.AssertOffsetTrackerMessageNotReceivedAsync(message); } var endOffset = MessageStorage.GetOffsetTrack(); @@ -202,12 +202,12 @@ await serviceProviderHelper.GetServiceProviderAsync( // Assert for (var i = 0; i < 5; i++) { - await MessageStorage.AssertOffsetTrackerMessageAsync(messages[i], false); + await MessageStorage.AssertOffsetTrackerMessageNotReceivedAsync(messages[i], false); } for (var i = 5; i < 10; i++) { - await MessageStorage.AssertOffsetTrackerMessageAsync(messages[i]); + await MessageStorage.AssertOffsetTrackerMessageNotReceivedAsync(messages[i]); } await serviceProviderHelper.StopBusAsync(); diff --git a/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs b/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs index 91c898dae..c7d296ca8 100644 --- a/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs +++ b/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs @@ -148,23 +148,7 @@ public static async Task AssertNullMessageAsync() } } - public static async Task AssertMessageAsync(OffsetTrackerMessage message) - { - var start = DateTime.Now; - - while (!s_offsetTrackerMessages.Any(x => x.Id == message.Id && x.Offset == message.Offset)) - { - if (DateTime.Now.Subtract(start).Seconds > TimeoutSec) - { - Assert.Fail("Message (OffsetTrackerMessage) not received"); - return; - } - - await Task.Delay(100).ConfigureAwait(false); - } - } - - public static async Task AssertOffsetTrackerMessageAsync(OffsetTrackerMessage message, bool assertInStore = true) + public static async Task AssertOffsetTrackerMessageNotReceivedAsync(OffsetTrackerMessage message, bool assertInStore = true) { var start = DateTime.Now;