Skip to content

Commit

Permalink
Re-enable ASG scaling before final stabilisation
Browse files Browse the repository at this point in the history
This aims to address, to some extent, issue #1342 -
the problem that *apps cannot auto-scale* until an autoscaling deploy has
successfully completed. On 22nd May 2024, this inability to auto-scale led
to a severe outage in the Ophan Tracker.

Ever since #83 in
April 2013, Riff Raff has disabled ASG scaling alarms at the start of a deploy
(`SuspendAlarmNotifications`), and only re-enabled them at the end of the deploy
(`ResumeAlarmNotifications`), once deployment has successfully completed.

In December 2016, with #403, an
additional `WaitForStabilization` was added as a penultimate deploy step,
with the aim of ensuring that the cull of old instances has _completed_
before the deploy ends. However, the `WaitForStabilization` step was added _before_
`ResumeAlarmNotifications`, rather than _after_, and if the ASG instances are
already overloaded and recycling, the ASG will _never_ stabilise, because it _needs
to scale up_ to handle the load it's experiencing.

In this change, we introduce a new task, `WaitForCullToComplete`, that can establish
whether the cull has completed or not, regardless of whether the ASG is scaling -
it simply checks that there are no remaining instances tagged for termination.
Consequently, once we've executed `CullInstancesWithTerminationTag` to _request_ that old
instances terminate, we can immediately allow scaling with `ResumeAlarmNotifications`,
and then run `WaitForCullToComplete` _afterwards_.

With this change in place, the Ophan outage would have been shortened from
1 hour to ~2 minutes, a much better outcome!

Common code between `CullInstancesWithTerminationTag` and `WaitForCullToComplete` has
been factored out into a new `CullSummary` class.
  • Loading branch information
rtyley committed Jun 4, 2024
1 parent c001861 commit 0f75e11
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,17 @@ object AutoScaling extends DeploymentType with BucketParameters {
target.region,
terminationGrace(pkg, target, reporter)
),
WaitForStabilization(
ResumeAlarmNotifications(autoScalingGroup, target.region),
WaitForCullToComplete(
autoScalingGroup,
secondsToWait(pkg, target, reporter),
target.region
),
ResumeAlarmNotifications(autoScalingGroup, target.region)
WaitForStabilization(
autoScalingGroup,
secondsToWait(pkg, target, reporter),
target.region
)
)
}
val groupsToUpdate: List[AutoScalingGroupInfo] =
Expand Down
202 changes: 101 additions & 101 deletions magenta-lib/src/main/scala/magenta/tasks/ASGTasks.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package magenta.tasks

import magenta.deployment_type.AutoScalingGroupInfo
import magenta.tasks.EC2.withEc2Client
import magenta.tasks.autoscaling.CullSummary
import magenta.{KeyRing, _}
import software.amazon.awssdk.awscore.exception.AwsServiceException
import software.amazon.awssdk.services.autoscaling.AutoScalingClient
import software.amazon.awssdk.services.autoscaling.model.{
AutoScalingGroup,
Instance,
LifecycleState,
SetInstanceProtectionRequest
}
import software.amazon.awssdk.services.ec2.Ec2Client

