A small framework to define long running (persistent)processes within Akka.
If you're using SBT, add the following lines to your build file:
resolvers += "processFramework at bintray" at "https://dl.bintray.com/jgordijn/maven/"
// Latest working with akka 2.3.x
libraryDependencies += "processframework" %% "process" % "0.1.16"
// akka 2.4.x
libraryDependencies += "processframework" %% "process" % "0.1.17"
// Change in organization name since 0.1.23
libraryDependencies += "com.github.jgordijn" %% "process" % "0.1.23"
For Maven and other build tools, you can visit search.maven.org
If you tried to write a long running (persistent) process in Akka, you will find out that this will result in a lot of messages handling in a single actor. It is not trivial to see what the process flow is and if there are parallel parts. Primary reason is that different steps are implemented in a single class. This library tries to make life easier by doing 2 things:
- Create a separate class for each step in the process flow
- Create a process class that describes how the steps are linked together
A process consists of different steps that are performed one after the other or in parallel. With Process you define every step in its own class.
case class DemoState(demoed: Boolean)
class DemoStep(demoService: ActorRef)(implicit val context: ActorContext)
extends ProcessStep[DemoState] {
// Code to execute when the step should perform it's task
def execute()(implicit stepActor: ActorRef): Execution = state => {
demoService ! Command(state.demoed)
}
// This catches the responses from the async execute action.
// It emits Process.Event to the process.
def receiveCommand: CommandToEvent = {
case ReplyFromDemoService =>
Demoed
}
// The updateState function handles events and changes the state. It
// should mark the step as done when the event is the last event for
// this step.
def updateState: UpdateFunction = {
case Demoed => { state =>
markDone()
state.copy(demoed = true)
}
}
}
The main goal of the process is to create a flow of steps. It is possible to chain different steps, so that they are performed in sequence. It is also possible to parallelize steps.
class DemoProcess(demoService: ActorRef) extends Process[DemoState] {
// implicit ExecutionContext is needed
import context.dispatcher
var state = DemoState(false)
val step1 = new DemoStep(demoService)
// Subflows can be created
val subflow1 = subStep1 ~> subStep2
val subflow2 = subStepX ~> subStepY
// process defines the complete process
val process = step1 ~> step2 ~> Par(subflow1, subflow2) ~> step3
def receiveCommand = {
case CommandToStartProcess =>
process.run()
}
}
Long running processes should survive restarts. Therefore it should
persist the events and automatically restart when the process is
created. It is almost as easy as changing Process[S]
to
PersistentProcess[S]
(where S
is the type of the state). The only
difference is that you need to specify a persistenceId
.
class DemoProcess(demoService: ActorRef) extends PersistentProcess[DemoState] {
val persistenceId = "demoProcess"
// implicit ExecutionContext is needed
import context.dispatcher
var state = DemoState(false)
val step1 = new DemoStep(demoService)
// Subflows can be created
val subflow1 = subStep1 ~> subStep2
val subflow2 = subStepX ~> subStepY
// process defines the complete process
val process = step1 ~> step2 ~> Par(subflow1, subflow2) ~> step3
def receiveCommand = {
case CommandToStartProcess =>
process.run()
}
}