Skip to content

Commit

Permalink
Merge pull request #58 from buildkite/cleaner-tool-handles-sns-subscr…
Browse files Browse the repository at this point in the history
…iptions

Delete SNS subscriptions in queue cleaner tool
  • Loading branch information
lox authored Dec 12, 2018
2 parents 49e590d + 674e435 commit d94da31
Showing 1 changed file with 91 additions and 2 deletions.
93 changes: 91 additions & 2 deletions tools/lifecycled-queue-cleaner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,49 @@ import (
"sync"
"sync/atomic"
"time"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sns"
)

func main() {
parallel := flag.Int("parallel", 20, "The number of parallel deletes to run")
flag.Parse()

for {
count, err := deleteInactiveSubscriptions(session.New())
if err != nil {
log.Fatal(err)
}

if count == 0 {
break
} else {
log.Printf("Deleted %d subscriptions, running again as aws limits subscriptions returned to 100", count)
time.Sleep(time.Second * 2)
}
}

for {
count, err := deleteInactiveQueues(session.New(), *parallel)
if err != nil {
log.Fatal(err)
}

if count == 0 {
log.Printf("Done!")
return
break
} else {
log.Printf("Deleted %d queues, running again as aws limits queues returned to 1000", count)
time.Sleep(time.Second * 60)
}
}

log.Printf("Done! Sorry for the inconvenience!")
}

func deleteInactiveQueues(sess *session.Session, parallel int) (uint64, error) {
Expand Down Expand Up @@ -177,3 +194,75 @@ func deleteQueue(sess *session.Session, queueUrl string) error {
})
return err
}

func topicExists(sess *session.Session, snsTopic string) (bool, error) {
_, err := sns.New(sess).GetTopicAttributes(&sns.GetTopicAttributesInput{
TopicArn: aws.String(snsTopic),
})
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == `NotFound` {
return false, nil
}
}
log.Printf("%#v", err.Error())
return false, err
}
return true, nil
}

func listInactiveSubscriptions(sess *session.Session) ([]string, error) {
var subs []string
var topics = make(map[string]bool,0)
var count int

err := sns.New(sess).ListSubscriptionsPages(&sns.ListSubscriptionsInput{},
func(page *sns.ListSubscriptionsOutput, lastPage bool) bool {
count = count + len(page.Subscriptions)
for _, s := range page.Subscriptions {
if !strings.Contains(*s.Endpoint, "lifecycled-i") {
continue
}
if exists, ok := topics[*s.TopicArn]; ok {
if !exists {
subs = append(subs, *s.SubscriptionArn)
}
continue
}
if exists, _ := topicExists(sess, *s.TopicArn); exists {
topics[*s.TopicArn] = true
} else {
topics[*s.TopicArn] = false
subs = append(subs, *s.SubscriptionArn)
}
}
return lastPage
})
if err != nil {
return nil, err
}

log.Printf("Found %d sns subscriptions in total", count)
return subs, nil
}

func deleteInactiveSubscriptions(sess *session.Session) (int, error) {
subs, err := listInactiveSubscriptions(sess)
if err != nil {
return 0, err
}
log.Printf("Found %d inactive subscriptions", len(subs))
var deleted int
for idx, s := range subs {
log.Printf("Deleting sns subscription %s (%d of %d)", s, idx+1, len(subs))
_, err := sns.New(sess).Unsubscribe(&sns.UnsubscribeInput{
SubscriptionArn: aws.String(s),
})
if err != nil {
return deleted, err
}
deleted++
}
return deleted, nil
}

0 comments on commit d94da31

Please sign in to comment.