Skip to content

Commit

Permalink
Merge pull request #94 from danielgerlag/transaction-comensation-feature
Browse files Browse the repository at this point in the history
Saga Transactions and compensation feature
  • Loading branch information
danielgerlag authored Dec 24, 2017
2 parents 7789d61 + 68d09c3 commit eb00c95
Show file tree
Hide file tree
Showing 67 changed files with 2,151 additions and 248 deletions.
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class MyWorkflow : IWorkflow
}
```

* Resilient service orchestration
* Saga Transactions

```c#
public class MyWorkflow : IWorkflow
Expand All @@ -70,6 +70,21 @@ public class MyWorkflow : IWorkflow
}
```

```c#
builder
.StartWith<LogStart>()
.Saga(saga => saga
.StartWith<Task1>()
.CompensateWith<UndoTask1>()
.Then<Task2>()
.CompensateWith<UndoTask2>()
.Then<Task3>()
.CompensateWith<UndoTask3>()
)
.OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromMinutes(10))
.Then<LogEnd>();
```

## Persistence

Since workflows are typically long running processes, they will need to be persisted to storage between steps.
Expand Down Expand Up @@ -106,6 +121,8 @@ There are several persistence providers available as separate Nuget packages.

* [Parallel Tasks](src/samples/WorkflowCore.Sample13)

* [Saga Transactions (with compensation)](src/samples/WorkflowCore.Sample17)

* [Scheduled Background Tasks](src/samples/WorkflowCore.Sample16)

* [Recurring Background Tasks](src/samples/WorkflowCore.Sample14)
Expand Down
141 changes: 141 additions & 0 deletions ReleaseNotes/1.6.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Workflow Core 1.6.0


* Added Saga transaction feature
* Added `.CompensateWith` feature


#### Specifying compensation steps for each component of a saga transaction

In this sample, if `Task2` throws an exception, then `UndoTask2` and `UndoTask1` will be triggered.

```c#
builder
.StartWith<SayHello>()
.CompensateWith<UndoHello>()
.Saga(saga => saga
.StartWith<DoTask1>()
.CompensateWith<UndoTask1>()
.Then<DoTask2>()
.CompensateWith<UndoTask2>()
.Then<DoTask3>()
.CompensateWith<UndoTask3>()
)
.Then<SayGoodbye>();
```

#### Retrying a failed transaction

This particular example will retry the entire saga every 5 seconds

```c#
builder
.StartWith<SayHello>()
.CompensateWith<UndoHello>()
.Saga(saga => saga
.StartWith<DoTask1>()
.CompensateWith<UndoTask1>()
.Then<DoTask2>()
.CompensateWith<UndoTask2>()
.Then<DoTask3>()
.CompensateWith<UndoTask3>()
)
.OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
.Then<SayGoodbye>();
```

#### Compensating the entire transaction

You could also only specify a master compensation step, as follows

```c#
builder
.StartWith<SayHello>()
.CompensateWith<UndoHello>()
.Saga(saga => saga
.StartWith<DoTask1>()
.Then<DoTask2>()
.Then<DoTask3>()
)
.CompensateWithSequence(comp => comp
.StartWith<UndoTask1>()
.Then<UndoTask2>()
.Then<UndoTask3>()
)
.Then<SayGoodbye>();
```

#### Passing parameters

Parameters can be passed to a compensation step as follows

```c#
builder
.StartWith<SayHello>()
.CompensateWith<PrintMessage>(compensate =>
{
compensate.Input(step => step.Message, data => "undoing...");
})
```


### Expressing a saga in JSON

A saga transaction can be expressed in JSON, by using the `WorkflowCore.Primitives.Sequence` step and setting the `Saga` parameter to `true`.

The compensation steps can be defined by specifying the `CompensateWith` parameter.

```json
{
"Id": "Saga-Sample",
"Version": 1,
"DataType": "MyApp.MyDataClass, MyApp",
"Steps": [
{
"Id": "Hello",
"StepType": "MyApp.HelloWorld, MyApp",
"NextStepId": "MySaga"
},
{
"Id": "MySaga",
"StepType": "WorkflowCore.Primitives.Sequence, WorkflowCore",
"NextStepId": "Bye",
"Saga": true,
"Do": [
[
{
"Id": "do1",
"StepType": "MyApp.Task1, MyApp",
"NextStepId": "do2",
"CompensateWith": [
{
"Id": "undo1",
"StepType": "MyApp.UndoTask1, MyApp"
}
]
},
{
"Id": "do2",
"StepType": "MyApp.Task2, MyApp",
"CompensateWith": [
{
"Id": "undo2-1",
"NextStepId": "undo2-2",
"StepType": "MyApp.UndoTask2, MyApp"
},
{
"Id": "undo2-2",
"StepType": "MyApp.DoSomethingElse, MyApp"
}
]
}
]
]
},
{
"Id": "Bye",
"StepType": "MyApp.GoodbyeWorld, MyApp"
}
]
}
```
10 changes: 9 additions & 1 deletion WorkflowCore.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27004.2008
VisualStudioVersion = 15.0.27130.2010
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{EF47161E-E399-451C-BDE8-E92AAD3BD761}"
EndProject
Expand Down Expand Up @@ -90,6 +90,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ReleaseNotes", "ReleaseNote
ReleaseNotes\1.3.2.md = ReleaseNotes\1.3.2.md
ReleaseNotes\1.3.3.md = ReleaseNotes\1.3.3.md
ReleaseNotes\1.4.0.md = ReleaseNotes\1.4.0.md
ReleaseNotes\1.6.0.md = ReleaseNotes\1.6.0.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample14", "src\samples\WorkflowCore.Sample14\WorkflowCore.Sample14.csproj", "{6BC66637-B42A-4334-ADFB-DBEC9F29D293}"
Expand All @@ -106,6 +107,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample16", "sr
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ScratchPad", "test\ScratchPad\ScratchPad.csproj", "{6396453F-4D0E-4CD4-BC89-87E8970F2A80}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample17", "src\samples\WorkflowCore.Sample17\WorkflowCore.Sample17.csproj", "{42F475BC-95F4-42E1-8CCD-7B9C27487E33}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -268,6 +271,10 @@ Global
{6396453F-4D0E-4CD4-BC89-87E8970F2A80}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6396453F-4D0E-4CD4-BC89-87E8970F2A80}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6396453F-4D0E-4CD4-BC89-87E8970F2A80}.Release|Any CPU.Build.0 = Release|Any CPU
{42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Debug|Any CPU.Build.0 = Debug|Any CPU
{42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Release|Any CPU.ActiveCfg = Release|Any CPU
{42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -315,6 +322,7 @@ Global
{9B7811AC-68D6-4D19-B1E9-65423393ED83} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
{0C9617A9-C8B7-45F6-A54A-261A23AC881B} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
{6396453F-4D0E-4CD4-BC89-87E8970F2A80} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
{42F475BC-95F4-42E1-8CCD-7B9C27487E33} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DC0FA8D3-6449-4FDA-BB46-ECF58FAD23B4}
Expand Down
12 changes: 12 additions & 0 deletions src/WorkflowCore/Interface/IExecutionPointerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using WorkflowCore.Models;

namespace WorkflowCore.Interface
{
public interface IExecutionPointerFactory
{
ExecutionPointer BuildStartingPointer(WorkflowDefinition def);
ExecutionPointer BuildCompensationPointer(WorkflowDefinition def, ExecutionPointer pointer, ExecutionPointer exceptionPointer, int compensationStepId);
ExecutionPointer BuildNextPointer(WorkflowDefinition def, ExecutionPointer pointer, StepOutcome outcomeTarget);
ExecutionPointer BuildChildPointer(WorkflowDefinition def, ExecutionPointer pointer, int childDefinitionId, object branch);
}
}
11 changes: 11 additions & 0 deletions src/WorkflowCore/Interface/IExecutionResultProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using WorkflowCore.Models;

namespace WorkflowCore.Interface
{
public interface IExecutionResultProcessor
{
void HandleStepException(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step);
void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, ExecutionResult result, WorkflowExecutorResult workflowResult);
}
}
2 changes: 1 addition & 1 deletion src/WorkflowCore/Interface/IStepBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ namespace WorkflowCore.Interface
{
public interface IStepBody
{
Task<ExecutionResult> RunAsync(IStepExecutionContext context);
Task<ExecutionResult> RunAsync(IStepExecutionContext context);
}
}
37 changes: 37 additions & 0 deletions src/WorkflowCore/Interface/IStepBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ public interface IStepBuilder<TData, TStepBody>
/// <returns></returns>
IParallelStepBuilder<TData, Sequence> Parallel();

/// <summary>
/// Execute a sequence of steps in a container
/// </summary>
/// <returns></returns>
IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);

/// <summary>
/// Schedule a block of steps to execute in parallel sometime in the future
/// </summary>
Expand All @@ -177,5 +183,36 @@ public interface IStepBuilder<TData, TStepBody>
/// <param name="until">Resolves a condition to stop the recurring task</param>
/// <returns></returns>
IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);


/// <summary>
/// Undo step if unhandled exception is thrown by this step
/// </summary>
/// <typeparam name="TStep">The type of the step to execute</typeparam>
/// <param name="stepSetup">Configure additional parameters for this step</param>
/// <returns></returns>
IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;

/// <summary>
/// Undo step if unhandled exception is thrown by this step
/// </summary>
/// <param name="body"></param>
/// <returns></returns>
IStepBuilder<TData, TStepBody> CompensateWith(Func<IStepExecutionContext, ExecutionResult> body);

/// <summary>
/// Undo step if unhandled exception is thrown by this step
/// </summary>
/// <param name="body"></param>
/// <returns></returns>
IStepBuilder<TData, TStepBody> CompensateWith(Action<IStepExecutionContext> body);

/// <summary>
/// Undo step if unhandled exception is thrown by this step
/// </summary>
/// <param name="builder"></param>
/// <returns></returns>
IStepBuilder<TData, TStepBody> CompensateWithSequence(Action<IWorkflowBuilder<TData>> builder);

}
}
2 changes: 1 addition & 1 deletion src/WorkflowCore/Interface/IWorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ namespace WorkflowCore.Interface
{
public interface IWorkflowExecutor
{
Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow, WorkflowOptions options);
Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow);
}
}
4 changes: 4 additions & 0 deletions src/WorkflowCore/Models/DefinitionStorage/v1/StepSourceV1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public class StepSourceV1

public List<List<StepSourceV1>> Do { get; set; } = new List<List<StepSourceV1>>();

public List<StepSourceV1> CompensateWith { get; set; } = new List<StepSourceV1>();

public bool Saga { get; set; } = false;

public string NextStepId { get; set; }

public Dictionary<string, string> Inputs { get; set; } = new Dictionary<string, string>();
Expand Down
16 changes: 16 additions & 0 deletions src/WorkflowCore/Models/ExecutionPointer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,21 @@ public class ExecutionPointer
public string PredecessorId { get; set; }

public object Outcome { get; set; }

public PointerStatus Status { get; set; } = PointerStatus.Legacy;

public Stack<string> Scope { get; set; } = new Stack<string>();
}

public enum PointerStatus
{
Legacy = 0,
Pending = 1,
Running = 2,
Complete = 3,
Sleeping = 4,
WaitingForEvent = 5,
Failed = 6,
Compensated = 7
}
}
2 changes: 1 addition & 1 deletion src/WorkflowCore/Models/StepBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public abstract class StepBody : IStepBody
public Task<ExecutionResult> RunAsync(IStepExecutionContext context)
{
return Task.FromResult(Run(context));
}
}

protected ExecutionResult OutcomeResult(object value)
{
Expand Down
3 changes: 2 additions & 1 deletion src/WorkflowCore/Models/WorkflowDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public enum WorkflowErrorHandling
{
Retry = 0,
Suspend = 1,
Terminate = 2
Terminate = 2,
Compensate = 3
}
}
12 changes: 11 additions & 1 deletion src/WorkflowCore/Models/WorkflowStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ public abstract class WorkflowStep

public WorkflowErrorHandling? ErrorBehavior { get; set; }

public TimeSpan? RetryInterval { get; set; }
public TimeSpan? RetryInterval { get; set; }

public int? CompensationStepId { get; set; }

public virtual bool ResumeChildrenAfterCompensation => true;

public virtual bool RevertChildrenAfterCompensation => false;

public virtual ExecutionPipelineDirective InitForExecution(WorkflowExecutorResult executorResult, WorkflowDefinition defintion, WorkflowInstance workflow, ExecutionPointer executionPointer)
{
Expand All @@ -41,6 +47,10 @@ public virtual void AfterExecute(WorkflowExecutorResult executorResult, IStepExe
{
}

public virtual void PrimeForRetry(ExecutionPointer pointer)
{
}

/// <summary>
/// Called after every workflow execution round,
/// every exectuon pointer with no end time, even if this step was not executed in this round
Expand Down
Loading

0 comments on commit eb00c95

Please sign in to comment.