Skip to content

Commit

Permalink
Merge pull request #55 from danielgerlag/interface-speration
Browse files Browse the repository at this point in the history
Interface separation
  • Loading branch information
danielgerlag authored Sep 9, 2017
2 parents bd80523 + 768ae3f commit 3a9d7fe
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 171 deletions.
17 changes: 17 additions & 0 deletions ReleaseNotes/1.3.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Workflow Core 1.3.2

* Added `WorkflowController` service

Use the `WorkflowController` service to control workflows without having to run an exection node.

```c#
var controller = serviceProvider.GetService<IWorkflowController>();
```

## Exposed methods
* StartWorkflow
* PublishEvent
* RegisterWorkflow
* SuspendWorkflow
* ResumeWorkflow
* TerminateWorkflow
41 changes: 41 additions & 0 deletions src/WorkflowCore/Interface/IWorkflowController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace WorkflowCore.Interface
{
public interface IWorkflowController
{
Task<string> StartWorkflow(string workflowId, object data = null);
Task<string> StartWorkflow(string workflowId, int? version, object data = null);
Task<string> StartWorkflow<TData>(string workflowId, TData data = null) where TData : class;
Task<string> StartWorkflow<TData>(string workflowId, int? version, TData data = null) where TData : class;

Task PublishEvent(string eventName, string eventKey, object eventData, DateTime? effectiveDate = null);
void RegisterWorkflow<TWorkflow>() where TWorkflow : IWorkflow, new();
void RegisterWorkflow<TWorkflow, TData>() where TWorkflow : IWorkflow<TData>, new() where TData : new();

/// <summary>
/// Suspend the execution of a given workflow until .ResumeWorkflow is called
/// </summary>
/// <param name="workflowId"></param>
/// <returns></returns>
Task<bool> SuspendWorkflow(string workflowId);

/// <summary>
/// Resume a previously suspended workflow
/// </summary>
/// <param name="workflowId"></param>
/// <returns></returns>
Task<bool> ResumeWorkflow(string workflowId);

/// <summary>
/// Permanently terminate the exeuction of a given workflow
/// </summary>
/// <param name="workflowId"></param>
/// <returns></returns>
Task<bool> TerminateWorkflow(string workflowId);

}
}
34 changes: 3 additions & 31 deletions src/WorkflowCore/Interface/IWorkflowHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace WorkflowCore.Interface
{
public interface IWorkflowHost
public interface IWorkflowHost : IWorkflowController
{
/// <summary>
/// Start the workflow host, this enable execution of workflows
Expand All @@ -16,36 +16,8 @@ public interface IWorkflowHost
/// Stop the workflow host
/// </summary>
void Stop();
Task<string> StartWorkflow(string workflowId, object data = null);
Task<string> StartWorkflow(string workflowId, int? version, object data = null);
Task<string> StartWorkflow<TData>(string workflowId, TData data = null) where TData : class;
Task<string> StartWorkflow<TData>(string workflowId, int? version, TData data = null) where TData : class;

Task PublishEvent(string eventName, string eventKey, object eventData, DateTime? effectiveDate = null);
void RegisterWorkflow<TWorkflow>() where TWorkflow : IWorkflow, new();
void RegisterWorkflow<TWorkflow, TData>() where TWorkflow : IWorkflow<TData>, new() where TData : new();

/// <summary>
/// Suspend the execution of a given workflow until .ResumeWorkflow is called
/// </summary>
/// <param name="workflowId"></param>
/// <returns></returns>
Task<bool> SuspendWorkflow(string workflowId);

/// <summary>
/// Resume a previously suspended workflow
/// </summary>
/// <param name="workflowId"></param>
/// <returns></returns>
Task<bool> ResumeWorkflow(string workflowId);

/// <summary>
/// Permanently terminate the exeuction of a given workflow
/// </summary>
/// <param name="workflowId"></param>
/// <returns></returns>
Task<bool> TerminateWorkflow(string workflowId);



event StepErrorEventHandler OnStepError;
void ReportStepError(WorkflowInstance workflow, WorkflowStep step, Exception exception);

Expand Down
1 change: 1 addition & 0 deletions src/WorkflowCore/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public static void AddWorkflow(this IServiceCollection services, Action<Workflow
services.AddTransient<IBackgroundTask, EventConsumer>();
services.AddTransient<IBackgroundTask, RunnablePoller>();

services.AddSingleton<IWorkflowController, WorkflowController>();
services.AddSingleton<IWorkflowHost, WorkflowHost>();
services.AddTransient<IWorkflowExecutor, WorkflowExecutor>();
services.AddTransient<IWorkflowBuilder, WorkflowBuilder>();
Expand Down
194 changes: 194 additions & 0 deletions src/WorkflowCore/Services/WorkflowController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
using System;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using WorkflowCore.Exceptions;
using WorkflowCore.Interface;
using WorkflowCore.Models;

namespace WorkflowCore.Services
{
public class WorkflowController : IWorkflowController
{
private readonly IPersistenceProvider _persistenceStore;
private readonly IDistributedLockProvider _lockProvider;
private readonly IWorkflowRegistry _registry;
private readonly IQueueProvider _queueProvider;
private readonly ILogger _logger;


public WorkflowController(IPersistenceProvider persistenceStore, IDistributedLockProvider lockProvider, IWorkflowRegistry registry, IQueueProvider queueProvider, ILoggerFactory loggerFactory)
{
_persistenceStore = persistenceStore;
_lockProvider = lockProvider;
_registry = registry;
_queueProvider = queueProvider;
_logger = loggerFactory.CreateLogger<WorkflowController>();
}

public Task<string> StartWorkflow(string workflowId, object data = null)
{
return StartWorkflow(workflowId, null, data);
}

public Task<string> StartWorkflow(string workflowId, int? version, object data = null)
{
return StartWorkflow<object>(workflowId, version, data);
}

public Task<string> StartWorkflow<TData>(string workflowId, TData data = null)
where TData : class
{
return StartWorkflow<TData>(workflowId, null, data);
}

public async Task<string> StartWorkflow<TData>(string workflowId, int? version, TData data = null)
where TData : class
{

var def = _registry.GetDefinition(workflowId, version);
if (def == null)
{
throw new WorkflowNotRegisteredException(workflowId, version);
}

var wf = new WorkflowInstance
{
WorkflowDefinitionId = workflowId,
Version = def.Version,
Data = data,
Description = def.Description,
NextExecution = 0,
CreateTime = DateTime.Now.ToUniversalTime(),
Status = WorkflowStatus.Runnable
};

if ((def.DataType != null) && (data == null))
{
wf.Data = TypeExtensions.GetConstructor(def.DataType, new Type[] { }).Invoke(null);
}

wf.ExecutionPointers.Add(new ExecutionPointer
{
Id = Guid.NewGuid().ToString(),
StepId = 0,
Active = true,
StepName = Enumerable.First<WorkflowStep>(def.Steps, x => x.Id == 0).Name
});

string id = await _persistenceStore.CreateNewWorkflow(wf);
await _queueProvider.QueueWork(id, QueueType.Workflow);
return id;
}

public async Task PublishEvent(string eventName, string eventKey, object eventData, DateTime? effectiveDate = null)
{
_logger.LogDebug("Creating event {0} {1}", eventName, eventKey);
Event evt = new Event();

if (effectiveDate.HasValue)
evt.EventTime = effectiveDate.Value.ToUniversalTime();
else
evt.EventTime = DateTime.Now.ToUniversalTime();

evt.EventData = eventData;
evt.EventKey = eventKey;
evt.EventName = eventName;
evt.IsProcessed = false;
string eventId = await _persistenceStore.CreateEvent(evt);

await _queueProvider.QueueWork(eventId, QueueType.Event);
}

public async Task<bool> SuspendWorkflow(string workflowId)
{
if (!await _lockProvider.AcquireLock(workflowId, new CancellationToken()))
return false;

try
{
var wf = await _persistenceStore.GetWorkflowInstance(workflowId);
if (wf.Status == WorkflowStatus.Runnable)
{
wf.Status = WorkflowStatus.Suspended;
await _persistenceStore.PersistWorkflow(wf);
return true;
}

return false;
}
finally
{
await _lockProvider.ReleaseLock(workflowId);
}
}

public async Task<bool> ResumeWorkflow(string workflowId)
{
if (!await _lockProvider.AcquireLock(workflowId, new CancellationToken()))
{
return false;
}

bool requeue = false;
try
{
var wf = await _persistenceStore.GetWorkflowInstance(workflowId);
if (wf.Status == WorkflowStatus.Suspended)
{
wf.Status = WorkflowStatus.Runnable;
await _persistenceStore.PersistWorkflow(wf);
requeue = true;
return true;
}

return false;
}
finally
{
await _lockProvider.ReleaseLock(workflowId);
if (requeue)
await _queueProvider.QueueWork(workflowId, QueueType.Workflow);
}

return false;
}

public async Task<bool> TerminateWorkflow(string workflowId)
{
if (!await _lockProvider.AcquireLock(workflowId, new CancellationToken()))
{
return false;
}

try
{
var wf = await _persistenceStore.GetWorkflowInstance(workflowId);
wf.Status = WorkflowStatus.Terminated;
await _persistenceStore.PersistWorkflow(wf);
return true;
}
finally
{
await _lockProvider.ReleaseLock(workflowId);
}
}

public void RegisterWorkflow<TWorkflow>()
where TWorkflow : IWorkflow, new()
{
TWorkflow wf = new TWorkflow();
_registry.RegisterWorkflow(wf);
}

public void RegisterWorkflow<TWorkflow, TData>()
where TWorkflow : IWorkflow<TData>, new()
where TData : new()
{
TWorkflow wf = new TWorkflow();
_registry.RegisterWorkflow<TData>(wf);
}
}
}
Loading

0 comments on commit 3a9d7fe

Please sign in to comment.