Skip to content

Commit

Permalink
Merge pull request #50 from danielgerlag/read-ahead
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgerlag authored Jul 22, 2017
2 parents db64e1f + 236c8e7 commit 11b5265
Show file tree
Hide file tree
Showing 91 changed files with 1,628 additions and 840 deletions.
20 changes: 20 additions & 0 deletions ReleaseNotes/1.3.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Workflow Core 1.3.0

* Added support for async steps

Simply inherit from `StepBodyAsync` instead of `StepBody`

```c#
public class DoSomething : StepBodyAsync
{
public override async Task<ExecutionResult> RunAsync(IStepExecutionContext context)
{
await Task.Delay(2000);
return ExecutionResult.Next();
}
}
```

* Migrated from managing own thread pool to TPL datablocks for queue consumers

* After executing a workflow, will determine if it is scheduled to run before the next poll, if so, will delay queue it
81 changes: 41 additions & 40 deletions WorkflowCore.sln

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

namespace WorkflowCore.Interface
{
public interface IBackgroundWorker
public interface IBackgroundTask
{
void Start();
void Stop();
Expand Down
9 changes: 9 additions & 0 deletions src/WorkflowCore/Interface/IDateTimeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System;

namespace WorkflowCore.Interface
{
public interface IDateTimeProvider
{
DateTime Now { get; }
}
}
3 changes: 2 additions & 1 deletion src/WorkflowCore/Interface/IDistributedLockProvider.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace WorkflowCore.Interface
Expand All @@ -11,7 +12,7 @@ namespace WorkflowCore.Interface
/// </remarks>
public interface IDistributedLockProvider
{
Task<bool> AcquireLock(string Id);
Task<bool> AcquireLock(string Id, CancellationToken cancellationToken);

Task ReleaseLock(string Id);

Expand Down
7 changes: 0 additions & 7 deletions src/WorkflowCore/Interface/IEventThread.cs

This file was deleted.

4 changes: 2 additions & 2 deletions src/WorkflowCore/Interface/IPersistenceProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface IPersistenceProvider

Task PersistWorkflow(WorkflowInstance workflow);

Task<IEnumerable<string>> GetRunnableInstances();
Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt);

Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take);

Expand All @@ -32,7 +32,7 @@ public interface IPersistenceProvider

Task<Event> GetEvent(string id);

Task<IEnumerable<string>> GetRunnableEvents();
Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt);

Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf);

Expand Down
5 changes: 4 additions & 1 deletion src/WorkflowCore/Interface/IQueueProvider.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using WorkflowCore.Models;

Expand All @@ -25,7 +26,9 @@ public interface IQueueProvider : IDisposable
/// If the queue is empty, NULL is returned
/// </summary>
/// <returns></returns>
Task<string> DequeueWork(QueueType queue);
Task<string> DequeueWork(QueueType queue, CancellationToken cancellationToken);

bool IsDequeueBlocking { get; }

Task Start();
Task Stop();
Expand Down
7 changes: 0 additions & 7 deletions src/WorkflowCore/Interface/IRunnablePoller.cs

This file was deleted.

2 changes: 1 addition & 1 deletion src/WorkflowCore/Interface/IStepBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ namespace WorkflowCore.Interface
{
public interface IStepBody
{
ExecutionResult Run(IStepExecutionContext context);
Task<ExecutionResult> RunAsync(IStepExecutionContext context);
}
}
2 changes: 1 addition & 1 deletion src/WorkflowCore/Interface/IWorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ namespace WorkflowCore.Interface
{
public interface IWorkflowExecutor
{
WorkflowExecutorResult Execute(WorkflowInstance workflow, WorkflowOptions options);
Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow, WorkflowOptions options);
}
}
7 changes: 0 additions & 7 deletions src/WorkflowCore/Interface/IWorkflowThread.cs

This file was deleted.

