Skip to content

Commit

Permalink
Add data processing sample (Sample25)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfmskywalker committed Oct 16, 2020
1 parent 00e6df0 commit 4705c9b
Show file tree
Hide file tree
Showing 10 changed files with 1,395 additions and 1 deletion.
7 changes: 7 additions & 0 deletions Samples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample23", "src\samples\Sam
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample24", "src\samples\Sample24\Sample24.csproj", "{C7B1C36D-7297-4AA2-8AFE-31FC5CB99AA0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample25", "src\samples\Sample25\Sample25.csproj", "{ADDA6EB3-32F0-4802-B595-B12090DC4A1C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -313,6 +315,10 @@ Global
{C7B1C36D-7297-4AA2-8AFE-31FC5CB99AA0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C7B1C36D-7297-4AA2-8AFE-31FC5CB99AA0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C7B1C36D-7297-4AA2-8AFE-31FC5CB99AA0}.Release|Any CPU.Build.0 = Release|Any CPU
{ADDA6EB3-32F0-4802-B595-B12090DC4A1C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{ADDA6EB3-32F0-4802-B595-B12090DC4A1C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{ADDA6EB3-32F0-4802-B595-B12090DC4A1C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{ADDA6EB3-32F0-4802-B595-B12090DC4A1C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -370,6 +376,7 @@ Global
{BE634D4E-73A1-48CE-8682-F548CA0BA5B6} = {5E5E1E84-DDBC-40D6-B891-0D563A15A44A}
{5F16E2EC-7162-47F8-B8F1-81CACB05A5D9} = {5E5E1E84-DDBC-40D6-B891-0D563A15A44A}
{C7B1C36D-7297-4AA2-8AFE-31FC5CB99AA0} = {5E5E1E84-DDBC-40D6-B891-0D563A15A44A}
{ADDA6EB3-32F0-4802-B595-B12090DC4A1C} = {5E5E1E84-DDBC-40D6-B891-0D563A15A44A}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8B0975FD-7050-48B0-88C5-48C33378E158}
Expand Down
3 changes: 2 additions & 1 deletion src/samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ This list of sample implementations showcase a variety of workflows using ELSA.
21. [Shopping Cart: Mass Transit & Quartz Scheduling](Sample21) - Example of a workflow that tracks a shopping cart. Uses Mass Transit and Quartz scheduling.
22. [Hellow World: SQL Server](Sample22) - A simple demonstration of using Entity Framework Core persistence providers with SQL Server.
23. [Hello World Custom Schema: Sqllite](Sample23) - A simple demonstration of using Entity Framework Core persistence providers with SqlLite and Custom Schema.
24. [Hello World: MySql with migration](Sample24) - A simple demonstration of using Entity Framework Core MySql persistence provider. A migration is also demonstrated.
24. [Hello World: MySql with migration](Sample24) - A simple demonstration of using Entity Framework Core MySql persistence provider. A migration is also demonstrated.
24. [Data processing](Sample25) - A simple demonstration of flowing data through a pipeline of processing activities (https://github.com/elsa-workflows/elsa-core/issues/405).
28 changes: 28 additions & 0 deletions src/samples/Sample25/Activities/Absolute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Elsa.Expressions;
using Elsa.Results;
using Elsa.Services;
using Elsa.Services.Models;

namespace Sample25.Activities
{
public class Absolute : Activity
{
public WorkflowExpression<double> ValueExpression
{
get => GetState<WorkflowExpression<double>>();
set => SetState(value);
}

protected override async Task<ActivityExecutionResult> OnExecuteAsync(WorkflowExecutionContext context, CancellationToken cancellationToken)
{
var value = await context.EvaluateAsync(ValueExpression, cancellationToken);
var result = Math.Abs(value);

Output.SetVariable("Result", result);
return Done();
}
}
}
44 changes: 44 additions & 0 deletions src/samples/Sample25/Activities/ArithmeticOperation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Elsa;
using Elsa.Activities.ControlFlow.Activities;
using Elsa.Expressions;
using Elsa.Results;
using Elsa.Services.Models;

namespace Sample25.Activities
{
public abstract class ArithmeticOperation : Join
{
public ArithmeticOperation()
{
Mode = JoinMode.WaitAll;
}

public WorkflowExpression<double[]> Values
{
get => GetState<WorkflowExpression<double[]>>();
set => SetState(value);
}

protected override async Task<ActivityExecutionResult> OnExecuteAsync(WorkflowExecutionContext context, CancellationToken cancellationToken)
{
var activityExecutionResult = await base.OnExecuteAsync(context, cancellationToken);

if (IsCompleted(activityExecutionResult))
{
var values = await context.EvaluateAsync(Values, cancellationToken);
var result = Calculate(values);

Output.SetVariable("Result", result);
}

return activityExecutionResult;
}

protected abstract double Calculate(params double[] values);

private bool IsCompleted(IActivityExecutionResult result) => result is OutcomeResult outcome && outcome.EndpointNames.Any(x => x == OutcomeNames.Done);
}
}
37 changes: 37 additions & 0 deletions src/samples/Sample25/Activities/Channel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Elsa.Results;
using Elsa.Services;
using Elsa.Services.Models;

namespace Sample25.Activities
{
/// <summary>
/// Produces a single value.
/// </summary>
public class Channel : Activity
{
/// <summary>
/// The ID of the sensor to observe.
/// </summary>
public string SensorId
{
get => GetState<string>();
set => SetState(value);
}

// Execute only if we received data from the sensor being observed.
protected override bool OnCanExecute(WorkflowExecutionContext context) => context.Workflow.Input.ContainsKey(SensorId);

// Halt workflow execution until sensor data is received.
protected override ActivityExecutionResult OnExecute(WorkflowExecutionContext context) => Halt();

protected override ActivityExecutionResult OnResume(WorkflowExecutionContext context)
{
// Read sensor output provided as workflow input.
var value = context.Workflow.Input.GetVariable<double>(SensorId);

// Set the value as an output of this activity.
Output.SetVariable("Value", value);
return Done();
}
}
}
12 changes: 12 additions & 0 deletions src/samples/Sample25/Activities/Subtract.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Linq;

namespace Sample25.Activities
{
/// <summary>
/// Subtracts two incoming inputs
/// </summary>
public class Subtract : ArithmeticOperation
{
protected override double Calculate(params double[] values) => values.Aggregate((left, right) => left - right);
}
}
57 changes: 57 additions & 0 deletions src/samples/Sample25/DataProcessingWorkflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using Elsa;
using Elsa.Activities.Console.Activities;
using Elsa.Activities.ControlFlow.Activities;
using Elsa.Expressions;
using Elsa.Scripting.JavaScript;
using Elsa.Services;
using Elsa.Services.Models;
using Sample25.Activities;

namespace Sample25
{
public class DataProcessingWorkflow : IWorkflow
{
public void Build(IWorkflowBuilder builder)
{
builder
.StartWith<WriteLine>(x => x.TextExpression = new LiteralExpression("Waiting for sensor input."))

// Fork execution into two branches to wait for external stimuli from the two channels in parallel.
.Then<Fork>(
fork => fork.Branches = new[] { "Channel 1", "Channel 2" },
fork =>
{
fork
.When("Channel 1")
.Then<Channel>(x => x.SensorId = "Sensor1").WithName("Channel1")
.Then("Subtract"); // Connect to Subtract activity.

fork
.When("Channel 2")
.Then<Channel>(x => x.SensorId = "Sensor2").WithName("Channel2")
.Then("Subtract"); // Connect to Subtract activity.
})

// Subtract the specified values.
.Then<Subtract>(x => x.Values = new JavaScriptExpression<double[]>("[Channel1.Value, Channel2.Value]")).WithName("Subtract")

// Calculate the absolute value of the subtraction.
.Then<Absolute>(x => x.ValueExpression = new JavaScriptExpression<double>("(Subtract.Result)")).WithName("Absolute")

// Compare the absolute value against a constant threshold, and write the appropriate output.
.Then<IfElse>(
x => x.ConditionExpression = new JavaScriptExpression<bool>("(Absolute.Result) > 0.5"),
ifElse =>
{
ifElse
.When(OutcomeNames.False)
.Then<WriteLine>(x => x.TextExpression = new LiteralExpression("Data does not exceed threshold (FALSE)"));

ifElse
.When(OutcomeNames.True)
.Then<WriteLine>(x => x.TextExpression = new LiteralExpression("Data exceeds threshold (TRUE)"));
})
.Then<WriteLine>(x => x.TextExpression = new JavaScriptExpression<string>("(`Finished data processing. Result: ${Absolute.Result}`)"));
}
}
}
Loading

0 comments on commit 4705c9b

Please sign in to comment.