Skip to content

Commit

Permalink
Misc improvements (#269)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgerlag authored Feb 24, 2019
1 parent 791287d commit b4cba8d
Show file tree
Hide file tree
Showing 29 changed files with 303 additions and 162 deletions.
2 changes: 1 addition & 1 deletion src/WorkflowCore/Interface/IQueueProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ public interface IQueueProvider : IDisposable
Task Stop();
}

public enum QueueType { Workflow = 0, Event = 1 }
public enum QueueType { Workflow = 0, Event = 1, Index = 2 }
}
14 changes: 14 additions & 0 deletions src/WorkflowCore/Interface/IStepBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ public interface IStepBuilder<TData, TStepBody>
/// <returns></returns>
IStepBuilder<TData, TStepBody> Name(string name);

/// <summary>
/// Specifies a custom Id to reference this step
/// </summary>
/// <param name="id">A custom Id to reference this step</param>
/// <returns></returns>
IStepBuilder<TData, TStepBody> Id(string id);

/// <summary>
/// Specify the next step in the workflow
/// </summary>
Expand Down Expand Up @@ -51,6 +58,13 @@ public interface IStepBuilder<TData, TStepBody>
/// <returns></returns>
IStepBuilder<TData, ActionStepBody> Then(Action<IStepExecutionContext> body);

/// <summary>
/// Specify the next step in the workflow by Id
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
IStepBuilder<TData, TStepBody> Attach(string id);

/// <summary>
/// Configure an outcome for this step, then wire it to another step
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowCore/Models/StepOutcome.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public Expression<Func<object, object>> Value

public string Label { get; set; }

public string Tag { get; set; }
public string ExternalNextStepId { get; set; }

