Skip to content

feat: add DomainEvent activity #5994

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions src/modules/Elsa.Webhooks/Handlers/DomainEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Elsa.Mediator.Contracts;
using Elsa.Webhooks.Models;
using Elsa.Workflows.Runtime.Notifications;
using JetBrains.Annotations;
using WebhooksCore;

namespace Elsa.Webhooks.Handlers;

/// Handles the <see cref="DomainEventNotification"/> notification and asynchronously invokes all registered webhook endpoints.
[UsedImplicitly]
public class DomainEventHandler(IWebhookEventBroadcaster webhookDispatcher) : INotificationHandler<DomainEventNotification>
{
/// <inheritdoc />
public async Task HandleAsync(DomainEventNotification notification, CancellationToken cancellationToken)
{
var activityExecutionContext = notification.ActivityExecutionContext;
var workflowExecutionContext = activityExecutionContext.WorkflowExecutionContext;
var workflowInstanceId = workflowExecutionContext.Id;
var correlationId = workflowExecutionContext.CorrelationId;
var workflow = workflowExecutionContext.Workflow;
var workflowDefinitionId = workflow.Identity.DefinitionId;
var workflowName = workflow.WorkflowMetadata.Name;

var payload = new DomainEventWebhookPayload(
workflowInstanceId,
workflowDefinitionId,
workflowName,
correlationId,
notification.DomainEventId,
notification.DomainEventName,
notification.DomainEventPayload
);

var webhookEvent = new NewWebhookEvent("Elsa.DomainEvent", payload);
await webhookDispatcher.BroadcastAsync(webhookEvent, cancellationToken);
}
}
13 changes: 13 additions & 0 deletions src/modules/Elsa.Webhooks/Models/DomainEventWebhookPayload.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace Elsa.Webhooks.Models;

/// <summary>
/// Stores payload information about the DomainEvent webhook event type.
/// </summary>
public record DomainEventWebhookPayload(
string WorkflowInstanceId,
string WorkflowDefinitionId,
string? WorkflowName,
string? CorrelationId,
string DomainEventId,
string DomainEventName,
object? DomainEventPayload);
42 changes: 42 additions & 0 deletions src/modules/Elsa.Workflows.Runtime/Activities/DomainEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using Elsa.Extensions;
using Elsa.Workflows.Attributes;
using Elsa.Workflows.Models;
using Elsa.Workflows.Runtime.Notifications;
using JetBrains.Annotations;

namespace Elsa.Workflows.Runtime.Activities;

