From 9e7c9e9c9b6f19b232e0ab152cc17a5138f5d876 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 25 Nov 2025 13:09:05 +0100 Subject: [PATCH] fix: Avoid start of ShardedDaemonProcess for old revision * when rescaling the revision is bumped, and all previous processes are stopped * if a keep-alive message is in flight that can trigger a new start of a process that belongs to the previous revision, because the revision check was using local read consistency * change to use same consistency as the ShardedDaemonProcessCoordinator --- .../ShardedDaemonProcessCoordinator.scala | 7 +- .../internal/ShardedDaemonProcessImpl.scala | 17 ++++- .../internal/ShardedDaemonProcessState.scala | 16 ++++- .../ShardedDaemonProcessRescaleSpec.scala | 68 ++++++++++++++++--- 4 files changed, 92 insertions(+), 16 deletions(-) diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessCoordinator.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessCoordinator.scala index 64ec91a9a20..60a1ff69fd5 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessCoordinator.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessCoordinator.scala @@ -306,11 +306,16 @@ private final class ShardedDaemonProcessCoordinator private ( case ShardStopped(shard) => val newShardsStillRunning = shardsStillRunning - shard if (newShardsStillRunning.isEmpty) { + context.log.info( + "{}: All shards stopped. 
Continue rescaling of Sharded Daemon Process to [{}] processes, rev [{}]", + daemonProcessName, + state.numberOfProcesses, + state.revision) timers.cancel(ShardStopTimeout) rescalingComplete(state, request) } else waitForShardsStopping(state, request, previousNumberOfProcesses, newShardsStillRunning) case ShardStopTimeout => - context.log.debug("{}: Stopping shards timed out after [{}] retrying", daemonProcessName, stopShardsTimeout) + context.log.info("{}: Stopping shards timed out after [{}] retrying", daemonProcessName, stopShardsTimeout) stopAllShards(state, request, previousNumberOfProcesses) } } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala index 417b6566141..d3298861fd2 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala @@ -15,6 +15,9 @@ import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.annotation.InternalApi +import akka.cluster.ddata.Replicator.ReadAll +import akka.cluster.ddata.Replicator.ReadConsistency +import akka.cluster.ddata.Replicator.ReadMajorityPlus import akka.cluster.ddata.typed.scaladsl.DistributedData import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion.EntityId @@ -166,13 +169,21 @@ private[akka] final class ShardedDaemonProcessImpl(system: ActorSystem[_]) shardingBaseSettings.leaseSettings) } + val stateReadConsistency: ReadConsistency = + shardingSettings.tuningParameters.coordinatorStateReadMajorityPlus match { + case Int.MaxValue => ReadAll(shardingSettings.tuningParameters.waitingForStateTimeout) + case additional => + val majorityMinCap = + 
system.settings.config.getInt("akka.cluster.sharding.distributed-data.majority-min-cap") + ReadMajorityPlus(shardingSettings.tuningParameters.waitingForStateTimeout, additional, majorityMinCap) + } + val entity = Entity(entityTypeKey) { ctx => val decodedId = decodeEntityId(ctx.entityId, initialNumberOfProcesses = numberOfInstances) val sdContext = ShardedDaemonProcessContextImpl(decodedId.processNumber, decodedId.totalCount, name, decodedId.revision) - if (supportsRescale) verifyRevisionBeforeStarting(behaviorFactory)(sdContext) - else - behaviorFactory(sdContext) + if (supportsRescale) verifyRevisionBeforeStarting(stateReadConsistency, behaviorFactory)(sdContext) + else behaviorFactory(sdContext) }.withSettings(shardingSettings).withMessageExtractor(new MessageExtractor()) val entityWithStop = stopMessage match { diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessState.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessState.scala index 273dbe5064d..218f375ac42 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessState.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessState.scala @@ -11,6 +11,7 @@ import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi import akka.cluster.ddata.Key import akka.cluster.ddata.ReplicatedData +import akka.cluster.ddata.Replicator.ReadConsistency import akka.cluster.ddata.typed.scaladsl.DistributedData import akka.cluster.ddata.typed.scaladsl.Replicator import akka.cluster.sharding.typed.ShardedDaemonProcessContext @@ -68,6 +69,7 @@ private[akka] object ShardedDaemonProcessState { started = Instant.now()) def verifyRevisionBeforeStarting[T]( + stateReadConsistency: ReadConsistency, behaviorFactory: ShardedDaemonProcessContext => Behavior[T]): ShardedDaemonProcessContext => 
Behavior[T] = { sdpContext => Behaviors.setup { context => @@ -88,7 +90,7 @@ private[akka] object ShardedDaemonProcessState { // we can't anyway turn reply into T so no need for the usual adapter val distributedData = DistributedData(context.system) - distributedData.replicator ! Replicator.Get(key, Replicator.ReadLocal, context.self.unsafeUpcast) + distributedData.replicator ! Replicator.Get(key, stateReadConsistency, context.self.unsafeUpcast) Behaviors.receiveMessagePartial { case reply @ Replicator.GetSuccess(`key`) => val state = reply.get(key) @@ -101,7 +103,7 @@ private[akka] object ShardedDaemonProcessState { revision) behaviorFactory(sdpContext).unsafeCast } else { - context.log.warn( + context.log.info( "{}: Tried to start an old revision of worker ([{}] but latest revision is [{}], started at {})", sdpContext.name, sdpContext.revision, @@ -126,6 +128,16 @@ private[akka] object ShardedDaemonProcessState { sdpContext.revision) Behaviors.stopped } + case Replicator.GetFailure(_) => + context.log.info( + "{}: Tried to start Sharded Daemon Process [{}] out of a total [{}], but current revision couldn't " + + "be retrieved. Retrying. (revision [{}])", + sdpContext.name, + sdpContext.processNumber, + sdpContext.totalProcesses, + revision) + distributedData.replicator ! 
Replicator.Get(key, stateReadConsistency, context.self.unsafeUpcast) + Behaviors.same } } diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessRescaleSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessRescaleSpec.scala index 32f5acb3a27..4e47b184874 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessRescaleSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessRescaleSpec.scala @@ -29,6 +29,10 @@ object ShardedDaemonProcessRescaleSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + val sixth = role("sixth") + val seventh = role("seventh") val SnitchServiceKey = ServiceKey[AnyRef]("snitch") @@ -39,26 +43,36 @@ object ShardedDaemonProcessRescaleSpec extends MultiNodeConfig { case object Stop extends Command def apply(id: Int): Behavior[Command] = Behaviors.setup { ctx => + ctx.log.info("Started [{}]", id) val snitchRouter = ctx.spawn(Routers.group(SnitchServiceKey), "router") snitchRouter ! ProcessActorEvent(id, "Started") Behaviors.receiveMessagePartial { case Stop => + ctx.log.info("Stopped [{}]", id) snitchRouter ! 
ProcessActorEvent(id, "Stopped") Behaviors.stopped } } } - commonConfig(ConfigFactory.parseString(""" + commonConfig( + ConfigFactory.parseString(""" akka.loglevel = DEBUG akka.cluster.sharded-daemon-process { sharding { # First is likely to be ignored as shard coordinator not ready retry-interval = 0.2s } - # quick ping to make test swift - keep-alive-interval = 1s + # quick ping to make test swift, stress the start/stop guarantees during rescaling + keep-alive-interval = 20 ms + # disable throttle to stress the start/stop guarantees during rescaling + keep-alive-throttle-interval = 0 s + } + akka.cluster.sharding { + distributed-data.majority-min-cap = 4 + coordinator-state.write-majority-plus = 0 + coordinator-state.read-majority-plus = 0 } """).withFallback(MultiNodeClusterSpec.clusterConfig)) @@ -67,6 +81,10 @@ object ShardedDaemonProcessRescaleSpec extends MultiNodeConfig { class ShardedDaemonProcessRescaleMultiJvmNode1 extends ShardedDaemonProcessRescaleSpec class ShardedDaemonProcessRescaleMultiJvmNode2 extends ShardedDaemonProcessRescaleSpec class ShardedDaemonProcessRescaleMultiJvmNode3 extends ShardedDaemonProcessRescaleSpec +class ShardedDaemonProcessRescaleMultiJvmNode4 extends ShardedDaemonProcessRescaleSpec +class ShardedDaemonProcessRescaleMultiJvmNode5 extends ShardedDaemonProcessRescaleSpec +class ShardedDaemonProcessRescaleMultiJvmNode6 extends ShardedDaemonProcessRescaleSpec +class ShardedDaemonProcessRescaleMultiJvmNode7 extends ShardedDaemonProcessRescaleSpec abstract class ShardedDaemonProcessRescaleSpec extends MultiNodeSpec(ShardedDaemonProcessRescaleSpec) @@ -78,9 +96,18 @@ abstract class ShardedDaemonProcessRescaleSpec val topicProbe: TestProbe[AnyRef] = TestProbe[AnyRef]() private var sdp: ActorRef[ShardedDaemonProcessCommand] = _ + private def assertNumberOfProcesses(n: Int, revision: Int): Unit = { + val probe = TestProbe[NumberOfProcesses]() + sdp ! 
GetNumberOfProcesses(probe.ref) + val reply = probe.receiveMessage() + reply.numberOfProcesses should ===(n) + reply.revision should ===(revision) + reply.rescaleInProgress should ===(false) + } + "Cluster sharding in multi dc cluster" must { "form cluster" in { - formCluster(first, second, third) + formCluster(first, second, third, fourth, fifth, sixth, seventh) runOn(first) { typedSystem.receptionist ! Receptionist.Register(SnitchServiceKey, topicProbe.ref, topicProbe.ref) topicProbe.expectMessageType[Receptionist.Registered] @@ -109,6 +136,11 @@ abstract class ShardedDaemonProcessRescaleSpec event.id }.toSet startedIds.size should ===(4) + topicProbe.expectNoMessage() + } + enterBarrier("sharded-daemon-process-started-acked") + runOn(third) { + assertNumberOfProcesses(n = 4, revision = 0) } enterBarrier("sharded-daemon-process-started") } @@ -118,6 +150,16 @@ abstract class ShardedDaemonProcessRescaleSpec val probe = TestProbe[AnyRef]() sdp ! ChangeNumberOfProcesses(8, probe.ref) probe.expectMessage(30.seconds, StatusReply.Ack) +// FIXME snitch router is dropping messages +// val events = topicProbe.receiveMessages(4 + 8, 10.seconds).map(_.asInstanceOf[ProcessActorEvent]) +// events.collect { case evt if evt.event == "Stopped" => evt.id }.toSet.size should ===(4) +// events.collect { case evt if evt.event == "Started" => evt.id }.toSet.size should ===(8) +// topicProbe.expectNoMessage() + } + + enterBarrier("sharded-daemon-process-rescaled-to-8-acked") + runOn(third) { + assertNumberOfProcesses(n = 8, revision = 1) } enterBarrier("sharded-daemon-process-rescaled-to-8") } @@ -128,17 +170,23 @@ abstract class ShardedDaemonProcessRescaleSpec sdp ! 
ChangeNumberOfProcesses(2, probe.ref) probe.expectMessage(30.seconds, StatusReply.Ack) } + enterBarrier("sharded-daemon-process-rescaled-to-2-acked") + runOn(first) { +// FIXME snitch router is dropping messages +// val events = topicProbe.receiveMessages(8 + 2, 10.seconds).map(_.asInstanceOf[ProcessActorEvent]) +// events.collect { case evt if evt.event == "Stopped" => evt.id }.toSet.size should ===(8) +// events.collect { case evt if evt.event == "Started" => evt.id }.toSet.size should ===(2) +// topicProbe.expectNoMessage() + } + runOn(third) { + assertNumberOfProcesses(n = 2, revision = 2) + } enterBarrier("sharded-daemon-process-rescaled-to-2") } "query the state" in { runOn(third) { - val probe = TestProbe[NumberOfProcesses]() - sdp ! GetNumberOfProcesses(probe.ref) - val reply = probe.receiveMessage() - reply.numberOfProcesses should ===(2) - reply.revision should ===(2) - reply.rescaleInProgress === (false) + assertNumberOfProcesses(n = 2, revision = 2) } enterBarrier("sharded-daemon-process-query") }