Skip to content

Commit 6022df1

Browse files
authored
Kafka: Update ProduceMessage activity with support for specifying a Key (#6166)
* Add Key to Kafka ProduceMessage activity Deleted unnecessary Consumer and Producer workflow classes and the OrderReceived message class to clean up code. Refactored Kafka producer interface and implementation to include message keys for improved message handling. Updated configuration to enable Kafka and removed unused service registrations. * Add Kafka factory classes and type alias registry Introduce GenericConsumerFactory and GenericProducerFactory for handling Kafka consumer and producer creation. Implement a TypeAliasRegistry to manage type aliases, enabling cleaner configuration through aliases. Update the OrderReceived message class and ensure better integration with the server web program via these new components. * Handle empty topics and predicates in Kafka worker. Ensure the Kafka consumer unsubscribes when no topics are available to subscribe to. Additionally, add a check to handle empty string values for predicates, allowing workflow triggers to proceed in this scenario. * Disable Kafka usage in Elsa Server Web configuration Kafka has been disabled in the current configuration by setting the useKafka constant to false. This change might be intended to switch to a different messaging system or to simplify the current setup by removing unnecessary services. Ensure that any dependencies on Kafka are handled elsewhere in the application.
1 parent fba1a19 commit 6022df1

File tree

16 files changed

+97
-114
lines changed

16 files changed

+97
-114
lines changed

src/apps/Elsa.Server.Web/Messages/OrderReceived.cs

-1
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,4 @@ namespace Elsa.Server.Web.Messages;
33
public class OrderReceived
44
{
55
public string OrderId { get; set; } = default!;
6-
public decimal OrderTotal { get; set; } = default!;
76
}

src/apps/Elsa.Server.Web/Program.cs

+8-5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Elsa.Alterations.MassTransit.Extensions;
55
using Elsa.Common.DistributedHosting.DistributedLocks;
66
using Elsa.Common.RecurringTasks;
7+
using Elsa.Common.Serialization;
78
using Elsa.Dapper.Extensions;
89
using Elsa.Dapper.Services;
910
using Elsa.DropIns.Extensions;
@@ -16,6 +17,7 @@
1617
using Elsa.Features.Services;
1718
using Elsa.Identity.Multitenancy;
1819
using Elsa.Kafka;
20+
using Elsa.Kafka.Factories;
1921
using Elsa.MassTransit.Extensions;
2022
using Elsa.MongoDb.Extensions;
2123
using Elsa.MongoDb.Modules.Alterations;
@@ -30,11 +32,11 @@
3032
using Elsa.Server.Web.Extensions;
3133
using Elsa.Server.Web.Filters;
3234
using Elsa.Server.Web.Messages;
33-
using Elsa.Server.Web.WorkflowContextProviders;
3435
using Elsa.Tenants.AspNetCore;
3536
using Elsa.Tenants.Extensions;
3637
using Elsa.Workflows.Api;
3738
using Elsa.Workflows.LogPersistence;
39+
using Elsa.Workflows.Management;
3840
using Elsa.Workflows.Management.Compression;
3941
using Elsa.Workflows.Management.Stores;
4042
using Elsa.Workflows.Runtime.Distributed.Extensions;
@@ -93,6 +95,10 @@
9395
var distributedLockProviderName = configuration.GetSection("Runtime:DistributedLocking")["Provider"];
9496
var appRole = Enum.Parse<ApplicationRole>(configuration["AppRole"] ?? "Default");
9597

98+
// Optionally create type aliases for easier configuration.
99+
TypeAliasRegistry.RegisterAlias("OrderReceivedProducerFactory", typeof(GenericProducerFactory<string, OrderReceived>));
100+
TypeAliasRegistry.RegisterAlias("OrderReceivedConsumerFactory", typeof(GenericConsumerFactory<string, OrderReceived>));
101+
96102
// Add Elsa services.
97103
services
98104
.AddElsa(elsa =>
@@ -198,6 +204,7 @@
198204

199205
management.SetDefaultLogPersistenceMode(LogPersistenceMode.Inherit);
200206
management.UseReadOnlyMode(useReadOnlyMode);
207+
management.AddVariableTypeAndAlias<OrderReceived>("Application");
201208
})
202209
.UseProtoActor(proto =>
203210
{
@@ -426,8 +433,6 @@
426433
// etc.
427434
});
428435
}
429-
430-
massTransit.AddMessageType<OrderReceived>();
431436
});
432437
}
433438

