Skip to content

Commit

Permalink
Merge pull request #250 from MarioAndron/master
Browse files Browse the repository at this point in the history
Enhanced to allow scoped dependencies to be injected into steps
  • Loading branch information
danielgerlag authored Feb 2, 2019
2 parents e432405 + 8df0cee commit e183903
Show file tree
Hide file tree
Showing 8 changed files with 354 additions and 44 deletions.
17 changes: 17 additions & 0 deletions src/WorkflowCore/Interface/IScopeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Microsoft.Extensions.DependencyInjection;

namespace WorkflowCore.Interface
{
/// <remarks>
/// The implemention of this interface will be responsible for
/// providing a new service scope for a DI container
/// </remarks>
public interface IScopeProvider
{
/// <summary>
/// Create a new service scope
/// </summary>
/// <returns></returns>
IServiceScope CreateScope();
}
}
1 change: 1 addition & 0 deletions src/WorkflowCore/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A

services.AddSingleton<IWorkflowController, WorkflowController>();
services.AddSingleton<IWorkflowHost, WorkflowHost>();
services.AddTransient<IScopeProvider, ScopeProvider>();
services.AddTransient<IWorkflowExecutor, WorkflowExecutor>();
services.AddTransient<ICancellationProcessor, CancellationProcessor>();
services.AddTransient<IWorkflowBuilder, WorkflowBuilder>();
Expand Down
25 changes: 25 additions & 0 deletions src/WorkflowCore/Services/ScopeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using WorkflowCore.Interface;

namespace WorkflowCore.Services
{
/// <summary>
/// A concrete implementation for the IScopeProvider interface
/// Largely to get around the problems of unit testing an extension method (CreateScope())
/// </summary>
public class ScopeProvider : IScopeProvider
{
private readonly IServiceProvider provider;

public ScopeProvider(IServiceProvider provider)
{
this.provider = provider;
}

public IServiceScope CreateScope()
{
return provider.CreateScope();
}
}
}
89 changes: 47 additions & 42 deletions src/WorkflowCore/Services/WorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class WorkflowExecutor : IWorkflowExecutor
{
protected readonly IWorkflowRegistry _registry;
protected readonly IServiceProvider _serviceProvider;
protected readonly IScopeProvider _scopeProvider;
protected readonly IDateTimeProvider _datetimeProvider;
protected readonly ILogger _logger;
private readonly IExecutionResultProcessor _executionResultProcessor;
Expand All @@ -23,9 +24,10 @@ public class WorkflowExecutor : IWorkflowExecutor

private IWorkflowHost Host => _serviceProvider.GetService<IWorkflowHost>();

public WorkflowExecutor(IWorkflowRegistry registry, IServiceProvider serviceProvider, IDateTimeProvider datetimeProvider, IExecutionResultProcessor executionResultProcessor, ILifeCycleEventPublisher publisher, ICancellationProcessor cancellationProcessor, WorkflowOptions options, ILoggerFactory loggerFactory)
public WorkflowExecutor(IWorkflowRegistry registry, IServiceProvider serviceProvider, IScopeProvider scopeProvider, IDateTimeProvider datetimeProvider, IExecutionResultProcessor executionResultProcessor, ILifeCycleEventPublisher publisher, ICancellationProcessor cancellationProcessor, WorkflowOptions options, ILoggerFactory loggerFactory)
{
_serviceProvider = serviceProvider;
_scopeProvider = scopeProvider;
_registry = registry;
_datetimeProvider = datetimeProvider;
_publisher = publisher;
Expand Down Expand Up @@ -87,57 +89,60 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow)
pointer.StartTime = _datetimeProvider.Now.ToUniversalTime();
}

_logger.LogDebug("Starting step {0} on workflow {1}", step.Name, workflow.Id);
using (var scope = _scopeProvider.CreateScope())
{
_logger.LogDebug("Starting step {0} on workflow {1}", step.Name, workflow.Id);

IStepBody body = step.ConstructBody(_serviceProvider);
IStepBody body = step.ConstructBody(scope.ServiceProvider);

if (body == null)
{
_logger.LogError("Unable to construct step body {0}", step.BodyType.ToString());
pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(_options.ErrorRetryInterval);
wfResult.Errors.Add(new ExecutionError()
if (body == null)
{
WorkflowId = workflow.Id,
ExecutionPointerId = pointer.Id,
ErrorTime = _datetimeProvider.Now.ToUniversalTime(),
Message = String.Format("Unable to construct step body {0}", step.BodyType.ToString())
});
continue;
}
_logger.LogError("Unable to construct step body {0}", step.BodyType.ToString());
pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(_options.ErrorRetryInterval);
wfResult.Errors.Add(new ExecutionError()
{
WorkflowId = workflow.Id,
ExecutionPointerId = pointer.Id,
ErrorTime = _datetimeProvider.Now.ToUniversalTime(),
Message = String.Format("Unable to construct step body {0}", step.BodyType.ToString())
});
continue;
}

IStepExecutionContext context = new StepExecutionContext()
{
Workflow = workflow,
Step = step,
PersistenceData = pointer.PersistenceData,
ExecutionPointer = pointer,
Item = pointer.ContextItem
};
IStepExecutionContext context = new StepExecutionContext()
{
Workflow = workflow,
Step = step,
PersistenceData = pointer.PersistenceData,
ExecutionPointer = pointer,
Item = pointer.ContextItem
};

foreach (var input in step.Inputs)
input.AssignInput(workflow.Data, body, context);
foreach (var input in step.Inputs)
input.AssignInput(workflow.Data, body, context);


switch (step.BeforeExecute(wfResult, context, pointer, body))
{
case ExecutionPipelineDirective.Defer:
continue;
case ExecutionPipelineDirective.EndWorkflow:
workflow.Status = WorkflowStatus.Complete;
workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime();
continue;
}
switch (step.BeforeExecute(wfResult, context, pointer, body))
{
case ExecutionPipelineDirective.Defer:
continue;
case ExecutionPipelineDirective.EndWorkflow:
workflow.Status = WorkflowStatus.Complete;
workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime();
continue;
}

var result = await body.RunAsync(context);
var result = await body.RunAsync(context);

if (result.Proceed)
{
foreach (var output in step.Outputs)
output.AssignOutput(workflow.Data, body, context);
}
if (result.Proceed)
{
foreach (var output in step.Outputs)
output.AssignOutput(workflow.Data, body, context);
}

_executionResultProcessor.ProcessExecutionResult(workflow, def, pointer, step, result, wfResult);
step.AfterExecute(wfResult, context, result, pointer);
_executionResultProcessor.ProcessExecutionResult(workflow, def, pointer, step, result, wfResult);
step.AfterExecute(wfResult, context, result, pointer);
}
}
catch (Exception ex)
{
Expand Down
Loading

0 comments on commit e183903

Please sign in to comment.