diff --git a/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs b/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs
index 3b4177c05..9b0ee6e37 100644
--- a/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs
+++ b/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs
@@ -1,12 +1,11 @@
-namespace KafkaFlow.Configuration
+namespace KafkaFlow.Configuration;
+
+/// SaslOauthbearerMethod enum values
+public enum SaslOauthbearerMethod
{
- /// SaslOauthbearerMethod enum values
- public enum SaslOauthbearerMethod
- {
- /// Default
- Default,
+ /// Default
+ Default,
- /// Oidc
- Oidc,
- }
+ /// Oidc
+ Oidc,
}
diff --git a/src/KafkaFlow/Producers/IMessageProducer.cs b/src/KafkaFlow/Producers/IMessageProducer.cs
index 4b25ae5e3..8c97729d0 100644
--- a/src/KafkaFlow/Producers/IMessageProducer.cs
+++ b/src/KafkaFlow/Producers/IMessageProducer.cs
@@ -62,6 +62,7 @@ Task> ProduceAsync(
/// The message headers
/// A handler with the operation result
/// The partition where the message will be produced, if no partition is provided it will be calculated using the message key
+ [Obsolete("This method will be remove in the next major release, please use ProduceAsync() instead")]
void Produce(
string topic,
object messageKey,
@@ -79,6 +80,7 @@ void Produce(
/// The message headers
/// A handler with the operation result
/// The partition where the message will be produced, if no partition is provided it will be calculated using the message key
+ [Obsolete("This method will be remove in the next major release, please use ProduceAsync() instead")]
void Produce(
object messageKey,
object messageValue,
diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs
index 13161360f..b368e43c1 100644
--- a/src/KafkaFlow/Producers/MessageProducer.cs
+++ b/src/KafkaFlow/Producers/MessageProducer.cs
@@ -1,21 +1,25 @@
using System;
using System.Text;
+using System.Threading.Channels;
using System.Threading.Tasks;
using Confluent.Kafka;
using KafkaFlow.Authentication;
using KafkaFlow.Configuration;
+using KafkaFlow.Extensions;
namespace KafkaFlow.Producers;
-internal class MessageProducer : IMessageProducer, IDisposable
+internal class MessageProducer : IMessageProducer, IAsyncDisposable
{
+ private readonly Channel _queue = Channel.CreateUnbounded();
+ private readonly object _producerCreationSync = new();
+
private readonly IDependencyResolverScope _producerDependencyScope;
private readonly ILogHandler _logHandler;
private readonly IProducerConfiguration _configuration;
private readonly MiddlewareExecutor _middlewareExecutor;
private readonly GlobalEvents _globalEvents;
-
- private readonly object _producerCreationSync = new();
+ private readonly Task _produceTask;
private volatile IProducer _producer;
@@ -28,52 +32,42 @@ public MessageProducer(
_configuration = configuration;
_middlewareExecutor = new MiddlewareExecutor(configuration.MiddlewaresConfigurations);
_globalEvents = dependencyResolver.Resolve();
+
+ _produceTask = Task.Run(async () =>
+ {
+ await foreach (var item in _queue.Reader.ReadAllItemsAsync())
+ {
+ IMessageContext context;
+
+ try
+ {
+ context = await item.MiddlewareCompletion;
+ }
+ catch
+ {
+ continue;
+ }
+
+ InternalProduce(item, context);
+ }
+ });
}
public string ProducerName => _configuration.Name;
- public async Task> ProduceAsync(
+ public Task> ProduceAsync(
string topic,
object messageKey,
object messageValue,
IMessageHeaders headers = null,
int? partition = null)
{
- DeliveryResult report = null;
-
- using var messageScope = _producerDependencyScope.Resolver.CreateScope();
-
- var messageContext = this.CreateMessageContext(
+ return InternalProduceAsync(
topic,
messageKey,
messageValue,
- headers,
- messageScope.Resolver);
-
- await _globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext));
-
- try
- {
- await _middlewareExecutor
- .Execute(
- messageContext,
- async context =>
- {
- report = await this
- .InternalProduceAsync(context, partition)
- .ConfigureAwait(false);
- })
- .ConfigureAwait(false);
-
- await _globalEvents.FireMessageProduceCompletedAsync(new MessageEventContext(messageContext));
- }
- catch (Exception e)
- {
- await _globalEvents.FireMessageProduceErrorAsync(new MessageErrorEventContext(messageContext, e));
- throw;
- }
-
- return report;
+ headers: headers,
+ partition: partition);
}
public Task> ProduceAsync(
@@ -85,15 +79,15 @@ public Task> ProduceAsync(
if (string.IsNullOrWhiteSpace(_configuration.DefaultTopic))
{
throw new InvalidOperationException(
- $"There is no default topic defined for producer {this.ProducerName}");
+ $"There is no default topic defined for producer {ProducerName}");
}
- return this.ProduceAsync(
+ return InternalProduceAsync(
_configuration.DefaultTopic,
messageKey,
messageValue,
- headers,
- partition);
+ headers: headers,
+ partition: partition);
}
public void Produce(
@@ -104,61 +98,13 @@ public void Produce(
Action> deliveryHandler = null,
int? partition = null)
{
- var messageScope = _producerDependencyScope.Resolver.CreateScope();
-
- var messageContext = this.CreateMessageContext(
+ _ = InternalProduceAsync(
topic,
messageKey,
messageValue,
- headers,
- messageScope.Resolver);
-
- _globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext));
-
- _middlewareExecutor
- .Execute(
- messageContext,
- context =>
- {
- var completionSource = new TaskCompletionSource();
-
- this.InternalProduce(
- context,
- partition,
- report =>
- {
- if (report.Error.IsError)
- {
- completionSource.SetException(new ProduceException(report.Error, report));
- }
- else
- {
- completionSource.SetResult(0);
- }
-
- deliveryHandler?.Invoke(report);
- });
-
- return completionSource.Task;
- })
- .ContinueWith(
- task =>
- {
- if (task.IsFaulted)
- {
- deliveryHandler?.Invoke(
- new DeliveryReport
- {
- Error = new Error(ErrorCode.Local_Fail, task.Exception?.Message),
- Status = PersistenceStatus.NotPersisted,
- Topic = topic,
- });
- }
-
- messageScope.Dispose();
- });
-
- _globalEvents.FireMessageProduceCompletedAsync(new MessageEventContext(messageContext));
+ headers: headers,
+ deliveryHandler: deliveryHandler,
+ partition: partition);
}
public void Produce(
@@ -171,29 +117,25 @@ public void Produce(
if (string.IsNullOrWhiteSpace(_configuration.DefaultTopic))
{
throw new InvalidOperationException(
- $"There is no default topic defined for producer {this.ProducerName}");
+ $"There is no default topic defined for producer {ProducerName}");
}
- this.Produce(
+ _ = InternalProduceAsync(
_configuration.DefaultTopic,
messageKey,
messageValue,
- headers,
- deliveryHandler,
- partition);
+ headers: headers,
+ deliveryHandler: deliveryHandler,
+ partition: partition);
}
- public void Dispose()
+ public async ValueTask DisposeAsync()
{
- _producer?.Dispose();
- }
+ _queue.Writer.Complete();
- private static void FillContextWithResultMetadata(IMessageContext context, DeliveryResult result)
- {
- var concreteProducerContext = (ProducerContext)context.ProducerContext;
+ await _produceTask;
- concreteProducerContext.Offset = result.Offset;
- concreteProducerContext.Partition = result.Partition;
+ _producer?.Dispose();
}
private static Message CreateMessage(IMessageContext context)
@@ -226,6 +168,77 @@ private static Message CreateMessage(IMessageContext context)
};
}
+ private async Task> InternalProduceAsync(
+ string topic,
+ object messageKey,
+ object messageValue,
+ IMessageHeaders headers = null,
+ Action> deliveryHandler = null,
+ int? partition = null)
+ {
+ var middlewareCompletionSource = new TaskCompletionSource();
+ var produceItem = new ProduceItem(partition, deliveryHandler, middlewareCompletionSource.Task);
+
+ _queue.Writer.TryWrite(produceItem);
+
+ using var messageScope = _producerDependencyScope.Resolver.CreateScope();
+
+ var startContext = CreateMessageContext(
+ topic,
+ messageKey,
+ messageValue,
+ headers,
+ messageScope.Resolver);
+
+ await _globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(startContext));
+
+ DeliveryReport report = null;
+
+ try
+ {
+ await _middlewareExecutor.Execute(
+ startContext,
+ async endContext =>
+ {
+ middlewareCompletionSource.TrySetResult(endContext);
+ report = await produceItem.ProduceCompletionSource.Task;
+ });
+ }
+ catch (Exception e)
+ {
+ middlewareCompletionSource.TrySetException(e);
+ await _globalEvents.FireMessageProduceErrorAsync(new MessageErrorEventContext(startContext, e));
+ throw;
+ }
+
+ await _globalEvents.FireMessageProduceCompletedAsync(new MessageEventContext(startContext));
+
+ return report;
+ }
+
+ private void OnMessageDelivered(
+ DeliveryReport report,
+ ProduceItem item,
+ IMessageContext context)
+ {
+ item.DeliveryHandler?.Invoke(report);
+
+ if (report.Error.IsError)
+ {
+ var exception = new ProduceException(report.Error, report);
+ OnProduceError(context, exception, report);
+ item.ProduceCompletionSource.SetException(exception);
+ }
+ else
+ {
+ var concreteProducerContext = (ProducerContext)context.ProducerContext;
+ concreteProducerContext.Offset = report.Offset;
+ concreteProducerContext.Partition = report.Partition;
+
+ item.ProduceCompletionSource.SetResult(report);
+ }
+ }
+
private IProducer EnsureProducer()
{
if (_producer != null)
@@ -246,7 +259,7 @@ private IProducer EnsureProducer()
{
if (error.IsFatal)
{
- this.InvalidateProducer(error, null);
+ InvalidateProducer(error, null);
}
else
{
@@ -294,71 +307,46 @@ private void InvalidateProducer(Error error, DeliveryResult resu
new { Error = error });
}
- private async Task> InternalProduceAsync(IMessageContext context, int? partition)
+ private void InternalProduce(ProduceItem item, IMessageContext context)
{
- DeliveryResult result = null;
-
- var localProducer = this.EnsureProducer();
+ var producer = EnsureProducer();
var message = CreateMessage(context);
try
{
- var produceTask = partition.HasValue ?
- localProducer.ProduceAsync(new TopicPartition(context.ProducerContext.Topic, partition.Value), message) :
- localProducer.ProduceAsync(context.ProducerContext.Topic, message);
-
- result = await produceTask.ConfigureAwait(false);
- }
- catch (ProduceException e)
- {
- await _globalEvents.FireMessageProduceErrorAsync(new MessageErrorEventContext(context, e));
-
- if (e.Error.IsFatal)
+ if (item.Partition.HasValue)
{
- this.InvalidateProducer(e.Error, result);
+ producer.Produce(
+ new TopicPartition(
+ context.ProducerContext.Topic,
+ item.Partition.Value),
+ message,
+ report => OnMessageDelivered(report, item, context));
+ }
+ else
+ {
+ producer.Produce(
+ context.ProducerContext.Topic,
+ message,
+ report => OnMessageDelivered(report, item, context));
}
-
- throw;
}
-
- FillContextWithResultMetadata(context, result);
-
- return result;
+ catch (Exception e)
+ {
+ OnProduceError(context, e, null);
+ }
}
- private void InternalProduce(
+ private async void OnProduceError(
IMessageContext context,
- int? partition,
- Action> deliveryHandler)
+ Exception e,
+ DeliveryResult result)
{
- var localProducer = this.EnsureProducer();
- var message = CreateMessage(context);
+ await _globalEvents.FireMessageProduceErrorAsync(new MessageErrorEventContext(context, e));
- if (partition.HasValue)
+ if (e is KafkaException kafkaException && kafkaException.Error.IsFatal)
{
- localProducer.Produce(
- new TopicPartition(context.ProducerContext.Topic, partition.Value),
- message,
- Handler);
-
- return;
- }
-
- localProducer.Produce(
- context.ProducerContext.Topic,
- message,
- Handler);
-
- void Handler(DeliveryReport report)
- {
- if (report.Error.IsFatal)
- {
- this.InvalidateProducer(report.Error, report);
- }
-
- FillContextWithResultMetadata(context, report);
-
- deliveryHandler(report);
+ InvalidateProducer(kafkaException.Error, result);
}
}
@@ -377,4 +365,25 @@ private MessageContext CreateMessageContext(
new ProducerContext(topic, _producerDependencyScope.Resolver),
_configuration.Cluster.Brokers);
}
+
+ private class ProduceItem
+ {
+ public ProduceItem(
+ int? partition,
+ Action> deliveryHandler,
+ Task middlewareCompletion)
+ {
+ Partition = partition;
+ DeliveryHandler = deliveryHandler;
+ MiddlewareCompletion = middlewareCompletion;
+ }
+
+ public int? Partition { get; }
+
+ public Action> DeliveryHandler { get; }
+
+ public Task MiddlewareCompletion { get; }
+
+ public TaskCompletionSource> ProduceCompletionSource { get; } = new();
+ }
}