7 changes: 6 additions & 1 deletion src/WorkflowCore/Models/StepBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ namespace WorkflowCore.Models
{
public abstract class StepBody : IStepBody
{

public abstract ExecutionResult Run(IStepExecutionContext context);

public Task<ExecutionResult> RunAsync(IStepExecutionContext context)
{
return Task.FromResult(Run(context));
}

protected ExecutionResult OutcomeResult(object value)
{
return new ExecutionResult()
Expand Down
14 changes: 14 additions & 0 deletions src/WorkflowCore/Models/StepBodyAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using WorkflowCore.Interface;

namespace WorkflowCore.Models
{
public abstract class StepBodyAsync : IStepBody
{
public abstract Task<ExecutionResult> RunAsync(IStepExecutionContext context);

}
}
2 changes: 2 additions & 0 deletions src/WorkflowCore/Models/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class WorkflowInstance

public string Description { get; set; }

public string Reference { get; set; }

public List<ExecutionPointer> ExecutionPointers { get; set; } = new List<ExecutionPointer>();

public long? NextExecution { get; set; }
Expand Down
9 changes: 1 addition & 8 deletions src/WorkflowCore/Models/WorkflowOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ public class WorkflowOptions
internal Func<IServiceProvider, IPersistenceProvider> PersistanceFactory;
internal Func<IServiceProvider, IQueueProvider> QueueFactory;
internal Func<IServiceProvider, IDistributedLockProvider> LockFactory;
internal int ThreadCount;
internal TimeSpan PollInterval;
internal TimeSpan IdleTime;
internal TimeSpan ErrorRetryInterval;

public WorkflowOptions()
{
//set defaults
ThreadCount = Environment.ProcessorCount;
PollInterval = TimeSpan.FromSeconds(10);
IdleTime = TimeSpan.FromMilliseconds(500);
IdleTime = TimeSpan.FromMilliseconds(100);
ErrorRetryInterval = TimeSpan.FromSeconds(60);

QueueFactory = new Func<IServiceProvider, IQueueProvider>(sp => new SingleNodeQueueProvider());
Expand All @@ -45,11 +43,6 @@ public void UseQueueProvider(Func<IServiceProvider, IQueueProvider> factory)
QueueFactory = factory;
}

public void UseThreads(int count)
{
ThreadCount = count;
}

public void UsePollInterval(TimeSpan interval)
{
PollInterval = interval;
Expand Down
1 change: 0 additions & 1 deletion src/WorkflowCore/Models/WorkflowStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public virtual IStepBody ConstructBody(IServiceProvider serviceProvider)
}
return body;
}

}

public enum ExecutionPipelineDirective { Next = 0, Defer = 1, EndWorkflow = 2 }
Expand Down
16 changes: 12 additions & 4 deletions src/WorkflowCore/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
using WorkflowCore.Services;
using WorkflowCore.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using WorkflowCore.Primitives;
using WorkflowCore.Services.BackgroundTasks;

namespace Microsoft.Extensions.DependencyInjection
{
Expand All @@ -23,13 +25,19 @@ public static void AddWorkflow(this IServiceCollection services, Action<Workflow
services.AddSingleton<IDistributedLockProvider>(options.LockFactory);
services.AddSingleton<IWorkflowRegistry, WorkflowRegistry>();
services.AddSingleton<WorkflowOptions>(options);

services.AddTransient<IBackgroundTask, WorkflowConsumer>();
services.AddTransient<IBackgroundTask, EventConsumer>();
services.AddTransient<IBackgroundTask, RunnablePoller>();

services.AddSingleton<IWorkflowHost, WorkflowHost>();
services.AddTransient<IWorkflowExecutor, WorkflowExecutor>();
services.AddTransient<IWorkflowBuilder, WorkflowBuilder>();
services.AddTransient<IWorkflowThread, WorkflowThread>();
services.AddTransient<IEventThread, EventThread>();
services.AddTransient<IRunnablePoller, RunnablePoller>();

services.AddTransient<IDateTimeProvider, DateTimeProvider>();

services.AddTransient<IPooledObjectPolicy<IPersistenceProvider>, InjectedObjectPoolPolicy<IPersistenceProvider>>();
services.AddTransient<IPooledObjectPolicy<IWorkflowExecutor>, InjectedObjectPoolPolicy<IWorkflowExecutor>>();

services.AddTransient<Foreach>();
}
}
Expand Down
95 changes: 95 additions & 0 deletions src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using WorkflowCore.Interface;
using WorkflowCore.Models;

namespace WorkflowCore.Services.BackgroundTasks
{
internal class EventConsumer : QueueConsumer, IBackgroundTask
{
private readonly IPersistenceProvider _persistenceStore;
private readonly IDistributedLockProvider _lockProvider;
private readonly IDateTimeProvider _datetimeProvider;

protected override QueueType Queue => QueueType.Event;

public EventConsumer(IPersistenceProvider persistenceStore, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, WorkflowOptions options, IDateTimeProvider datetimeProvider)
: base(queueProvider, loggerFactory, options)
{
_persistenceStore = persistenceStore;
_lockProvider = lockProvider;
_datetimeProvider = datetimeProvider;
}

protected override async Task ProcessItem(string itemId, CancellationToken cancellationToken)
{
if (await _lockProvider.AcquireLock($"evt:{itemId}", cancellationToken))
{
try
{
cancellationToken.ThrowIfCancellationRequested();
var evt = await _persistenceStore.GetEvent(itemId);
if (evt.EventTime <= _datetimeProvider.Now.ToUniversalTime())
{
var subs = await _persistenceStore.GetSubcriptions(evt.EventName, evt.EventKey, evt.EventTime);
var success = true;

foreach (var sub in subs.ToList())
success = success && await SeedSubscription(evt, sub, cancellationToken);

if (success)
await _persistenceStore.MarkEventProcessed(itemId);
}
}
finally
{
await _lockProvider.ReleaseLock($"evt:{itemId}");
}
}
else
{
Logger.LogInformation($"Event locked {itemId}");
}
}

private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, CancellationToken cancellationToken)
{
if (await _lockProvider.AcquireLock(sub.WorkflowId, cancellationToken))
{
try
{
var workflow = await _persistenceStore.GetWorkflowInstance(sub.WorkflowId);
var pointers = workflow.ExecutionPointers.Where(p => p.EventName == sub.EventName && p.EventKey == sub.EventKey && !p.EventPublished);
foreach (var p in pointers)
{
p.EventData = evt.EventData;
p.EventPublished = true;
p.Active = true;
}
workflow.NextExecution = 0;
await _persistenceStore.PersistWorkflow(workflow);
await _persistenceStore.TerminateSubscription(sub.Id);
return true;
}
catch (Exception ex)
{
Logger.LogError(ex.Message);
return false;
}
finally
{
await _lockProvider.ReleaseLock(sub.WorkflowId);
await QueueProvider.QueueWork(sub.WorkflowId, QueueType.Workflow);
}
}
else
{
Logger.LogInformation("Workflow locked {0}", sub.WorkflowId);
return false;
}
}
}
}
Loading

0 comments on commit 11b5265

Please sign in to comment.