diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessRescaleSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessRescaleSpec.scala index 4e47b184874..f0f96aaaa0d 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessRescaleSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessRescaleSpec.scala @@ -7,19 +7,22 @@ package akka.cluster.sharding.typed import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.ScalaFutures +import org.scalatest.time.Span +import akka.Done import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior -import akka.actor.typed.receptionist.Receptionist -import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.Routers import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps import akka.cluster.MultiNodeClusterSpec import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess +import akka.cluster.typed.ClusterSingleton import akka.cluster.typed.MultiNodeTypedClusterSpec +import akka.cluster.typed.SingletonActor import akka.pattern.StatusReply import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec @@ -34,28 +37,57 @@ object ShardedDaemonProcessRescaleSpec extends MultiNodeConfig { val sixth = role("sixth") val seventh = role("seventh") - val SnitchServiceKey = ServiceKey[AnyRef]("snitch") - case class ProcessActorEvent(id: Int, event: Any) extends CborSerializable object ProcessActor { trait Command case object Stop extends Command - def apply(id: Int): Behavior[Command] = Behaviors.setup { ctx => + def apply(id: Int, collector: ActorRef[Collector.Command]): Behavior[Command] = Behaviors.setup { ctx => ctx.log.info("Started [{}]", id) - val snitchRouter = ctx.spawn(Routers.group(SnitchServiceKey), "router") - snitchRouter ! ProcessActorEvent(id, "Started") + collector ! Collector.Started(id) Behaviors.receiveMessagePartial { case Stop => ctx.log.info("Stopped [{}]", id) - snitchRouter ! ProcessActorEvent(id, "Stopped") + collector ! Collector.Stopped(id) Behaviors.stopped } } } + object Collector { + sealed trait Command extends CborSerializable + final case class Started(id: Int) extends Command + final case class Stopped(id: Int) extends Command + final case class Get(replyTo: ActorRef[Counts]) extends Command + final case class Counts(startedCount: Int, stoppedCount: Int) extends CborSerializable + final case class Reset(replyTo: ActorRef[Done]) extends Command + + def init(system: ActorSystem[_]): ActorRef[Command] = { + ClusterSingleton(system).init(SingletonActor(Collector(), "collector")) + } + + def apply(): Behavior[Command] = { + behavior(Counts(startedCount = 0, stoppedCount = 0)) + } + + private def behavior(counts: Counts): Behavior[Command] = { + Behaviors.receiveMessage { + case Started(_) => + behavior(counts.copy(startedCount = counts.startedCount + 1)) + case Stopped(_) => + behavior(counts.copy(stoppedCount = counts.stoppedCount + 1)) + case Get(replyTo) => + replyTo ! counts + Behaviors.same + case Reset(replyTo) => + replyTo ! Done + behavior(Counts(0, 0)) + } + } + } + commonConfig( ConfigFactory.parseString(""" akka.loglevel = DEBUG @@ -89,12 +121,19 @@ class ShardedDaemonProcessRescaleMultiJvmNode7 extends ShardedDaemonProcessResca abstract class ShardedDaemonProcessRescaleSpec extends MultiNodeSpec(ShardedDaemonProcessRescaleSpec) with MultiNodeTypedClusterSpec - with ScalaFutures { + with ScalaFutures + with Eventually { import ShardedDaemonProcessRescaleSpec._ - val topicProbe: TestProbe[AnyRef] = TestProbe[AnyRef]() + implicit val patience: PatienceConfig = { + import akka.testkit.TestDuration + PatienceConfig(testKitSettings.DefaultTimeout.duration.dilated * 2, Span(500, org.scalatest.time.Millis)) + } + private var sdp: ActorRef[ShardedDaemonProcessCommand] = _ + private var collector: ActorRef[Collector.Command] = _ + private val resetProbe = TestProbe[Done]() private def assertNumberOfProcesses(n: Int, revision: Int): Unit = { val probe = TestProbe[NumberOfProcesses]() @@ -108,35 +147,23 @@ abstract class ShardedDaemonProcessRescaleSpec "Cluster sharding in multi dc cluster" must { "form cluster" in { formCluster(first, second, third, fourth, fifth, sixth, seventh) - runOn(first) { - typedSystem.receptionist ! Receptionist.Register(SnitchServiceKey, topicProbe.ref, topicProbe.ref) - topicProbe.expectMessageType[Receptionist.Registered] - } - enterBarrier("snitch-registered") - topicProbe.awaitAssert({ - typedSystem.receptionist ! Receptionist.Find(SnitchServiceKey, topicProbe.ref) - topicProbe.expectMessageType[Receptionist.Listing].serviceInstances(SnitchServiceKey).size should ===(1) - }, 5.seconds) - enterBarrier("snitch-seen") + collector = Collector.init(system.toTyped) + enterBarrier("collector-started") } "init actor set" in { sdp = ShardedDaemonProcess(typedSystem).initWithContext( "the-fearless", 4, - ctx => ProcessActor(ctx.processNumber), + ctx => ProcessActor(ctx.processNumber, collector), ShardedDaemonProcessSettings(system.toTyped), ProcessActor.Stop) enterBarrier("sharded-daemon-process-initialized") - runOn(first) { - val startedIds = (0 to 3).map { _ => - val event = topicProbe.expectMessageType[ProcessActorEvent](5.seconds) - event.event should ===("Started") - event.id - }.toSet - startedIds.size should ===(4) - topicProbe.expectNoMessage() + eventually { + val countsReplyProbe = TestProbe[Collector.Counts]() + collector ! Collector.Get(countsReplyProbe.ref) + countsReplyProbe.expectMessage(500.millis, Collector.Counts(startedCount = 4, stoppedCount = 0)) } enterBarrier("sharded-daemon-process-started-acked") runOn(third) { @@ -147,14 +174,18 @@ abstract class ShardedDaemonProcessRescaleSpec "rescale to 8 workers" in { runOn(first) { + collector ! Collector.Reset(resetProbe.ref) + resetProbe.expectMessage(Done) + val probe = TestProbe[AnyRef]() sdp ! ChangeNumberOfProcesses(8, probe.ref) probe.expectMessage(30.seconds, StatusReply.Ack) -// FIXME snitch router is dropping messages -// val events = topicProbe.receiveMessages(4 + 8, 10.seconds).map(_.asInstanceOf[ProcessActorEvent]) -// events.collect { case evt if evt.event == "Stopped" => evt.id }.toSet.size should ===(4) -// events.collect { case evt if evt.event == "Started" => evt.id }.toSet.size should ===(8) -// topicProbe.expectNoMessage() + } + enterBarrier("sharded-daemon-process-rescaled-to-8") + eventually { + val countsReplyProbe = TestProbe[Collector.Counts]() + collector ! Collector.Get(countsReplyProbe.ref) + countsReplyProbe.expectMessage(500.millis, Collector.Counts(startedCount = 8, stoppedCount = 4)) } enterBarrier("sharded-daemon-process-rescaled-to-8-acked") @@ -166,18 +197,21 @@ abstract class ShardedDaemonProcessRescaleSpec "rescale to 2 workers" in { runOn(second) { + collector ! Collector.Reset(resetProbe.ref) + resetProbe.expectMessage(Done) + val probe = TestProbe[AnyRef]() sdp ! ChangeNumberOfProcesses(2, probe.ref) probe.expectMessage(30.seconds, StatusReply.Ack) } - enterBarrier("sharded-daemon-process-rescaled-to-2-acked") - runOn(first) { -// FIXME snitch router is dropping messages -// val events = topicProbe.receiveMessages(8 + 2, 10.seconds).map(_.asInstanceOf[ProcessActorEvent]) -// events.collect { case evt if evt.event == "Stopped" => evt.id }.toSet.size should ===(8) -// events.collect { case evt if evt.event == "Started" => evt.id }.toSet.size should ===(2) -// topicProbe.expectNoMessage() + enterBarrier("sharded-daemon-process-rescaled-to-2") + eventually { + val countsReplyProbe = TestProbe[Collector.Counts]() + collector ! Collector.Get(countsReplyProbe.ref) + countsReplyProbe.expectMessage(500.millis, Collector.Counts(startedCount = 2, stoppedCount = 8)) } + + enterBarrier("sharded-daemon-process-rescaled-to-2-acked") runOn(third) { assertNumberOfProcesses(n = 2, revision = 2) }