From 7e6b35cd4ae7f6ddec6dba0dc6e7f1b6c157429e Mon Sep 17 00:00:00 2001 From: Lachlan Donald Date: Fri, 2 Nov 2018 15:47:40 +1100 Subject: [PATCH 1/2] Delete sns subscriptions in queue cleaner tool --- tools/lifecycled-queue-cleaner/main.go | 88 +++++++++++++++++++++++++- 1 file changed, 86 insertions(+), 2 deletions(-) diff --git a/tools/lifecycled-queue-cleaner/main.go b/tools/lifecycled-queue-cleaner/main.go index 21f316f..eb0f652 100644 --- a/tools/lifecycled-queue-cleaner/main.go +++ b/tools/lifecycled-queue-cleaner/main.go @@ -13,12 +13,27 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/service/sns" ) func main() { parallel := flag.Int("parallel", 20, "The number of parallel deletes to run") flag.Parse() + for { + count, err := deleteInactiveSubscriptions(session.New()) + if err != nil { + log.Fatal(err) + } + + if count == 0 { + break + } else { + log.Printf("Deleted %d subscriptions, running again as aws limits subscriptions returned to 100", count) + time.Sleep(time.Second * 2) + } + } + for { count, err := deleteInactiveQueues(session.New(), *parallel) if err != nil { @@ -26,13 +41,14 @@ func main() { } if count == 0 { - log.Printf("Done!") - return + break } else { log.Printf("Deleted %d queues, running again as aws limits queues returned to 1000", count) time.Sleep(time.Second * 60) } } + + log.Printf("Done! Sorry for the inconvenience!") } func deleteInactiveQueues(sess *session.Session, parallel int) (uint64, error) { @@ -177,3 +193,71 @@ func deleteQueue(sess *session.Session, queueUrl string) error { }) return err } + +func topicExists(sess *session.Session, snsTopic string) (bool, error) { + _, err := sns.New(sess).GetTopicAttributes(&sns.GetTopicAttributesInput{ + TopicArn: aws.String(snsTopic), + }) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == `NotFound` { + return false, nil + } + } + log.Printf("%#v", err.Error()) + return false, err + } + return true, nil +} + +func listInactiveSubscriptions(sess *session.Session) ([]string, error) { + var subs []string + var topics = make(map[string]bool,0) + var count int + + err := sns.New(sess).ListSubscriptionsPages(&sns.ListSubscriptionsInput{}, + func(page *sns.ListSubscriptionsOutput, lastPage bool) bool { + count = count + len(page.Subscriptions) + for _, s := range page.Subscriptions { + if exists, ok := topics[*s.TopicArn]; ok { + if !exists { + subs = append(subs, *s.SubscriptionArn) + } + continue + } + if exists, _ := topicExists(sess, *s.TopicArn); exists { + topics[*s.TopicArn] = true + } else { + topics[*s.TopicArn] = false + subs = append(subs, *s.SubscriptionArn) + } + } + return lastPage + }) + if err != nil { + return nil, err + } + + log.Printf("Found %d sns subscriptions in total", count) + return subs, nil +} + +func deleteInactiveSubscriptions(sess *session.Session) (int, error) { + subs, err := listInactiveSubscriptions(sess) + if err != nil { + return 0, err + } + var deleted int + for idx, s := range subs { + log.Printf("Deleting SNS Subscription %s (%d of %d)", s, idx+1, len(subs)) + _, err := sns.New(sess).Unsubscribe(&sns.UnsubscribeInput{ + SubscriptionArn: aws.String(s), + }) + if err != nil { + return deleted, err + } + deleted++ + } + return deleted, nil +} + From 674e435ee00a464eecfcd5331f6217e6661a3f49 Mon Sep 17 00:00:00 2001 From: Lachlan Donald Date: Fri, 2 Nov 2018 15:52:40 +1100 Subject: [PATCH 2/2] Only delete lifecycled related subscriptions --- tools/lifecycled-queue-cleaner/main.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tools/lifecycled-queue-cleaner/main.go b/tools/lifecycled-queue-cleaner/main.go index eb0f652..2bf03f1 100644 --- a/tools/lifecycled-queue-cleaner/main.go +++ b/tools/lifecycled-queue-cleaner/main.go @@ -7,6 +7,7 @@ import ( "sync" "sync/atomic" "time" + "strings" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -219,6 +220,9 @@ func listInactiveSubscriptions(sess *session.Session) ([]string, error) { func(page *sns.ListSubscriptionsOutput, lastPage bool) bool { count = count + len(page.Subscriptions) for _, s := range page.Subscriptions { + if !strings.Contains(*s.Endpoint, "lifecycled-i") { + continue + } if exists, ok := topics[*s.TopicArn]; ok { if !exists { subs = append(subs, *s.SubscriptionArn) @@ -247,9 +251,10 @@ func deleteInactiveSubscriptions(sess *session.Session) (int, error) { if err != nil { return 0, err } + log.Printf("Found %d inactive subscriptions", len(subs)) var deleted int for idx, s := range subs { - log.Printf("Deleting SNS Subscription %s (%d of %d)", s, idx+1, len(subs)) + log.Printf("Deleting sns subscription %s (%d of %d)", s, idx+1, len(subs)) _, err := sns.New(sess).Unsubscribe(&sns.UnsubscribeInput{ SubscriptionArn: aws.String(s), })