Skip to content

Commit

Permalink
Workflow life cycle events (#215)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgerlag authored Dec 30, 2018
1 parent 9cf5181 commit 156d5bf
Show file tree
Hide file tree
Showing 31 changed files with 629 additions and 111 deletions.
5 changes: 5 additions & 0 deletions ReleaseNotes/1.6.9.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Workflow Core 1.6.9

This release adds functionality to subscribe to workflow life cycle events (WorkflowStarted, WorkflowComplete, WorkflowError, WorkflowSuspended, WorkflowResumed, StepStarted, StepCompleted, etc...)
This can be achieved by either grabbing the `ILifeCycleEventHub` implementation from the IoC container and subscribing to events there, or attach an event on the workflow host class `IWorkflowHost.OnLifeCycleEvent`.
This implementation only publishes events to the local node... we will still need to implement a distributed version of the EventHub to solve the problem for multi-node clusters.
1 change: 1 addition & 0 deletions WorkflowCore.sln
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ReleaseNotes", "ReleaseNote
ReleaseNotes\1.6.0.md = ReleaseNotes\1.6.0.md
ReleaseNotes\1.6.6.md = ReleaseNotes\1.6.6.md
ReleaseNotes\1.6.8.md = ReleaseNotes\1.6.8.md
ReleaseNotes\1.6.9.md = ReleaseNotes\1.6.9.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample14", "src\samples\WorkflowCore.Sample14\WorkflowCore.Sample14.csproj", "{6BC66637-B42A-4334-ADFB-DBEC9F29D293}"
Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowCore/Interface/IExecutionResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace WorkflowCore.Interface
{
public interface IExecutionResultProcessor
{
void HandleStepException(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step);
void HandleStepException(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, Exception exception);
void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, ExecutionResult result, WorkflowExecutorResult workflowResult);
}
}
14 changes: 14 additions & 0 deletions src/WorkflowCore/Interface/ILifeCycleEventHub.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using WorkflowCore.Models.LifeCycleEvents;

namespace WorkflowCore.Interface
{
public interface ILifeCycleEventHub
{
Task PublishNotification(LifeCycleEvent evt);
void Subscribe(Action<LifeCycleEvent> action);
}
}
12 changes: 12 additions & 0 deletions src/WorkflowCore/Interface/ILifeCycleEventPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Models.LifeCycleEvents;

namespace WorkflowCore.Interface
{
public interface ILifeCycleEventPublisher : IBackgroundTask
{
void PublishNotification(LifeCycleEvent evt);
}
}
13 changes: 13 additions & 0 deletions src/WorkflowCore/Interface/IWorkflowErrorHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Models;

namespace WorkflowCore.Interface
{
public interface IWorkflowErrorHandler
{
WorkflowErrorHandling Type { get; }
void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, Exception exception, Queue<ExecutionPointer> bubbleUpQueue);
}
}
5 changes: 4 additions & 1 deletion src/WorkflowCore/Interface/IWorkflowHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System;
using System.Threading.Tasks;
using WorkflowCore.Models;
using WorkflowCore.Models.LifeCycleEvents;

namespace WorkflowCore.Interface
{
Expand All @@ -19,6 +20,7 @@ public interface IWorkflowHost : IWorkflowController


event StepErrorEventHandler OnStepError;
event LifeCycleEventHandler OnLifeCycleEvent;
void ReportStepError(WorkflowInstance workflow, WorkflowStep step, Exception exception);

//public dependencies to allow for extension method access
Expand All @@ -32,4 +34,5 @@ public interface IWorkflowHost : IWorkflowController
}

public delegate void StepErrorEventHandler(WorkflowInstance workflow, WorkflowStep step, Exception exception);
}
public delegate void LifeCycleEventHandler(LifeCycleEvent evt);
}
19 changes: 19 additions & 0 deletions src/WorkflowCore/Models/LifeCycleEvents/LifeCycleEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace WorkflowCore.Models.LifeCycleEvents
{
public abstract class LifeCycleEvent
{
public DateTime EventTimeUtc { get; set; }

public string WorkflowInsanceId { get; set; }

public string WorkflowDefinitionId { get; set; }

public int Version { get; set; }

public string Reference { get; set; }
}
}
13 changes: 13 additions & 0 deletions src/WorkflowCore/Models/LifeCycleEvents/StepCompleted.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace WorkflowCore.Models.LifeCycleEvents
{
public class StepCompleted : LifeCycleEvent
{
public string ExecutionPointerId { get; set; }

public int StepId { get; set; }
}
}
13 changes: 13 additions & 0 deletions src/WorkflowCore/Models/LifeCycleEvents/StepStarted.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace WorkflowCore.Models.LifeCycleEvents
{
public class StepStarted : LifeCycleEvent
{
public string ExecutionPointerId { get; set; }

public int StepId { get; set; }
}
}
10 changes: 10 additions & 0 deletions src/WorkflowCore/Models/LifeCycleEvents/WorkflowCompleted.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace WorkflowCore.Models.LifeCycleEvents
{
public class WorkflowCompleted : LifeCycleEvent
{
}
}
15 changes: 15 additions & 0 deletions src/WorkflowCore/Models/LifeCycleEvents/WorkflowError.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace WorkflowCore.Models.LifeCycleEvents
{
public class WorkflowError : LifeCycleEvent
{
public string Message { get; set; }

public string ExecutionPointerId { get; set; }

public int StepId { get; set; }
}
}
10 changes: 10 additions & 0 deletions src/WorkflowCore/Models/LifeCycleEvents/WorkflowResumed.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace WorkflowCore.Models.LifeCycleEvents
{
public class WorkflowResumed : LifeCycleEvent
{
}
}
10 changes: 10 additions & 0 deletions src/WorkflowCore/Models/LifeCycleEvents/WorkflowStarted.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace WorkflowCore.Models.LifeCycleEvents
{
public class WorkflowStarted : LifeCycleEvent
{
}
}
10 changes: 10 additions & 0 deletions src/WorkflowCore/Models/LifeCycleEvents/WorkflowSuspended.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace WorkflowCore.Models.LifeCycleEvents
{
public class WorkflowSuspended : LifeCycleEvent
{
}
}
10 changes: 10 additions & 0 deletions src/WorkflowCore/Models/LifeCycleEvents/WorkflowTerminated.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace WorkflowCore.Models.LifeCycleEvents
{
public class WorkflowTerminated : LifeCycleEvent
{
}
}
8 changes: 8 additions & 0 deletions src/WorkflowCore/Models/WorkflowOptions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using Microsoft.Extensions.Logging;
using WorkflowCore.Interface;
using WorkflowCore.Services;

