Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@ package akka.cluster.sharding.typed
import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.Span

import akka.Done
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Routers
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.cluster.typed.ClusterSingleton
import akka.cluster.typed.MultiNodeTypedClusterSpec
import akka.cluster.typed.SingletonActor
import akka.pattern.StatusReply
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
Expand All @@ -34,28 +37,57 @@ object ShardedDaemonProcessRescaleSpec extends MultiNodeConfig {
val sixth = role("sixth")
val seventh = role("seventh")

val SnitchServiceKey = ServiceKey[AnyRef]("snitch")

case class ProcessActorEvent(id: Int, event: Any) extends CborSerializable

object ProcessActor {
trait Command
case object Stop extends Command

def apply(id: Int): Behavior[Command] = Behaviors.setup { ctx =>
def apply(id: Int, collector: ActorRef[Collector.Command]): Behavior[Command] = Behaviors.setup { ctx =>
ctx.log.info("Started [{}]", id)
val snitchRouter = ctx.spawn(Routers.group(SnitchServiceKey), "router")
snitchRouter ! ProcessActorEvent(id, "Started")
collector ! Collector.Started(id)

Behaviors.receiveMessagePartial {
case Stop =>
ctx.log.info("Stopped [{}]", id)
snitchRouter ! ProcessActorEvent(id, "Stopped")
collector ! Collector.Stopped(id)
Behaviors.stopped
}
}
}

object Collector {
sealed trait Command extends CborSerializable
final case class Started(id: Int) extends Command
final case class Stopped(id: Int) extends Command
final case class Get(replyTo: ActorRef[Counts]) extends Command
final case class Counts(startedCount: Int, stoppedCount: Int) extends CborSerializable
final case class Reset(replyTo: ActorRef[Done]) extends Command

def init(system: ActorSystem[_]): ActorRef[Command] = {
ClusterSingleton(system).init(SingletonActor(Collector(), "collector"))
}

def apply(): Behavior[Command] = {
behavior(Counts(startedCount = 0, stoppedCount = 0))
}

private def behavior(counts: Counts): Behavior[Command] = {
Behaviors.receiveMessage {
case Started(_) =>
behavior(counts.copy(startedCount = counts.startedCount + 1))
case Stopped(_) =>
behavior(counts.copy(stoppedCount = counts.stoppedCount + 1))
case Get(replyTo) =>
replyTo ! counts
Behaviors.same
case Reset(replyTo) =>
replyTo ! Done
behavior(Counts(0, 0))
}
}
}

commonConfig(
ConfigFactory.parseString("""
akka.loglevel = DEBUG
Expand Down Expand Up @@ -89,12 +121,19 @@ class ShardedDaemonProcessRescaleMultiJvmNode7 extends ShardedDaemonProcessResca
abstract class ShardedDaemonProcessRescaleSpec
extends MultiNodeSpec(ShardedDaemonProcessRescaleSpec)
with MultiNodeTypedClusterSpec
with ScalaFutures {
with ScalaFutures
with Eventually {

import ShardedDaemonProcessRescaleSpec._

val topicProbe: TestProbe[AnyRef] = TestProbe[AnyRef]()
implicit val patience: PatienceConfig = {
import akka.testkit.TestDuration
PatienceConfig(testKitSettings.DefaultTimeout.duration.dilated * 2, Span(500, org.scalatest.time.Millis))
}

private var sdp: ActorRef[ShardedDaemonProcessCommand] = _
private var collector: ActorRef[Collector.Command] = _
private val resetProbe = TestProbe[Done]()

private def assertNumberOfProcesses(n: Int, revision: Int): Unit = {
val probe = TestProbe[NumberOfProcesses]()
Expand All @@ -108,35 +147,23 @@ abstract class ShardedDaemonProcessRescaleSpec
"Cluster sharding in multi dc cluster" must {
"form cluster" in {
formCluster(first, second, third, fourth, fifth, sixth, seventh)
runOn(first) {
typedSystem.receptionist ! Receptionist.Register(SnitchServiceKey, topicProbe.ref, topicProbe.ref)
topicProbe.expectMessageType[Receptionist.Registered]
}
enterBarrier("snitch-registered")

topicProbe.awaitAssert({
typedSystem.receptionist ! Receptionist.Find(SnitchServiceKey, topicProbe.ref)
topicProbe.expectMessageType[Receptionist.Listing].serviceInstances(SnitchServiceKey).size should ===(1)
}, 5.seconds)
enterBarrier("snitch-seen")
collector = Collector.init(system.toTyped)
enterBarrier("collector-started")
}

"init actor set" in {
sdp = ShardedDaemonProcess(typedSystem).initWithContext(
"the-fearless",
4,
ctx => ProcessActor(ctx.processNumber),
ctx => ProcessActor(ctx.processNumber, collector),
ShardedDaemonProcessSettings(system.toTyped),
ProcessActor.Stop)
enterBarrier("sharded-daemon-process-initialized")
runOn(first) {
val startedIds = (0 to 3).map { _ =>
val event = topicProbe.expectMessageType[ProcessActorEvent](5.seconds)
event.event should ===("Started")
event.id
}.toSet
startedIds.size should ===(4)
topicProbe.expectNoMessage()
eventually {
val countsReplyProbe = TestProbe[Collector.Counts]()
collector ! Collector.Get(countsReplyProbe.ref)
countsReplyProbe.expectMessage(500.millis, Collector.Counts(startedCount = 4, stoppedCount = 0))
}
enterBarrier("sharded-daemon-process-started-acked")
runOn(third) {
Expand All @@ -147,14 +174,18 @@ abstract class ShardedDaemonProcessRescaleSpec

"rescale to 8 workers" in {
runOn(first) {
collector ! Collector.Reset(resetProbe.ref)
resetProbe.expectMessage(Done)

val probe = TestProbe[AnyRef]()
sdp ! ChangeNumberOfProcesses(8, probe.ref)
probe.expectMessage(30.seconds, StatusReply.Ack)
// FIXME snitch router is dropping messages
// val events = topicProbe.receiveMessages(4 + 8, 10.seconds).map(_.asInstanceOf[ProcessActorEvent])
// events.collect { case evt if evt.event == "Stopped" => evt.id }.toSet.size should ===(4)
// events.collect { case evt if evt.event == "Started" => evt.id }.toSet.size should ===(8)
// topicProbe.expectNoMessage()
}
enterBarrier("sharded-daemon-process-rescaled-to-8")
eventually {
val countsReplyProbe = TestProbe[Collector.Counts]()
collector ! Collector.Get(countsReplyProbe.ref)
countsReplyProbe.expectMessage(500.millis, Collector.Counts(startedCount = 8, stoppedCount = 4))
}

enterBarrier("sharded-daemon-process-rescaled-to-8-acked")
Expand All @@ -166,18 +197,21 @@ abstract class ShardedDaemonProcessRescaleSpec

"rescale to 2 workers" in {
runOn(second) {
collector ! Collector.Reset(resetProbe.ref)
resetProbe.expectMessage(Done)

val probe = TestProbe[AnyRef]()
sdp ! ChangeNumberOfProcesses(2, probe.ref)
probe.expectMessage(30.seconds, StatusReply.Ack)
}
enterBarrier("sharded-daemon-process-rescaled-to-2-acked")
runOn(first) {
// FIXME snitch router is dropping messages
// val events = topicProbe.receiveMessages(8 + 2, 10.seconds).map(_.asInstanceOf[ProcessActorEvent])
// events.collect { case evt if evt.event == "Stopped" => evt.id }.toSet.size should ===(8)
// events.collect { case evt if evt.event == "Started" => evt.id }.toSet.size should ===(2)
// topicProbe.expectNoMessage()
enterBarrier("sharded-daemon-process-rescaled-to-2")
eventually {
val countsReplyProbe = TestProbe[Collector.Counts]()
collector ! Collector.Get(countsReplyProbe.ref)
countsReplyProbe.expectMessage(500.millis, Collector.Counts(startedCount = 2, stoppedCount = 8))
}

enterBarrier("sharded-daemon-process-rescaled-to-2-acked")
runOn(third) {
assertNumberOfProcesses(n = 2, revision = 2)
}
Expand Down