public object GetValue(object data)
{
Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowCore/Models/WorkflowDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class WorkflowDefinition

public string Description { get; set; }

public List<WorkflowStep> Steps { get; set; }
public WorkflowStepCollection Steps { get; set; } = new WorkflowStepCollection();

public Type DataType { get; set; }

Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowCore/Models/WorkflowStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public abstract class WorkflowStep

public virtual string Name { get; set; }

public virtual string Tag { get; set; }
public virtual string ExternalId { get; set; }

public virtual List<int> Children { get; set; } = new List<int>();

Expand Down
81 changes: 81 additions & 0 deletions src/WorkflowCore/Models/WorkflowStepCollection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace WorkflowCore.Models
{
public class WorkflowStepCollection : ICollection<WorkflowStep>
{
private readonly Dictionary<int, WorkflowStep> _dictionary = new Dictionary<int, WorkflowStep>();

public WorkflowStepCollection()
{
}

public WorkflowStepCollection(int capacity)
{
_dictionary = new Dictionary<int, WorkflowStep>(capacity);
}

public WorkflowStepCollection(ICollection<WorkflowStep> steps)
{
foreach (var step in steps)
{
Add(step);
}
}

public IEnumerator<WorkflowStep> GetEnumerator()
{
return _dictionary.Values.GetEnumerator();
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

public WorkflowStep FindById(int id)
{
if (!_dictionary.ContainsKey(id))
return null;

return _dictionary[id];
}

public void Add(WorkflowStep item)
{
_dictionary.Add(item.Id, item);
}

public void Clear()
{
_dictionary.Clear();
}

public bool Contains(WorkflowStep item)
{
return _dictionary.ContainsValue(item);
}

public void CopyTo(WorkflowStep[] array, int arrayIndex)
{
_dictionary.Values.CopyTo(array, arrayIndex);
}

public bool Remove(WorkflowStep item)
{
return _dictionary.Remove(item.Id);
}

public WorkflowStep Find(Predicate<WorkflowStep> match)
{
return _dictionary.Values.FirstOrDefault(x => match(x));
}

public int Count => _dictionary.Count;
public bool IsReadOnly => false;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using WorkflowCore.Interface;
Expand All @@ -12,48 +13,42 @@ namespace WorkflowCore.Services
/// </summary>
public class SingleNodeQueueProvider : IQueueProvider
{

private readonly BlockingCollection<string> _runQueue = new BlockingCollection<string>();
private readonly BlockingCollection<string> _eventQueue = new BlockingCollection<string>();

private readonly Dictionary<QueueType, BlockingCollection<string>> _queues = new Dictionary<QueueType, BlockingCollection<string>>()
{
[QueueType.Workflow] = new BlockingCollection<string>(),
[QueueType.Event] = new BlockingCollection<string>(),
[QueueType.Index] = new BlockingCollection<string>()
};

public bool IsDequeueBlocking => true;

public async Task QueueWork(string id, QueueType queue)
{
SelectQueue(queue).Add(id);
_queues[queue].Add(id);
}

public async Task<string> DequeueWork(QueueType queue, CancellationToken cancellationToken)
{
if (SelectQueue(queue).TryTake(out string id, 100, cancellationToken))
if (_queues[queue].TryTake(out string id, 100, cancellationToken))
return id;

return null;
}

public async Task Start()
public Task Start()
{
return Task.CompletedTask;
}

public async Task Stop()
public Task Stop()
{
return Task.CompletedTask;
}

public void Dispose()
{
}

private BlockingCollection<string> SelectQueue(QueueType queue)
{
switch (queue)
{
case QueueType.Workflow:
return _runQueue;
case QueueType.Event:
return _eventQueue;
}
return null;
}

}

Expand Down
26 changes: 13 additions & 13 deletions src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ private WorkflowDefinition Convert(DefinitionSourceV1 source)
}


private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type dataType)
private WorkflowStepCollection ConvertSteps(ICollection<StepSourceV1> source, Type dataType)
{
var result = new List<WorkflowStep>();
var result = new WorkflowStepCollection();
int i = 0;
var stack = new Stack<StepSourceV1>(source.Reverse<StepSourceV1>());
var parents = new List<StepSourceV1>();
Expand Down Expand Up @@ -87,7 +87,7 @@ private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type d
targetStep.Name = nextStep.Name;
targetStep.ErrorBehavior = nextStep.ErrorBehavior;
targetStep.RetryInterval = nextStep.RetryInterval;
targetStep.Tag = $"{nextStep.Id}";
targetStep.ExternalId = $"{nextStep.Id}";

AttachInputs(nextStep, dataType, stepType, targetStep);
AttachOutputs(nextStep, dataType, stepType, targetStep);
Expand All @@ -114,7 +114,7 @@ private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type d
}

if (!string.IsNullOrEmpty(nextStep.NextStepId))
targetStep.Outcomes.Add(new StepOutcome() { Tag = $"{nextStep.NextStepId}" });
targetStep.Outcomes.Add(new StepOutcome() { ExternalNextStepId = $"{nextStep.NextStepId}" });

result.Add(targetStep);

Expand All @@ -123,26 +123,26 @@ private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type d

foreach (var step in result)
{
if (result.Any(x => x.Tag == step.Tag && x.Id != step.Id))
throw new WorkflowDefinitionLoadException($"Duplicate step Id {step.Tag}");
if (result.Any(x => x.ExternalId == step.ExternalId && x.Id != step.Id))
throw new WorkflowDefinitionLoadException($"Duplicate step Id {step.ExternalId}");

foreach (var outcome in step.Outcomes)
{
if (result.All(x => x.Tag != outcome.Tag))
throw new WorkflowDefinitionLoadException($"Cannot find step id {outcome.Tag}");
if (result.All(x => x.ExternalId != outcome.ExternalNextStepId))
throw new WorkflowDefinitionLoadException($"Cannot find step id {outcome.ExternalNextStepId}");

outcome.NextStep = result.Single(x => x.Tag == outcome.Tag).Id;
outcome.NextStep = result.Single(x => x.ExternalId == outcome.ExternalNextStepId).Id;
}
}

foreach (var parent in parents)
{
var target = result.Single(x => x.Tag == parent.Id);
var target = result.Single(x => x.ExternalId == parent.Id);
foreach (var branch in parent.Do)
{
var childTags = branch.Select(x => x.Id).ToList();
target.Children.AddRange(result
.Where(x => childTags.Contains(x.Tag))
.Where(x => childTags.Contains(x.ExternalId))
.OrderBy(x => x.Id)
.Select(x => x.Id)
.Take(1)
Expand All @@ -152,11 +152,11 @@ private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type d

foreach (var item in compensatables)
{
var target = result.Single(x => x.Tag == item.Id);
var target = result.Single(x => x.ExternalId == item.Id);
var tag = item.CompensateWith.Select(x => x.Id).FirstOrDefault();
if (tag != null)
{
var compStep = result.FirstOrDefault(x => x.Tag == tag);
var compStep = result.FirstOrDefault(x => x.ExternalId == tag);
if (compStep != null)
target.CompensationStepId = compStep.Id;
}
Expand Down
6 changes: 3 additions & 3 deletions src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
{
var pointerId = scope.Pop();
var scopePointer = workflow.ExecutionPointers.FindById(pointerId);
var scopeStep = def.Steps.First(x => x.Id == scopePointer.StepId);
var scopeStep = def.Steps.FindById(scopePointer.StepId);

var resume = true;
var revert = false;
Expand All @@ -44,7 +44,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
{
var parentId = txnStack.Pop();
var parentPointer = workflow.ExecutionPointers.FindById(parentId);
var parentStep = def.Steps.First(x => x.Id == parentPointer.StepId);
var parentStep = def.Steps.FindById(parentPointer.StepId);
if ((!parentStep.ResumeChildrenAfterCompensation) || (parentStep.RevertChildrenAfterCompensation))
{
resume = parentStep.ResumeChildrenAfterCompensation;
Expand Down Expand Up @@ -86,7 +86,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP

foreach (var siblingPointer in prevSiblings)
{
var siblingStep = def.Steps.First(x => x.Id == siblingPointer.StepId);
var siblingStep = def.Steps.FindById(siblingPointer.StepId);
if (siblingStep.CompensationStepId.HasValue)
{
var compensationPointer = _pointerFactory.BuildCompensationPointer(def, siblingPointer, exceptionPointer, siblingStep.CompensationStepId.Value);
Expand Down
8 changes: 4 additions & 4 deletions src/WorkflowCore/Services/ExecutionPointerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public ExecutionPointer BuildGenesisPointer(WorkflowDefinition def)
StepId = 0,
Active = true,
Status = PointerStatus.Pending,
StepName = Enumerable.First<WorkflowStep>(def.Steps, x => x.Id == 0).Name
StepName = def.Steps.FindById(0).Name
};
}

Expand All @@ -32,7 +32,7 @@ public ExecutionPointer BuildNextPointer(WorkflowDefinition def, ExecutionPointe
Active = true,
ContextItem = pointer.ContextItem,
Status = PointerStatus.Pending,
StepName = def.Steps.First(x => x.Id == outcomeTarget.NextStep).Name,
StepName = def.Steps.FindById(outcomeTarget.NextStep).Name,
Scope = new List<string>(pointer.Scope)
};
}
Expand All @@ -52,7 +52,7 @@ public ExecutionPointer BuildChildPointer(WorkflowDefinition def, ExecutionPoint
Active = true,
ContextItem = branch,
Status = PointerStatus.Pending,
StepName = def.Steps.First(x => x.Id == childDefinitionId).Name,
StepName = def.Steps.FindById(childDefinitionId).Name,
Scope = new List<string>(childScope)
};
}
Expand All @@ -68,7 +68,7 @@ public ExecutionPointer BuildCompensationPointer(WorkflowDefinition def, Executi
Active = true,
ContextItem = pointer.ContextItem,
Status = PointerStatus.Pending,
StepName = def.Steps.First(x => x.Id == compensationStepId).Name,
StepName = def.Steps.FindById(compensationStepId).Name,
Scope = new List<string>(pointer.Scope)
};
}
Expand Down
4 changes: 2 additions & 2 deletions src/WorkflowCore/Services/ExecutionResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void HandleStepException(WorkflowInstance workflow, WorkflowDefinition de
while (queue.Count > 0)
{
var exceptionPointer = queue.Dequeue();
var exceptionStep = def.Steps.Find(x => x.Id == exceptionPointer.StepId);
var exceptionStep = def.Steps.FindById(exceptionPointer.StepId);
var compensatingStepId = FindScopeCompensationStepId(workflow, def, exceptionPointer);
var errorOption = (exceptionStep.ErrorBehavior ?? (compensatingStepId.HasValue ? WorkflowErrorHandling.Compensate : def.DefaultErrorBehavior));

Expand All @@ -129,7 +129,7 @@ public void HandleStepException(WorkflowInstance workflow, WorkflowDefinition de
{
var pointerId = scope.Pop();
var pointer = workflow.ExecutionPointers.FindById(pointerId);
var step = def.Steps.First(x => x.Id == pointer.StepId);
var step = def.Steps.FindById(pointer.StepId);
if (step.CompensationStepId.HasValue)
return step.CompensationStepId.Value;
}
Expand Down
Loading

0 comments on commit b4cba8d

Please sign in to comment.