Expand All @@ -10,6 +11,7 @@ public class WorkflowOptions
internal Func<IServiceProvider, IPersistenceProvider> PersistanceFactory;
internal Func<IServiceProvider, IQueueProvider> QueueFactory;
internal Func<IServiceProvider, IDistributedLockProvider> LockFactory;
internal Func<IServiceProvider, ILifeCycleEventHub> EventHubFactory;
internal TimeSpan PollInterval;
internal TimeSpan IdleTime;
internal TimeSpan ErrorRetryInterval;
Expand All @@ -26,6 +28,7 @@ public WorkflowOptions(IServiceCollection services)
QueueFactory = new Func<IServiceProvider, IQueueProvider>(sp => new SingleNodeQueueProvider());
LockFactory = new Func<IServiceProvider, IDistributedLockProvider>(sp => new SingleNodeLockProvider());
PersistanceFactory = new Func<IServiceProvider, IPersistenceProvider>(sp => new MemoryPersistenceProvider());
EventHubFactory = new Func<IServiceProvider, ILifeCycleEventHub>(sp => new SingleNodeEventHub(sp.GetService<ILoggerFactory>()));
}

public void UsePersistence(Func<IServiceProvider, IPersistenceProvider> factory)
Expand All @@ -43,6 +46,11 @@ public void UseQueueProvider(Func<IServiceProvider, IQueueProvider> factory)
QueueFactory = factory;
}

public void UseEventHub(Func<IServiceProvider, ILifeCycleEventHub> factory)
{
EventHubFactory = factory;
}

public void UsePollInterval(TimeSpan interval)
{
PollInterval = interval;
Expand Down
9 changes: 9 additions & 0 deletions src/WorkflowCore/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using WorkflowCore.Primitives;
using WorkflowCore.Services.BackgroundTasks;
using WorkflowCore.Services.DefinitionStorage;
using WorkflowCore.Services.ErrorHandlers;

namespace Microsoft.Extensions.DependencyInjection
{
Expand All @@ -26,12 +27,20 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A
services.AddTransient<IPersistenceProvider>(options.PersistanceFactory);
services.AddSingleton<IQueueProvider>(options.QueueFactory);
services.AddSingleton<IDistributedLockProvider>(options.LockFactory);
services.AddSingleton<ILifeCycleEventHub>(options.EventHubFactory);
services.AddSingleton<IWorkflowRegistry, WorkflowRegistry>();
services.AddSingleton<WorkflowOptions>(options);
services.AddSingleton<ILifeCycleEventPublisher, LifeCycleEventPublisher>();

services.AddTransient<IBackgroundTask, WorkflowConsumer>();
services.AddTransient<IBackgroundTask, EventConsumer>();
services.AddTransient<IBackgroundTask, RunnablePoller>();
services.AddTransient<IBackgroundTask>(sp => sp.GetService<ILifeCycleEventPublisher>());

services.AddTransient<IWorkflowErrorHandler, CompensateHandler>();
services.AddTransient<IWorkflowErrorHandler, RetryHandler>();
services.AddTransient<IWorkflowErrorHandler, TerminateHandler>();
services.AddTransient<IWorkflowErrorHandler, SuspendHandler>();

services.AddSingleton<IWorkflowController, WorkflowController>();
services.AddSingleton<IWorkflowHost, WorkflowHost>();
Expand Down
45 changes: 45 additions & 0 deletions src/WorkflowCore/Services/DefaultProviders/SingleNodeEventHub.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using WorkflowCore.Interface;
using WorkflowCore.Models.LifeCycleEvents;

namespace WorkflowCore.Services
{
public class SingleNodeEventHub : ILifeCycleEventHub
{
private ICollection<Action<LifeCycleEvent>> _subscribers = new HashSet<Action<LifeCycleEvent>>();
private readonly ILogger _logger;

public SingleNodeEventHub(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<SingleNodeEventHub>();
}

public Task PublishNotification(LifeCycleEvent evt)
{
Task.Run(() =>
{
foreach (var subscriber in _subscribers)
{
try
{
subscriber(evt);
}
catch (Exception ex)
{
_logger.LogWarning(default(EventId), ex, $"Error on event subscriber: {ex.Message}");
}
}
});
return Task.CompletedTask;
}

public void Subscribe(Action<LifeCycleEvent> action)
{
_subscribers.Add(action);
}
}
}
Loading

0 comments on commit 156d5bf

Please sign in to comment.