From c003f7897dfd2577971ea62e45e1ec14602fcfec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 23 Feb 2024 17:22:29 +0100 Subject: [PATCH] Working for H2 and event source, snapshot, durable state --- .../reflect-config.json | 13 +++ .../com/lightbend/DurableStateTester.scala | 86 +++++++++++++++++++ .../main/scala/com/lightbend/EsbTester.scala | 9 +- .../src/main/scala/com/lightbend/Main.scala | 38 +++++--- 4 files changed, 127 insertions(+), 19 deletions(-) create mode 100644 native-image-tests/src/main/scala/com/lightbend/DurableStateTester.scala diff --git a/core/src/main/resources/META-INF/native-image/com.lightbend.akka/akka-persistence-r2dbc/reflect-config.json b/core/src/main/resources/META-INF/native-image/com.lightbend.akka/akka-persistence-r2dbc/reflect-config.json index 65eb2451..6e247736 100644 --- a/core/src/main/resources/META-INF/native-image/com.lightbend.akka/akka-persistence-r2dbc/reflect-config.json +++ b/core/src/main/resources/META-INF/native-image/com.lightbend.akka/akka-persistence-r2dbc/reflect-config.json @@ -35,6 +35,19 @@ ] } ] + }, + { + "name": "akka.persistence.r2dbc.state.R2dbcDurableStateStoreProvider", + "methods": [ + { + "name": "", + "parameterTypes": [ + "akka.actor.ExtendedActorSystem", + "com.typesafe.config.Config", + "java.lang.String" + ] + } + ] } ] \ No newline at end of file diff --git a/native-image-tests/src/main/scala/com/lightbend/DurableStateTester.scala b/native-image-tests/src/main/scala/com/lightbend/DurableStateTester.scala new file mode 100644 index 00000000..6017f71a --- /dev/null +++ b/native-image-tests/src/main/scala/com/lightbend/DurableStateTester.scala @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2009-2024 Lightbend Inc. + */ +package com.lightbend + +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.state.scaladsl.DurableStateBehavior +import akka.persistence.typed.state.scaladsl.Effect +import akka.serialization.jackson.JsonSerializable +import com.fasterxml.jackson.annotation.JsonCreator + +import scala.concurrent.duration.DurationInt + +object DurableStateCounter { + sealed trait Command extends JsonSerializable + final case class Increase(amount: Int, replyTo: ActorRef[Increased]) extends Command + + // FIXME why doesn't @JsonCreator work as usual? is it something missing from the jackson feature? + final case class GetState @JsonCreator() (replyTo: ActorRef[State]) extends Command + + final case class Increased @JsonCreator() (newValue: Int) extends JsonSerializable + + final case class State @JsonCreator() (value: Int) extends JsonSerializable + def apply(id: String): Behavior[Command] = + DurableStateBehavior[Command, State]( + PersistenceId("DSCounter", id), + State(0), + { + case (state, Increase(amount, replyTo)) => + Effect.persist(State(state.value + amount)).thenReply(replyTo)(newState => Increased(newState.value)) + case (state, GetState(replyTo)) => + Effect.reply(replyTo)(state) + }) +} + +object DurableStateTester { + + def apply(whenDone: ActorRef[String]): Behavior[AnyRef] = Behaviors.setup { context => + Behaviors.withTimers { timers => + timers.startSingleTimer("Timeout", 10.seconds) + + var durableActor = context.spawn(DurableStateCounter("one"), "DurableOne") + context.watchWith(durableActor, "DurableOneStopped") + + def messageOrTimeout(step: String)(partial: PartialFunction[AnyRef, Behavior[AnyRef]]): Behavior[AnyRef] = { + context.log.info("On {}", step) + Behaviors.receiveMessage(message => + partial.orElse[AnyRef, Behavior[AnyRef]] { + case "Timeout" => + context.log.error(s"Durable state checks timed out in {}", step) + System.exit(1) + Behaviors.same + + case other => + context.log.warn("Unexpected message in {}: {}", step, other) + Behaviors.same + }(message)) + } + + durableActor ! DurableStateCounter.Increase(1, context.self) + + def step1() = messageOrTimeout("step1") { case DurableStateCounter.Increased(1) => + // write works + context.stop(durableActor) + step2() + } + + def step2() = messageOrTimeout("step2") { case "DurableOneStopped" => + durableActor = context.spawn(DurableStateCounter("one"), "DurableOneIncarnation2") + durableActor ! DurableStateCounter.GetState(context.self) + step3() + } + + def step3() = messageOrTimeout("step3") { case DurableStateCounter.State(1) => + whenDone ! "Durable State works" + Behaviors.stopped + } + + step1() + } + } + +} diff --git a/native-image-tests/src/main/scala/com/lightbend/EsbTester.scala b/native-image-tests/src/main/scala/com/lightbend/EsbTester.scala index 35658355..59bab439 100644 --- a/native-image-tests/src/main/scala/com/lightbend/EsbTester.scala +++ b/native-image-tests/src/main/scala/com/lightbend/EsbTester.scala @@ -12,7 +12,6 @@ import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.serialization.jackson.JsonSerializable import com.fasterxml.jackson.annotation.JsonCreator -import com.fasterxml.jackson.annotation.JsonProperty import scala.concurrent.duration.DurationInt @@ -20,16 +19,14 @@ object EventSourcedCounter { sealed trait Command extends JsonSerializable final case class Increase(amount: Int, replyTo: ActorRef[StatusReply[Increased]]) extends Command - @JsonCreator - final case class GetValue(replyTo: ActorRef[StatusReply[GetValueResponse]]) extends Command + final case class GetValue @JsonCreator() (replyTo: ActorRef[StatusReply[GetValueResponse]]) extends Command final case class GetValueResponse(value: Int) sealed trait Event extends JsonSerializable - // FIXME why doesn't @JsonCreator work as usual? is it something missing from the akka jackson feature? - final case class Increased(@JsonProperty("amount") amount: Int) extends Event + final case class Increased @JsonCreator() (amount: Int) extends Event - final case class State(@JsonProperty("value") value: Int) extends JsonSerializable + final case class State @JsonCreator() (value: Int) extends JsonSerializable def apply(id: String): Behavior[Command] = EventSourcedBehavior[Command, Event, State]( PersistenceId("EventSourcedHelloWorld", id), diff --git a/native-image-tests/src/main/scala/com/lightbend/Main.scala b/native-image-tests/src/main/scala/com/lightbend/Main.scala index 767439d7..6ceeeaeb 100644 --- a/native-image-tests/src/main/scala/com/lightbend/Main.scala +++ b/native-image-tests/src/main/scala/com/lightbend/Main.scala @@ -4,24 +4,36 @@ import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors +import scala.concurrent.duration.DurationInt + object RootBehavior { def apply(): Behavior[AnyRef] = Behaviors.setup { context => - context.spawn(EsbTester(context.self), "ESBTester") + Behaviors.withTimers { timers => + timers.startSingleTimer("Timeout", 30.seconds) + context.spawn(EsbTester(context.self), "ESBTester") + context.spawn(DurableStateTester(context.self), "DurableStateTester") + + var awaitedOks = Set("ESB works", "Durable State works") - var awaitedOks = Set("ESB works") + Behaviors.receiveMessage { + case "Timeout" => + context.log.error("Suite of checks timed out, missing awaitedOks: {}", awaitedOks) + System.exit(1) + Behaviors.same - Behaviors.receiveMessage { - case string: String => - awaitedOks -= string - if (awaitedOks.isEmpty) { - context.log.info("All checks ok, shutting down") - Behaviors.stopped - } else { + case string: String => + awaitedOks -= string + if (awaitedOks.isEmpty) { + context.log.info("All checks ok, shutting down") + Behaviors.stopped + } else { + context.log.info("Continuing, awaitedOks not empty: {}", awaitedOks) + Behaviors.same + } + case other => + context.log.warn("Unexpected message: {}", other) Behaviors.same - } - case other => - context.log.warn("Unexpected message: {}", other) - Behaviors.same + } } } }