Skip to content

Commit

Permalink
Thread context through some slower AWS tasks
Browse files Browse the repository at this point in the history
There are still too many context.TODOs here for this to join all the
way up, but we should be able to better understand the slowest tasks.
  • Loading branch information
justinsb committed Oct 21, 2023
1 parent dfc4717 commit a66061d
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 53 deletions.
37 changes: 21 additions & 16 deletions upup/pkg/fi/cloudup/awstasks/autoscalinggroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package awstasks

import (
"context"
"fmt"
"reflect"
"sort"
Expand Down Expand Up @@ -137,9 +138,11 @@ func (e *AutoscalingGroup) GetDependencies(tasks map[string]fi.CloudupTask) []fi

// Find is used to discover the ASG in the cloud provider
func (e *AutoscalingGroup) Find(c *fi.CloudupContext) (*AutoscalingGroup, error) {
ctx := c.Context()

cloud := c.T.Cloud.(awsup.AWSCloud)

g, err := findAutoscalingGroup(cloud, fi.ValueOf(e.Name))
g, err := findAutoscalingGroup(ctx, cloud, fi.ValueOf(e.Name))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -305,13 +308,13 @@ func (e *AutoscalingGroup) Find(c *fi.CloudupContext) (*AutoscalingGroup, error)
}

// findAutoscalingGroup is responsible for finding all the autoscaling groups for us
func findAutoscalingGroup(cloud awsup.AWSCloud, name string) (*autoscaling.Group, error) {
func findAutoscalingGroup(ctx context.Context, cloud awsup.AWSCloud, name string) (*autoscaling.Group, error) {
request := &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: []*string{&name},
}

var found []*autoscaling.Group
err := cloud.Autoscaling().DescribeAutoScalingGroupsPages(request, func(p *autoscaling.DescribeAutoScalingGroupsOutput, lastPage bool) (shouldContinue bool) {
err := cloud.Autoscaling().DescribeAutoScalingGroupsPagesWithContext(ctx, request, func(p *autoscaling.DescribeAutoScalingGroupsOutput, lastPage bool) (shouldContinue bool) {
for _, g := range p.AutoScalingGroups {
// Check for "Delete in progress" (the only use .Status). We won't be able to update or create while
// this is true, but filtering it out here makes the messages slightly clearer.
Expand Down Expand Up @@ -368,6 +371,8 @@ func (e *AutoscalingGroup) CheckChanges(a, ex, changes *AutoscalingGroup) error

// RenderAWS is responsible for building the autoscaling group via AWS API
func (v *AutoscalingGroup) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *AutoscalingGroup) error {
ctx := context.TODO()

// @step: did we find an autoscaling group?
if a == nil {
klog.V(2).Infof("Creating autoscaling group with name: %s", fi.ValueOf(e.Name))
Expand Down Expand Up @@ -445,7 +450,7 @@ func (v *AutoscalingGroup) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Autos
}

// @step: attempt to create the autoscaling group for us
if _, err := t.Cloud.Autoscaling().CreateAutoScalingGroup(request); err != nil {
if _, err := t.Cloud.Autoscaling().CreateAutoScalingGroupWithContext(ctx, request); err != nil {
code := awsup.AWSErrorCode(err)
message := awsup.AWSErrorMessage(err)
if code == "ValidationError" && strings.Contains(message, "Invalid IAM Instance Profile name") {
Expand All @@ -456,7 +461,7 @@ func (v *AutoscalingGroup) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Autos
}

// @step: attempt to enable the metrics for us
if _, err := t.Cloud.Autoscaling().EnableMetricsCollection(&autoscaling.EnableMetricsCollectionInput{
if _, err := t.Cloud.Autoscaling().EnableMetricsCollectionWithContext(ctx, &autoscaling.EnableMetricsCollectionInput{
AutoScalingGroupName: e.Name,
Granularity: e.Granularity,
Metrics: aws.StringSlice(e.Metrics),
Expand All @@ -474,7 +479,7 @@ func (v *AutoscalingGroup) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Autos
processQuery.AutoScalingGroupName = e.Name
processQuery.ScalingProcesses = toSuspend

if _, err := t.Cloud.Autoscaling().SuspendProcesses(processQuery); err != nil {
if _, err := t.Cloud.Autoscaling().SuspendProcessesWithContext(ctx, processQuery); err != nil {
return fmt.Errorf("error suspending processes: %v", err)
}
}
Expand Down Expand Up @@ -632,7 +637,7 @@ func (v *AutoscalingGroup) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Autos
if changes.Metrics != nil || changes.Granularity != nil {
// TODO: Support disabling metrics?
if len(e.Metrics) != 0 {
_, err := t.Cloud.Autoscaling().EnableMetricsCollection(&autoscaling.EnableMetricsCollectionInput{
_, err := t.Cloud.Autoscaling().EnableMetricsCollectionWithContext(ctx, &autoscaling.EnableMetricsCollectionInput{
AutoScalingGroupName: e.Name,
Granularity: e.Granularity,
Metrics: aws.StringSlice(e.Metrics),
Expand All @@ -654,7 +659,7 @@ func (v *AutoscalingGroup) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Autos
suspendProcessQuery.AutoScalingGroupName = e.Name
suspendProcessQuery.ScalingProcesses = toSuspend

_, err := t.Cloud.Autoscaling().SuspendProcesses(suspendProcessQuery)
_, err := t.Cloud.Autoscaling().SuspendProcessesWithContext(ctx, suspendProcessQuery)
if err != nil {
return fmt.Errorf("error suspending processes: %v", err)
}
Expand All @@ -664,7 +669,7 @@ func (v *AutoscalingGroup) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Autos
resumeProcessQuery.AutoScalingGroupName = e.Name
resumeProcessQuery.ScalingProcesses = toResume

_, err := t.Cloud.Autoscaling().ResumeProcesses(resumeProcessQuery)
_, err := t.Cloud.Autoscaling().ResumeProcessesWithContext(ctx, resumeProcessQuery)
if err != nil {
return fmt.Errorf("error resuming processes: %v", err)
}
Expand All @@ -689,41 +694,41 @@ func (v *AutoscalingGroup) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *Autos

klog.V(2).Infof("Updating autoscaling group %s", fi.ValueOf(e.Name))

if _, err := t.Cloud.Autoscaling().UpdateAutoScalingGroup(request); err != nil {
if _, err := t.Cloud.Autoscaling().UpdateAutoScalingGroupWithContext(ctx, request); err != nil {
return fmt.Errorf("error updating AutoscalingGroup: %v", err)
}

if deleteTagsRequest != nil && len(deleteTagsRequest.Tags) > 0 {
if _, err := t.Cloud.Autoscaling().DeleteTags(deleteTagsRequest); err != nil {
if _, err := t.Cloud.Autoscaling().DeleteTagsWithContext(ctx, deleteTagsRequest); err != nil {
return fmt.Errorf("error deleting old AutoscalingGroup tags: %v", err)
}
}
if updateTagsRequest != nil {
if _, err := t.Cloud.Autoscaling().CreateOrUpdateTags(updateTagsRequest); err != nil {
if _, err := t.Cloud.Autoscaling().CreateOrUpdateTagsWithContext(ctx, updateTagsRequest); err != nil {
return fmt.Errorf("error updating AutoscalingGroup tags: %v", err)
}
}

if detachLBRequest != nil {
if _, err := t.Cloud.Autoscaling().DetachLoadBalancers(detachLBRequest); err != nil {
if _, err := t.Cloud.Autoscaling().DetachLoadBalancersWithContext(ctx, detachLBRequest); err != nil {
return fmt.Errorf("error detatching LoadBalancers: %v", err)
}
}
if attachLBRequest != nil {
if _, err := t.Cloud.Autoscaling().AttachLoadBalancers(attachLBRequest); err != nil {
if _, err := t.Cloud.Autoscaling().AttachLoadBalancersWithContext(ctx, attachLBRequest); err != nil {
return fmt.Errorf("error attaching LoadBalancers: %v", err)
}
}
if len(detachTGRequests) > 0 {
for _, detachTGRequest := range detachTGRequests {
if _, err := t.Cloud.Autoscaling().DetachLoadBalancerTargetGroups(detachTGRequest); err != nil {
if _, err := t.Cloud.Autoscaling().DetachLoadBalancerTargetGroupsWithContext(ctx, detachTGRequest); err != nil {
return fmt.Errorf("failed to detach target groups: %v", err)
}
}
}
if len(attachTGRequests) > 0 {
for _, attachTGRequest := range attachTGRequests {
if _, err := t.Cloud.Autoscaling().AttachLoadBalancerTargetGroups(attachTGRequest); err != nil {
if _, err := t.Cloud.Autoscaling().AttachLoadBalancerTargetGroupsWithContext(ctx, attachTGRequest); err != nil {
return fmt.Errorf("failed to attach target groups: %v", err)
}
}
Expand Down
10 changes: 7 additions & 3 deletions upup/pkg/fi/cloudup/awstasks/autoscalinglifecyclehook.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package awstasks

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go/service/autoscaling"
Expand Down Expand Up @@ -53,14 +54,15 @@ func (h *AutoscalingLifecycleHook) CompareWithID() *string {
}

func (h *AutoscalingLifecycleHook) Find(c *fi.CloudupContext) (*AutoscalingLifecycleHook, error) {
ctx := c.Context()
cloud := c.T.Cloud.(awsup.AWSCloud)

request := &autoscaling.DescribeLifecycleHooksInput{
AutoScalingGroupName: h.AutoscalingGroup.Name,
LifecycleHookNames: []*string{h.GetHookName()},
}

response, err := cloud.Autoscaling().DescribeLifecycleHooks(request)
response, err := cloud.Autoscaling().DescribeLifecycleHooksWithContext(ctx, request)
if err != nil {
return nil, fmt.Errorf("error listing ASG Lifecycle Hooks: %v", err)
}
Expand Down Expand Up @@ -109,6 +111,8 @@ func (_ *AutoscalingLifecycleHook) CheckChanges(a, e, changes *AutoscalingLifecy
}

func (*AutoscalingLifecycleHook) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *AutoscalingLifecycleHook) error {
ctx := context.TODO()

if changes != nil {
if fi.ValueOf(e.Enabled) {
request := &autoscaling.PutLifecycleHookInput{
Expand All @@ -118,7 +122,7 @@ func (*AutoscalingLifecycleHook) RenderAWS(t *awsup.AWSAPITarget, a, e, changes
LifecycleHookName: e.GetHookName(),
LifecycleTransition: e.LifecycleTransition,
}
_, err := t.Cloud.Autoscaling().PutLifecycleHook(request)
_, err := t.Cloud.Autoscaling().PutLifecycleHookWithContext(ctx, request)
if err != nil {
return err
}
Expand All @@ -127,7 +131,7 @@ func (*AutoscalingLifecycleHook) RenderAWS(t *awsup.AWSAPITarget, a, e, changes
AutoScalingGroupName: e.AutoscalingGroup.Name,
LifecycleHookName: e.GetHookName(),
}
_, err := t.Cloud.Autoscaling().DeleteLifecycleHook(request)
_, err := t.Cloud.Autoscaling().DeleteLifecycleHookWithContext(ctx, request)
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions upup/pkg/fi/cloudup/awstasks/launchtemplate_target_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ func (t *LaunchTemplate) Find(c *fi.CloudupContext) (*LaunchTemplate, error) {

// findAllLaunchTemplates returns all the launch templates for us
func (t *LaunchTemplate) findAllLaunchTemplates(c *fi.CloudupContext) ([]*ec2.LaunchTemplate, error) {
ctx := c.Context()

cloud, ok := c.T.Cloud.(awsup.AWSCloud)
if !ok {
return nil, fmt.Errorf("invalid cloud provider: %v, expected: %s", c.T.Cloud, "awsup.AWSCloud")
Expand All @@ -357,7 +359,7 @@ func (t *LaunchTemplate) findAllLaunchTemplates(c *fi.CloudupContext) ([]*ec2.La
}

var list []*ec2.LaunchTemplate
err := cloud.EC2().DescribeLaunchTemplatesPages(input, func(p *ec2.DescribeLaunchTemplatesOutput, lastPage bool) (shouldContinue bool) {
err := cloud.EC2().DescribeLaunchTemplatesPagesWithContext(ctx, input, func(p *ec2.DescribeLaunchTemplatesOutput, lastPage bool) (shouldContinue bool) {
list = append(list, p.LaunchTemplates...)
return true
})
Expand All @@ -370,6 +372,8 @@ func (t *LaunchTemplate) findAllLaunchTemplates(c *fi.CloudupContext) ([]*ec2.La

// findLatestLaunchTemplateVersion returns the latest template version
func (t *LaunchTemplate) findLatestLaunchTemplateVersion(c *fi.CloudupContext) (*ec2.LaunchTemplateVersion, error) {
ctx := c.Context()

cloud, ok := c.T.Cloud.(awsup.AWSCloud)
if !ok {
return nil, fmt.Errorf("invalid cloud provider: %v, expected: awsup.AWSCloud", c.T.Cloud)
Expand All @@ -380,7 +384,7 @@ func (t *LaunchTemplate) findLatestLaunchTemplateVersion(c *fi.CloudupContext) (
Versions: []*string{aws.String("$Latest")},
}

output, err := cloud.EC2().DescribeLaunchTemplateVersions(input)
output, err := cloud.EC2().DescribeLaunchTemplateVersionsWithContext(ctx, input)
if err != nil {
if awsup.AWSErrorCode(err) == "InvalidLaunchTemplateName.NotFoundException" {
klog.V(4).Infof("Got InvalidLaunchTemplateName.NotFoundException error describing latest launch template version: %q", aws.StringValue(t.Name))
Expand Down
9 changes: 6 additions & 3 deletions upup/pkg/fi/cloudup/awstasks/warmpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package awstasks

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go/service/autoscaling"
Expand Down Expand Up @@ -45,9 +46,10 @@ type WarmPool struct {

// Find is used to discover the ASG in the cloud provider.
func (e *WarmPool) Find(c *fi.CloudupContext) (*WarmPool, error) {
ctx := c.Context()
cloud := c.T.Cloud.(awsup.AWSCloud)
svc := cloud.Autoscaling()
warmPool, err := svc.DescribeWarmPool(&autoscaling.DescribeWarmPoolInput{
warmPool, err := svc.DescribeWarmPoolWithContext(ctx, &autoscaling.DescribeWarmPoolInput{
AutoScalingGroupName: e.AutoscalingGroup.Name,
})
if err != nil {
Expand Down Expand Up @@ -85,6 +87,7 @@ func (*WarmPool) CheckChanges(a, e, changes *WarmPool) error {
}

func (*WarmPool) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *WarmPool) error {
ctx := context.TODO()
svc := t.Cloud.Autoscaling()
if changes != nil {
if fi.ValueOf(e.Enabled) {
Expand All @@ -99,15 +102,15 @@ func (*WarmPool) RenderAWS(t *awsup.AWSAPITarget, a, e, changes *WarmPool) error
MinSize: fi.PtrTo(minSize),
}

_, err := svc.PutWarmPool(request)
_, err := svc.PutWarmPoolWithContext(ctx, request)
if err != nil {
if awsup.AWSErrorCode(err) == "ValidationError" {
return fi.NewTryAgainLaterError("waiting for ASG to become ready").WithError(err)
}
return fmt.Errorf("error modifying warm pool: %w", err)
}
} else if a != nil {
_, err := svc.DeleteWarmPool(&autoscaling.DeleteWarmPoolInput{
_, err := svc.DeleteWarmPoolWithContext(ctx, &autoscaling.DeleteWarmPoolInput{
AutoScalingGroupName: e.AutoscalingGroup.Name,
// We don't need to do any cleanup so, the faster the better
ForceDelete: fi.PtrTo(true),
Expand Down
Loading

0 comments on commit a66061d

Please sign in to comment.