import java.time.Duration
import scala.jdk.CollectionConverters._
Expand All @@ -19,9 +23,8 @@ case class CheckGroupSize(info: AutoScalingGroupInfo, region: Region)(implicit
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
val doubleCapacity = asg.desiredCapacity * 2
resources.reporter.verbose(
s"ASG desired = ${asg.desiredCapacity}; ASG max = ${asg.maxSize}; Target = $doubleCapacity"
Expand All @@ -45,11 +48,10 @@ case class TagCurrentInstancesWithTerminationTag(
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
if (asg.instances.asScala.nonEmpty) {
EC2.withEc2Client(keyRing, region, resources) { ec2Client =>
withEc2Client(keyRing, region, resources) { ec2Client =>
resources.reporter.verbose(
s"Tagging ${asg.instances.asScala.toList.map(_.instanceId).mkString(", ")}"
)
Expand All @@ -75,9 +77,8 @@ case class ProtectCurrentInstances(info: AutoScalingGroupInfo, region: Region)(
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
val instances = asg.instances.asScala.toList
val instancesInService =
instances.filter(_.lifecycleState == LifecycleState.IN_SERVICE)
Expand Down Expand Up @@ -106,9 +107,8 @@ case class DoubleSize(info: AutoScalingGroupInfo, region: Region)(implicit
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
val targetCapacity = asg.desiredCapacity * 2
resources.reporter.verbose(s"Doubling capacity to $targetCapacity")
ASG.desiredCapacity(asg.autoScalingGroupName, targetCapacity, asgClient)
Expand All @@ -127,9 +127,8 @@ sealed abstract class Pause(duration: Duration)(implicit val keyRing: KeyRing)
def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
if (asg.desiredCapacity == 0 && asg.instances.isEmpty)
resources.reporter.verbose(
"Skipping pause as there are no instances and desired capacity is zero"
Expand Down Expand Up @@ -177,9 +176,8 @@ case class WaitForStabilization(
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
ELB.withClient(keyRing, region, resources) { elbClient =>
check(resources.reporter, stopFlag) {
try {
Expand All @@ -190,10 +188,9 @@ case class WaitForStabilization(
case Right(()) => true
}
} catch {
case e: AwsServiceException if isRateExceeded(e) => {
case e: AwsServiceException if isRateExceeded(e) =>
resources.reporter.info(e.getMessage)
false
}
}
}
}
Expand All @@ -215,74 +212,14 @@ case class CullInstancesWithTerminationTag(
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
EC2.withEc2Client(keyRing, region, resources) { ec2Client =>
ELB.withClient(keyRing, region, resources) { elbClient =>
val allInstances = asg.instances.asScala
resources.reporter.verbose(
s"Found the following instances: ${allInstances.map(_.instanceId).mkString(", ")}"
)
val instancesToKill = allInstances
.filter(instance => {
if (
instance.lifecycleState == LifecycleState.UNKNOWN_TO_SDK_VERSION
) {
logger.warn(
s"Instance lifecycle state ${instance.lifecycleStateAsString} isn't recognised in the AWS SDK. Is there a later version of the AWS SDK available?"
)
}

// See https://docs.aws.amazon.com/autoscaling/ec2/userguide/lifecycle-hooks.html#lifecycle-hooks-overview
val terminatingStates = List(
LifecycleState.TERMINATING,
LifecycleState.TERMINATING_WAIT,
LifecycleState.TERMINATING_PROCEED,
LifecycleState.TERMINATED
).map(_.toString)

val isAlreadyTerminating =
terminatingStates.contains(instance.lifecycleStateAsString)
val isTaggedForTermination =
EC2.hasTag(instance, "Magenta", "Terminate", ec2Client)

isTaggedForTermination && !isAlreadyTerminating
})

val instancesToRetain = allInstances.diff(instancesToKill).toList
resources.reporter.verbose(
s"Decided to keep the following instances: ${instancesToRetain.map(_.instanceId).mkString(", ")}"
)

if (instancesToRetain.size != instancesToKill.size) {
resources.reporter.warning(
s"Terminating ${instancesToKill.size} instances and retaining ${instancesToRetain.size} instances"
)
logger.warn(
s"Unusual number of instances terminated as part of autoscaling deployment"
)
instancesToRetain.foreach(instanceToRetain => {
val tags = EC2
.allTags(instanceToRetain, ec2Client)
.toList
.map(tag => s"${tag.key}:${tag.value}")
.mkString(", ")
resources.reporter.verbose(
s"Will not terminate $instanceToRetain. State: ${instanceToRetain.lifecycleStateAsString}. Tags: $tags"
)
})
}

val orderedInstancesToKill =
instancesToKill.toSeq.transposeBy(_.availabilityZone)
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
withEc2Client(keyRing, region, resources) { implicit ec2Client =>
val instancesToKill =
prepareOrderedInstancesToKill(asg, resources.reporter)
ELB.withClient(keyRing, region, resources) { implicit elbClient =>
try {
resources.reporter.verbose(
s"Culling instances: ${orderedInstancesToKill.map(_.instanceId).mkString(", ")}"
)
orderedInstancesToKill.foreach(instance =>
ASG.cull(asg, instance, asgClient, elbClient)
)
instancesToKill.foreach(instance => ASG.cull(asg, instance))
} catch {
case e: AwsServiceException if desiredSizeReset(e) =>
resources.reporter.warning(
Expand All @@ -302,8 +239,75 @@ case class CullInstancesWithTerminationTag(
.contains("ValidationError")
}

/** Works out which instances should be culled, and in what order.
  *
  * Builds a `CullSummary` for the group, reports on the instances that are
  * being kept, then orders the instances to kill with
  * `transposeBy(_.availabilityZone)` and logs their ids.
  *
  * NOTE(review): `transposeBy` is defined elsewhere; presumably it interleaves
  * instances across availability zones - confirm against its definition.
  *
  * @param asg
  *   the auto scaling group whose instances are examined
  * @param reporter
  *   receives the verbose log of the instances about to be culled
  * @return
  *   the instances to kill, in the order produced by `transposeBy`
  */
private def prepareOrderedInstancesToKill(
    asg: AutoScalingGroup,
    reporter: DeployReporter
)(implicit ec2Client: Ec2Client): Seq[Instance] = {
  val summary = CullSummary.forAsg(asg, reporter)
  // Report on what is being kept before announcing what is being culled.
  reportOnRetainedInstances(reporter, summary)

  val killOrder = summary.instancesToKill.toSeq.transposeBy(_.availabilityZone)
  val killIds = killOrder.map(_.instanceId).mkString(", ")
  reporter.verbose(s"Culling instances: $killIds")
  killOrder
}

/** Logs which instances will survive the cull, and warns when the split
  * between culled and retained instances is uneven.
  *
  * The retained instances are every instance in the summary minus those
  * selected to be killed. When the number retained differs from the number to
  * be killed, a warning is raised and each retained instance is logged along
  * with its lifecycle state and tags.
  *
  * @param reporter
  *   receives the verbose and warning messages
  * @param cullSummary
  *   supplies `allInstances` and `instancesToKill`
  */
private def reportOnRetainedInstances(
    reporter: DeployReporter,
    cullSummary: CullSummary
)(implicit ec2Client: Ec2Client): Unit = {
  val doomed = cullSummary.instancesToKill
  val survivors = cullSummary.allInstances.diff(doomed).toList
  val survivorIds = survivors.map(_.instanceId).mkString(", ")
  reporter.verbose(s"Decided to keep the following instances: $survivorIds")

  val evenSplit = survivors.size == doomed.size
  if (!evenSplit) {
    reporter.warning(
      s"Terminating ${doomed.size} instances and retaining ${survivors.size} instances"
    )
    logger.warn(
      "Unusual number of instances terminated as part of autoscaling deployment"
    )
    // Explain, per instance, why it was left alone.
    for (survivor <- survivors) {
      val tags = EC2
        .allTags(survivor, ec2Client)
        .map(tag => s"${tag.key}:${tag.value}")
        .mkString(", ")
      reporter.verbose(
        s"Will not terminate $survivor. State: ${survivor.lifecycleStateAsString}. Tags: $tags"
      )
    }
  }
}

lazy val description =
s"Terminate instances in $asgName with the termination tag for this deploy"
s"Request termination for instances in $asgName with the termination tag for this deploy"
}

/** Polls until `CullSummary` reports that the cull of instances tagged for
  * termination is complete for this auto scaling group.
  *
  * Unlike a stabilisation check, this only asks whether the cull has finished,
  * so it can run after scaling has been re-enabled.
  *
  * @param info
  *   identifies the auto scaling group this task operates on
  * @param duration
  *   used by `SlowRepeatedPollingCheck` (not visible here); presumably the
  *   maximum time to keep polling - confirm against that trait
  * @param region
  *   the AWS region used when creating clients
  */
case class WaitForCullToComplete(
    info: AutoScalingGroupInfo,
    duration: Duration,
    region: Region
)(implicit val keyRing: KeyRing)
    extends ASGTask
    with SlowRepeatedPollingCheck {

  /** Repeatedly evaluates the cull status until it is complete or the
    * polling check gives up.
    *
    * @param asg
    *   the group state at the time the task started; only a snapshot
    * @param resources
    *   supplies the reporter and what is needed to build AWS clients
    * @param stopFlag
    *   passed through to the polling check
    */
  override def execute(
      asg: AutoScalingGroup,
      resources: DeploymentResources,
      stopFlag: => Boolean
  )(implicit asgClient: AutoScalingClient): Unit =
    withEc2Client(keyRing, region, resources) { implicit ec2Client =>
      check(resources.reporter, stopFlag) {
        // `asg` was fetched once, before this task started. Its instance list
        // and lifecycle states never change, so polling against it could not
        // observe the cull progressing. Re-read the group on every attempt.
        val latestAsgState =
          ASG.getGroupByName(asgName, asgClient, resources.reporter)
        CullSummary.forAsg(latestAsgState, resources.reporter).isCullComplete
      }
    }

  lazy val description: String =
    s"Check that all instances tagged for termination in $asgName have been terminated"
}

case class SuspendAlarmNotifications(
Expand All @@ -315,9 +319,8 @@ case class SuspendAlarmNotifications(
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit = {
ASG.suspendAlarmNotifications(asg.autoScalingGroupName, asgClient)
}

Expand All @@ -332,11 +335,9 @@ case class ResumeAlarmNotifications(info: AutoScalingGroupInfo, region: Region)(
override def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit = {
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit =
ASG.resumeAlarmNotifications(asg.autoScalingGroupName, asgClient)
}

lazy val description =
s"Resuming Alarm Notifications - $asgName will scale on any configured alarms"
Expand All @@ -353,15 +354,14 @@ trait ASGTask extends Task {
def execute(
asg: AutoScalingGroup,
resources: DeploymentResources,
stopFlag: => Boolean,
asgClient: AutoScalingClient
): Unit
stopFlag: => Boolean
)(implicit asgClient: AutoScalingClient): Unit

override def execute(
resources: DeploymentResources,
stopFlag: => Boolean
): Unit = {
ASG.withAsgClient(keyRing, region, resources) { asgClient =>
ASG.withAsgClient(keyRing, region, resources) { implicit asgClient =>
resources.reporter.verbose(
s"Looked up group matching tags ${info.tagRequirements}; identified ${info.asg.autoScalingGroupARN}"
)
Expand All @@ -370,7 +370,7 @@ trait ASGTask extends Task {
// For example, we need to make sure that we have the latest desired capacity when doubling the size of the ASG.
val latestAsgState =
ASG.getGroupByName(asgName, asgClient, resources.reporter)
execute(latestAsgState, resources, stopFlag, asgClient)
execute(latestAsgState, resources, stopFlag)
}
}
}
7 changes: 4 additions & 3 deletions magenta-lib/src/main/scala/magenta/tasks/AWS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,10 @@ object ASG {

def cull(
asg: AutoScalingGroup,
instance: ASGInstance,
asgClient: AutoScalingClient,
elbClient: ELB.Client
instance: ASGInstance
)(implicit
elbClient: ELB.Client,
asgClient: AutoScalingClient
): TerminateInstanceInAutoScalingGroupResponse = {
ELB.deregister(elbNames(asg), elbTargetArns(asg), instance, elbClient)

Expand Down
Loading

0 comments on commit 0f75e11

Please sign in to comment.