Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,16 @@ private final class ShardedDaemonProcessCoordinator private (
case ShardStopped(shard) =>
val newShardsStillRunning = shardsStillRunning - shard
if (newShardsStillRunning.isEmpty) {
context.log.info(
"{}: All shards stopped. Continue rescaling of Sharded Daemon Process to [{}] processes, rev [{}]",
daemonProcessName,
state.numberOfProcesses,
state.revision)
timers.cancel(ShardStopTimeout)
rescalingComplete(state, request)
} else waitForShardsStopping(state, request, previousNumberOfProcesses, newShardsStillRunning)
case ShardStopTimeout =>
context.log.debug("{}: Stopping shards timed out after [{}] retrying", daemonProcessName, stopShardsTimeout)
context.log.info("{}: Stopping shards timed out after [{}] retrying", daemonProcessName, stopShardsTimeout)
stopAllShards(state, request, previousNumberOfProcesses)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.annotation.InternalApi
import akka.cluster.ddata.Replicator.ReadAll
import akka.cluster.ddata.Replicator.ReadConsistency
import akka.cluster.ddata.Replicator.ReadMajorityPlus
import akka.cluster.ddata.typed.scaladsl.DistributedData
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.EntityId
Expand Down Expand Up @@ -166,13 +169,21 @@ private[akka] final class ShardedDaemonProcessImpl(system: ActorSystem[_])
shardingBaseSettings.leaseSettings)
}

val stateReadConsistency: ReadConsistency =
shardingSettings.tuningParameters.coordinatorStateReadMajorityPlus match {
case Int.MaxValue => ReadAll(shardingSettings.tuningParameters.waitingForStateTimeout)
case additional =>
val majorityMinCap =
system.settings.config.getInt("akka.cluster.sharding.distributed-data.majority-min-cap")
ReadMajorityPlus(shardingSettings.tuningParameters.waitingForStateTimeout, additional, majorityMinCap)
}

val entity = Entity(entityTypeKey) { ctx =>
val decodedId = decodeEntityId(ctx.entityId, initialNumberOfProcesses = numberOfInstances)
val sdContext =
ShardedDaemonProcessContextImpl(decodedId.processNumber, decodedId.totalCount, name, decodedId.revision)
if (supportsRescale) verifyRevisionBeforeStarting(behaviorFactory)(sdContext)
else
behaviorFactory(sdContext)
if (supportsRescale) verifyRevisionBeforeStarting(stateReadConsistency, behaviorFactory)(sdContext)
else behaviorFactory(sdContext)
}.withSettings(shardingSettings).withMessageExtractor(new MessageExtractor())

val entityWithStop = stopMessage match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.cluster.ddata.Key
import akka.cluster.ddata.ReplicatedData
import akka.cluster.ddata.Replicator.ReadConsistency
import akka.cluster.ddata.typed.scaladsl.DistributedData
import akka.cluster.ddata.typed.scaladsl.Replicator
import akka.cluster.sharding.typed.ShardedDaemonProcessContext
Expand Down Expand Up @@ -68,6 +69,7 @@ private[akka] object ShardedDaemonProcessState {
started = Instant.now())