@@ -454,8 +459,6 @@
454459
{
455460
kafka.ConfigureOptions(options => configuration.GetSection("Kafka").Bind(options));
456461
});
457-
458-
services.AddWorkflowContextProvider<ConsumerDefinitionWorkflowContextProvider>();
459462
}
460463

461464
if (useAgents)

src/apps/Elsa.Server.Web/WorkflowContextProviders/ConsumerConfigWorkflowContextProvider.cs

-22
This file was deleted.

src/apps/Elsa.Server.Web/Workflows/ConsumerWorkflow.cs

-32
This file was deleted.

src/apps/Elsa.Server.Web/Workflows/ProducerWorkflow.cs

-22
This file was deleted.

src/apps/Elsa.Server.Web/appsettings.json

+2-10
Original file line numberDiff line numberDiff line change
@@ -206,21 +206,13 @@
206206
{
207207
"Id": "topic-2",
208208
"Name": "topic-2"
209-
},
210-
{
211-
"Id": "topic-3",
212-
"Name": "topic-3"
213-
},
214-
{
215-
"Id": "topic-4",
216-
"Name": "topic-4"
217209
}
218210
],
219211
"Producers": [
220212
{
221213
"Id": "producer-1",
222214
"Name": "Producer 1",
223-
"FactoryType": "Elsa.Kafka.Factories.ExpandoObjectProducerFactory, Elsa.Kafka",
215+
"FactoryType": "Elsa.Kafka.Factories.GenericProducerFactory`2[[System.String, System.Private.CoreLib], [Elsa.Server.Web.Messages.OrderReceived, Elsa.Server.Web]], Elsa.Kafka",
224216
"Config": {
225217
"BootstrapServers": "localhost:9092"
226218
}
@@ -230,7 +222,7 @@
230222
{
231223
"Id": "consumer-1",
232224
"Name": "Consumer 1",
233-
"FactoryType": "Elsa.Kafka.Factories.ExpandoObjectConsumerFactory, Elsa.Kafka",
225+
"FactoryType": "Elsa.Kafka.Factories.GenericConsumerFactory`2[[System.String, System.Private.CoreLib], [Elsa.Server.Web.Messages.OrderReceived, Elsa.Server.Web]], Elsa.Kafka",
234226
"Config": {
235227
"BootstrapServers": "localhost:9092",
236228
"GroupId": "group-1",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace Elsa.Common.Serialization;
2+
3+
public static class TypeAliasRegistry
4+
{
5+
public static Dictionary<string, Type> TypeAliases { get; } = new();
6+
7+
public static void RegisterAlias(string alias, Type type) => TypeAliases[alias] = type;
8+
9+
public static Type? GetType(string alias) => TypeAliases.GetValueOrDefault(alias);
10+
}

src/modules/Elsa.Common/Serialization/TypeTypeConverter.cs

+8
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@ public override bool CanConvertFrom(ITypeDescriptorContext? context, Type source
1515
public override object? ConvertFrom(ITypeDescriptorContext? context, CultureInfo? culture, object value)
1616
{
1717
if (value is string stringValue)
18+
{
19+
if (TypeAliasRegistry.GetType(stringValue) is { } type)
20+
return type;
1821
return Type.GetType(stringValue);
22+
}
1923
return base.ConvertFrom(context, culture, value);
2024
}
2125

@@ -27,7 +31,11 @@ public override bool CanConvertTo(ITypeDescriptorContext? context, Type? destina
2731
public override object? ConvertTo(ITypeDescriptorContext? context, CultureInfo? culture, object? value, Type destinationType)
2832
{
2933
if (destinationType == typeof(string) && value is Type type)
34+
{
35+
if (TypeAliasRegistry.TypeAliases.FirstOrDefault(x => x.Value == type).Key is { } alias)
36+
return alias;
3037
return type.AssemblyQualifiedName;
38+
}
3139
return base.ConvertTo(context, culture, value, destinationType);
3240
}
3341
}

src/modules/Elsa.Kafka/Activities/ProduceMessage.cs

+20-11
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Elsa.Workflows.Models;
88
using Elsa.Workflows.Runtime;
99
using Elsa.Workflows.UIHints;
10+
using Microsoft.Extensions.DependencyInjection;
1011
using Microsoft.Extensions.Options;
1112

1213
namespace Elsa.Kafka.Activities;
@@ -48,25 +49,33 @@ public class ProduceMessage : CodeActivity
4849
public Input<string?> CorrelationId { get; set; } = default!;
4950

5051
/// <summary>
51-
/// The content of the message to send.
52+
/// The content of the message to produce.
5253
/// </summary>
5354
[Input(Description = "The content of the message to produce.")]
5455
public Input<object> Content { get; set; } = default!;
5556

57+
/// <summary>
58+
/// The key of the message to send.
59+
/// </summary>
60+
[Input(Description = "The key of the message to produce.")]
61+
public Input<object?> Key { get; set; } = default!;
62+
5663
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
5764
{
65+
var cancellationToken = context.CancellationToken;
5866
var topic = Topic.Get(context);
5967
var producerDefinitionId = ProducerDefinitionId.Get(context);
6068
var producerDefinitionEnumerator = context.GetRequiredService<IProducerDefinitionEnumerator>();
6169
var producerDefinition = await producerDefinitionEnumerator.GetByIdAsync(producerDefinitionId);
6270
var content = Content.Get(context);
71+
var key = Key.Get(context);
72+
73+
if (key is string keyString && string.IsNullOrWhiteSpace(keyString))
74+
key = null;
6375

64-
context.DeferTask(async () =>
65-
{
66-
using var producer = CreateProducer(context, producerDefinition);
67-
var headers = CreateHeaders(context);
68-
await producer.ProduceAsync(topic, content, headers);
69-
});
76+
using var producer = CreateProducer(context, producerDefinition);
77+
var headers = CreateHeaders(context);
78+
await producer.ProduceAsync(topic, key, content, headers, cancellationToken);
7079
}
7180

7281
private Headers CreateHeaders(ActivityExecutionContext context)
@@ -84,14 +93,14 @@ private Headers CreateHeaders(ActivityExecutionContext context)
8493

8594
return headers;
8695
}
87-
96+
8897
private IProducer CreateProducer(ActivityExecutionContext context, ProducerDefinition producerDefinition)
8998
{
90-
var factory = context.GetRequiredService(producerDefinition.FactoryType) as IProducerFactory;
91-
99+
var factory = context.GetOrCreateService(producerDefinition.FactoryType) as IProducerFactory;
100+
92101
if (factory == null)
93102
throw new InvalidOperationException($"Producer factory of type '{producerDefinition.FactoryType}' not found.");
94-
103+
95104
var createProducerContext = new CreateProducerContext(producerDefinition);
96105
return factory.CreateProducer(createProducerContext);
97106
}

src/modules/Elsa.Kafka/Contracts/IProducer.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ namespace Elsa.Kafka;
44

55
public interface IProducer : IDisposable
66
{
7-
Task ProduceAsync(string topic, object value, Headers? headers = null, CancellationToken cancellationToken = default);
7+
Task ProduceAsync(string topic, object? key, object value, Headers? headers = null, CancellationToken cancellationToken = default);
88
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using Confluent.Kafka;
2+
using Elsa.Kafka.Implementations;
3+
using Elsa.Kafka.Serializers;
4+
5+
namespace Elsa.Kafka.Factories;
6+
7+
public class GenericConsumerFactory<TKey, TValue> : IConsumerFactory
8+
{
9+
public IConsumer CreateConsumer(CreateConsumerContext context)
10+
{
11+
var consumer = new ConsumerBuilder<TKey, TValue>(context.ConsumerDefinition.Config)
12+
.SetValueDeserializer(new JsonDeserializer<TValue>())
13+
.Build();
14+
return new ConsumerProxy(consumer);
15+
}
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using Confluent.Kafka;
2+
using Elsa.Kafka.Implementations;
3+
using Elsa.Kafka.Serializers;
4+
5+
namespace Elsa.Kafka.Factories;
6+
7+
public class GenericProducerFactory<TKey, TValue> : IProducerFactory
8+
{
9+
public IProducer CreateProducer(CreateProducerContext workerContext)
10+
{
11+
var producer = new ProducerBuilder<TKey, TValue>(workerContext.ProducerDefinition.Config)
12+
.SetValueSerializer(new JsonSerializer<TValue>())
13+
.Build();
14+
return new ProducerProxy(producer);
15+
}
16+
}

src/modules/Elsa.Kafka/Handlers/TriggerWorkflows.cs

+3
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ private async Task<bool> EvaluatePredicateAsync(KafkaTransportMessage transportM
183183

184184
if (predicate == null)
185185
return true;
186+
187+
if(string.IsNullOrWhiteSpace(predicate.Value as string))
188+
return true;
186189

187190
var expressionExecutionContext = await GetExpressionExecutionContextAsync(transportMessage, binding, cancellationToken);
188191

src/modules/Elsa.Kafka/Implementations/ProducerProxy.cs

+6-7
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ public class ProducerProxy(object producer) : IProducer
77
{
88
private object Producer { get; } = producer;
99

10-
public async Task ProduceAsync(string topic, object value, Headers? headers = null, CancellationToken cancellationToken = default)
10+
public async Task ProduceAsync(string topic, object? key, object value, Headers? headers = null, CancellationToken cancellationToken = default)
1111
{
1212
var producerType = Producer.GetType();
1313
var keyType = producerType.GetGenericArguments()[0];
@@ -16,14 +16,13 @@ public async Task ProduceAsync(string topic, object value, Headers? headers = nu
1616
var produceAsyncMethod = producerType.GetMethod("ProduceAsync", [typeof(string), messageType, typeof(CancellationToken)])!;
1717
var messageInstance = Activator.CreateInstance(messageType);
1818
var convertedValue = value.ConvertTo(valueType);
19-
19+
2020
messageType.GetProperty("Value")!.SetValue(messageInstance, convertedValue);
21-
22-
if (headers != null)
23-
messageType.GetProperty("Headers")!.SetValue(messageInstance, headers);
24-
21+
if (key != null) messageType.GetProperty("Key")!.SetValue(messageInstance, key);
22+
if (headers != null) messageType.GetProperty("Headers")!.SetValue(messageInstance, headers);
23+
2524
await (Task)produceAsyncMethod.Invoke(Producer, [topic, messageInstance, cancellationToken])!;
26-
25+
2726
var flushMethod = producerType.GetMethod("Flush", [typeof(CancellationToken)])!;
2827
flushMethod.Invoke(Producer, [cancellationToken]);
2928
}

src/modules/Elsa.Kafka/Implementations/Worker.cs

+4-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ private void Subscribe(IEnumerable<string> topics)
9090
return;
9191

9292
_subscribedTopics = topicList.ToHashSet();
93-
consumer.Subscribe(_subscribedTopics);
93+
if(_subscribedTopics.Any())
94+
consumer.Subscribe(_subscribedTopics);
95+
else
96+
consumer.Unsubscribe();
9497

9598
logger.LogInformation("Subscribed to topics: {Topics}", string.Join(", ", _subscribedTopics));
9699
}

src/modules/Elsa.Kafka/Implementations/WorkerManager.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,9 @@ public void StopWorkers()
165165
private IWorker CreateWorker(IServiceProvider serviceProvider, ConsumerDefinition consumerDefinition)
166166
{
167167
var factoryType = consumerDefinition.FactoryType;
168-
169-
if (serviceProvider.GetRequiredService(factoryType) is not IConsumerFactory consumerFactory)
168+
var consumerFactory = ActivatorUtilities.GetServiceOrCreateInstance(serviceProvider, factoryType) as IConsumerFactory;
169+
170+
if (consumerFactory == null)
170171
throw new InvalidOperationException($"Worker factory of type '{factoryType}' not found.");
171172

172173
var createConsumerContext = new CreateConsumerContext(consumerDefinition);

0 commit comments

Comments
 (0)