Skip to content

Commit

Permalink
Kafka: Added Serialized Headers to WorkflowInput (#4572)
Browse files Browse the repository at this point in the history
* Added serialized message headers to MessageReceivedInput --> WorkflowInput

* Changed type of headers from string to dictionary<string,string>

---------

Co-authored-by: Yannick Laubscher <[email protected]>
  • Loading branch information
Snotax and yannicklaubscherswt authored Oct 30, 2023
1 parent 39c3504 commit 2caab0c
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\core\Elsa.Core\Elsa.Core.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.9.0" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.9.0" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.9.0" />
<PackageReference Include="System.Reactive" Version="5.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\core\Elsa.Core\Elsa.Core.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -8,6 +9,8 @@ namespace Elsa.Activities.Kafka.Models
{
public class MessageReceivedInput
{
public Dictionary<string, string>? MessageHeaders { get; set; }

public string? MessageString { get; set; }

public byte[]? MessageBytes { get; set; }
Expand Down
3 changes: 2 additions & 1 deletion src/activities/Elsa.Activities.Kafka/Services/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Elsa.Services.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Elsa.Activities.Kafka.Services
{
Expand Down Expand Up @@ -91,7 +92,7 @@ private async Task TriggerWorkflowsAsync(KafkaMessageEvent ev, CancellationToken
using var scope = _scopeFactory.CreateScope();
var config = _client.Configuration;
var tenantId = await _tenantIdResolver.ResolveAsync(ev, config.Topic, config.Group, Tags, cancellationToken);
var workflowInput = new WorkflowInput(new MessageReceivedInput() { MessageBytes = ev.Message.Value, MessageString = Encoding.ASCII.GetString(ev.Message.Value) });
var workflowInput = new WorkflowInput(new MessageReceivedInput() { MessageBytes = ev.Message.Value, MessageString = Encoding.ASCII.GetString(ev.Message.Value), MessageHeaders = GetHeaders(ev.Message.Headers,false) });

// Schema extraction if injected
var schemaResolver = scope.ServiceProvider.GetRequiredService<ISchemaResolver>();
Expand Down

0 comments on commit 2caab0c

Please sign in to comment.