def verifyRevisionBeforeStarting[T](
stateReadConsistency: ReadConsistency,
behaviorFactory: ShardedDaemonProcessContext => Behavior[T]): ShardedDaemonProcessContext => Behavior[T] = {
sdpContext =>
Behaviors.setup { context =>
Expand All @@ -88,7 +90,7 @@ private[akka] object ShardedDaemonProcessState {

// we can't anyway turn reply into T so no need for the usual adapter
val distributedData = DistributedData(context.system)
distributedData.replicator ! Replicator.Get(key, Replicator.ReadLocal, context.self.unsafeUpcast)
distributedData.replicator ! Replicator.Get(key, stateReadConsistency, context.self.unsafeUpcast)
Behaviors.receiveMessagePartial {
case reply @ Replicator.GetSuccess(`key`) =>
val state = reply.get(key)
Expand All @@ -101,7 +103,7 @@ private[akka] object ShardedDaemonProcessState {
revision)
behaviorFactory(sdpContext).unsafeCast
} else {
context.log.warn(
context.log.info(
"{}: Tried to start an old revision of worker ([{}] but latest revision is [{}], started at {})",
sdpContext.name,
sdpContext.revision,
Expand All @@ -126,6 +128,16 @@ private[akka] object ShardedDaemonProcessState {
sdpContext.revision)
Behaviors.stopped
}
case Replicator.GetFailure(_) =>
context.log.info(
"{}: Tried to start Sharded Daemon Process [{}] out of a total [{}], but current revision couldn't " +
"be retrieved. Retrying. (revision [{}])",
sdpContext.name,
sdpContext.processNumber,
sdpContext.totalProcesses,
revision)
distributedData.replicator ! Replicator.Get(key, stateReadConsistency, context.self.unsafeUpcast)
Behaviors.same
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ object ShardedDaemonProcessRescaleSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
val sixth = role("sixth")
val seventh = role("seventh")

val SnitchServiceKey = ServiceKey[AnyRef]("snitch")

Expand All @@ -39,26 +43,36 @@ object ShardedDaemonProcessRescaleSpec extends MultiNodeConfig {
case object Stop extends Command

def apply(id: Int): Behavior[Command] = Behaviors.setup { ctx =>
ctx.log.info("Started [{}]", id)
val snitchRouter = ctx.spawn(Routers.group(SnitchServiceKey), "router")
snitchRouter ! ProcessActorEvent(id, "Started")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not reliable, sometimes dead letters. I have to find a better way to collect these events.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would it drop messages?


Behaviors.receiveMessagePartial {
case Stop =>
ctx.log.info("Stopped [{}]", id)
snitchRouter ! ProcessActorEvent(id, "Stopped")
Behaviors.stopped
}
}
}

commonConfig(ConfigFactory.parseString("""
commonConfig(
ConfigFactory.parseString("""
akka.loglevel = DEBUG
akka.cluster.sharded-daemon-process {
sharding {
# First is likely to be ignored as shard coordinator not ready
retry-interval = 0.2s
}
# quick ping to make test swift
keep-alive-interval = 1s
# quick ping to make test swift, stress the start/stop guarantees during rescaling
keep-alive-interval = 20 ms
# disable throttle to stress the start/stop guarantees during rescaling
keep-alive-throttle-interval = 0 s
}
akka.cluster.sharding {
distributed-data.majority-min-cap = 4
coordinator-state.write-majority-plus = 0
coordinator-state.read-majority-plus = 0
}
""").withFallback(MultiNodeClusterSpec.clusterConfig))

Expand All @@ -67,6 +81,10 @@ object ShardedDaemonProcessRescaleSpec extends MultiNodeConfig {
class ShardedDaemonProcessRescaleMultiJvmNode1 extends ShardedDaemonProcessRescaleSpec
class ShardedDaemonProcessRescaleMultiJvmNode2 extends ShardedDaemonProcessRescaleSpec
class ShardedDaemonProcessRescaleMultiJvmNode3 extends ShardedDaemonProcessRescaleSpec
class ShardedDaemonProcessRescaleMultiJvmNode4 extends ShardedDaemonProcessRescaleSpec
class ShardedDaemonProcessRescaleMultiJvmNode5 extends ShardedDaemonProcessRescaleSpec
class ShardedDaemonProcessRescaleMultiJvmNode6 extends ShardedDaemonProcessRescaleSpec
class ShardedDaemonProcessRescaleMultiJvmNode7 extends ShardedDaemonProcessRescaleSpec

abstract class ShardedDaemonProcessRescaleSpec
extends MultiNodeSpec(ShardedDaemonProcessRescaleSpec)
Expand All @@ -78,9 +96,18 @@ abstract class ShardedDaemonProcessRescaleSpec
val topicProbe: TestProbe[AnyRef] = TestProbe[AnyRef]()
private var sdp: ActorRef[ShardedDaemonProcessCommand] = _

private def assertNumberOfProcesses(n: Int, revision: Int): Unit = {
val probe = TestProbe[NumberOfProcesses]()
sdp ! GetNumberOfProcesses(probe.ref)
val reply = probe.receiveMessage()
reply.numberOfProcesses should ===(n)
reply.revision should ===(revision)
reply.rescaleInProgress === (false)
}

"Cluster sharding in multi dc cluster" must {
"form cluster" in {
formCluster(first, second, third)
formCluster(first, second, third, fourth, fifth, sixth, seventh)
runOn(first) {
typedSystem.receptionist ! Receptionist.Register(SnitchServiceKey, topicProbe.ref, topicProbe.ref)
topicProbe.expectMessageType[Receptionist.Registered]
Expand Down Expand Up @@ -109,6 +136,11 @@ abstract class ShardedDaemonProcessRescaleSpec
event.id
}.toSet
startedIds.size should ===(4)
topicProbe.expectNoMessage()
}
enterBarrier("sharded-daemon-process-started-acked")
runOn(third) {
assertNumberOfProcesses(n = 4, revision = 0)
}
enterBarrier("sharded-daemon-process-started")
}
Expand All @@ -118,6 +150,16 @@ abstract class ShardedDaemonProcessRescaleSpec
val probe = TestProbe[AnyRef]()
sdp ! ChangeNumberOfProcesses(8, probe.ref)
probe.expectMessage(30.seconds, StatusReply.Ack)
// FIXME snitch router is dropping messages
// val events = topicProbe.receiveMessages(4 + 8, 10.seconds).map(_.asInstanceOf[ProcessActorEvent])
// events.collect { case evt if evt.event == "Stopped" => evt.id }.toSet.size should ===(4)
// events.collect { case evt if evt.event == "Started" => evt.id }.toSet.size should ===(8)
// topicProbe.expectNoMessage()
}

enterBarrier("sharded-daemon-process-rescaled-to-8-acked")
runOn(third) {
assertNumberOfProcesses(n = 8, revision = 1)
}
enterBarrier("sharded-daemon-process-rescaled-to-8")
}
Expand All @@ -128,17 +170,23 @@ abstract class ShardedDaemonProcessRescaleSpec
sdp ! ChangeNumberOfProcesses(2, probe.ref)
probe.expectMessage(30.seconds, StatusReply.Ack)
}
enterBarrier("sharded-daemon-process-rescaled-to-2-acked")
runOn(first) {
// FIXME snitch router is dropping messages
// val events = topicProbe.receiveMessages(8 + 2, 10.seconds).map(_.asInstanceOf[ProcessActorEvent])
// events.collect { case evt if evt.event == "Stopped" => evt.id }.toSet.size should ===(8)
// events.collect { case evt if evt.event == "Started" => evt.id }.toSet.size should ===(2)
// topicProbe.expectNoMessage()
}
runOn(third) {
assertNumberOfProcesses(n = 2, revision = 2)
}
enterBarrier("sharded-daemon-process-rescaled-to-2")
}

"query the state" in {
runOn(third) {
val probe = TestProbe[NumberOfProcesses]()
sdp ! GetNumberOfProcesses(probe.ref)
val reply = probe.receiveMessage()
reply.numberOfProcesses should ===(2)
reply.revision should ===(2)
reply.rescaleInProgress === (false)
assertNumberOfProcesses(n = 2, revision = 2)
}
enterBarrier("sharded-daemon-process-query")
}
Expand Down