/// <summary>
/// Notifies the application that a domain event with a given name is requested to be published.
/// </summary>
[Activity("Elsa", "Primitives", "Requests a given domain event to be published. ", Kind = ActivityKind.Action)]
[UsedImplicitly]
public class DomainEvent: Activity
{
/// <summary>
/// The name of the domain event being published.
/// </summary>
[Input(Description = "The name of the domain event being published.")]
public Input<string> DomainEventName { get; set; } = default!;

/// <summary>
/// The payload of the domain event being published.
/// </summary>
[Input(Description = "Any additional parameters to send to the domain event.")]
public Input<object?> Payload { get; set; } = default!;

protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
var domainEventName = DomainEventName.Get(context);
var identityGenerator = context.GetRequiredService<IIdentityGenerator>();
var domainEventId = identityGenerator.GenerateId();

// Publish the domain event
var domainEventPayload = Payload.GetOrDefault(context);
var domainEventNotification = new DomainEventNotification(context, domainEventId, domainEventName, domainEventPayload);
var dispatcher = context.GetRequiredService<IDomainEventDispatcher>();

await dispatcher.DispatchAsync(domainEventNotification, context.CancellationToken);
await context.CompleteActivityAsync();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Elsa.Workflows.Runtime.Notifications;

namespace Elsa.Workflows.Runtime;

/// <summary>
/// Dispatches a domain event.
/// </summary>
public interface IDomainEventDispatcher
{
/// <summary>
/// Asynchronously publishes the specified event using the domain event dispatcher.
/// </summary>
Task DispatchAsync(DomainEventNotification domainEvent, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public WorkflowRuntimeFeature(IModule module) : base(module)
/// A factory that instantiates an <see cref="ITaskDispatcher"/>.
/// </summary>
public Func<IServiceProvider, ITaskDispatcher> RunTaskDispatcher { get; set; } = sp => sp.GetRequiredService<BackgroundTaskDispatcher>();

/// <summary>
/// A factory that instantiates an <see cref="IDomainEventDispatcher"/>.
/// </summary>
public Func<IServiceProvider, IDomainEventDispatcher> DomainEventDispatcher { get; set; } = sp => sp.GetRequiredService<BackgroundDomainEventDispatcher>();

/// <summary>
/// A factory that instantiates an <see cref="IBackgroundActivityScheduler"/>.
Expand Down Expand Up @@ -226,6 +231,7 @@ public override void Apply()
.AddScoped(WorkflowDispatcher)
.AddScoped(WorkflowCancellationDispatcher)
.AddScoped(RunTaskDispatcher)
.AddScoped(DomainEventDispatcher)
.AddScoped(ActivityExecutionLogSink)
.AddScoped(WorkflowExecutionLogSink)
.AddSingleton(BackgroundActivityScheduler)
Expand All @@ -248,7 +254,9 @@ public override void Apply()
.AddScoped<IBookmarkBoundWorkflowService, BookmarkBoundWorkflowService>()
.AddScoped<ITaskReporter, TaskReporter>()
.AddScoped<SynchronousTaskDispatcher>()
.AddScoped<SynchronousDomainEventDispatcher>()
.AddScoped<BackgroundTaskDispatcher>()
.AddScoped<BackgroundDomainEventDispatcher>()
.AddScoped<StoreActivityExecutionLogSink>()
.AddScoped<StoreWorkflowExecutionLogSink>()
.AddScoped<DispatchWorkflowCommandHandler>()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Elsa.Mediator.Contracts;

namespace Elsa.Workflows.Runtime.Notifications;

/// <summary>
/// A domain event that applications can subscribe to.
/// </summary>
/// <param name="ActivityExecutionContext">The context of the activity that requested the domain event to be published.</param>
/// <param name="DomainEventId">A unique identifier for an individual domain event notification.</param>
/// <param name="DomainEventName">The name of the domain event requested to be published.</param>
/// <param name="DomainEventPayload">Any additional parameters to send to the domain event.</param>
public record DomainEventNotification(ActivityExecutionContext ActivityExecutionContext, string DomainEventId, string DomainEventName, object? DomainEventPayload) : INotification;
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Elsa.Mediator;
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Runtime.Notifications;

namespace Elsa.Workflows.Runtime;

/// <summary>
/// Relies on the <see cref="INotificationSender"/> to publish the received request as a domain event from a background worker.
/// </summary>
public class BackgroundDomainEventDispatcher(INotificationSender notificationSender) : IDomainEventDispatcher
{
/// <inheritdoc />
public async Task DispatchAsync(DomainEventNotification request, CancellationToken cancellationToken = default)
{
await notificationSender.SendAsync(request, NotificationStrategy.Background, cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Runtime.Notifications;

namespace Elsa.Workflows.Runtime;

/// <summary>
/// Relies on the <see cref="INotificationSender"/> to synchronously publish the received request as a domain event.
/// </summary>
public class SynchronousDomainEventDispatcher : IDomainEventDispatcher
{
private readonly INotificationSender _notificationSender;

/// <summary>
/// Constructor.
/// </summary>
public SynchronousDomainEventDispatcher(INotificationSender notificationSender) => _notificationSender = notificationSender;

/// <inheritdoc />
public async Task DispatchAsync(DomainEventNotification request, CancellationToken cancellationToken = default) => await _notificationSender.SendAsync(request, cancellationToken);
}
Loading