Skip to content

Commit

Permalink
throttling + internal refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
lucagrulla committed Jul 29, 2017
1 parent 6d738bc commit f84df72
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 38 deletions.
74 changes: 38 additions & 36 deletions cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func params(logGroupName string, streamNames []*string, epochStartTime int64, ep
return params
}

func Tail(logGroupName *string, logStreamName *string, follow *bool, startTime *time.Time, endTime *time.Time, grep *string, printTimestamp *bool, printStreamName *bool) <-chan *string {
func Tail(logGroupName *string, logStreamName *string, follow *bool, startTime *time.Time, endTime *time.Time, grep *string, printTimestamp *bool, printStreamName *bool, printEventId *bool) <-chan *string {
cwl := cwClient()

startTimeEpoch := timeutil.ParseTime(startTime.Format(timeutil.TimeFormat)).Unix()
Expand All @@ -59,36 +59,35 @@ func Tail(logGroupName *string, logStreamName *string, follow *bool, startTime *

ch := make(chan *string)
pageHandler := func(res *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool {
if len(res.Events) == 0 {
time.Sleep(2 * time.Second)
} else {
for _, event := range res.Events {
eventTimestamp := *event.Timestamp / 1000
if eventTimestamp != lastSeenTimestamp {
ids = nil
lastSeenTimestamp = eventTimestamp
} else {
sort.Strings(ids)
}
idx := sort.SearchStrings(ids, *event.EventId)
if ids == nil || (idx == len(ids) || ids[idx] != *event.EventId) {
d := timeutil.FormatTimestamp(eventTimestamp)
var msg string
if *printTimestamp {
msg = fmt.Sprintf("%s - ", color.GreenString(d))
}
if *printStreamName {
msg = fmt.Sprintf("%s%s - ", msg, color.BlueString(*event.LogStreamName))
}
msg = fmt.Sprintf("%s%s", msg, *event.Message)
ch <- &msg
for _, event := range res.Events {
eventTimestamp := *event.Timestamp / 1000
if eventTimestamp != lastSeenTimestamp {
ids = nil
lastSeenTimestamp = eventTimestamp
} else {
sort.Strings(ids)
}
idx := sort.SearchStrings(ids, *event.EventId)
if ids == nil || (idx == len(ids) || ids[idx] != *event.EventId) {

msg := fmt.Sprintf("%s", *event.Message)
if *printEventId {
msg = fmt.Sprintf("%s - %s", color.YellowString(*event.EventId), msg)
}
ids = append(ids, *event.EventId)
if *printStreamName {
msg = fmt.Sprintf("%s - %s", color.BlueString(*event.LogStreamName), msg)
}
if *printTimestamp {
msg = fmt.Sprintf("%s - %s", color.GreenString(timeutil.FormatTimestamp(eventTimestamp)), msg)
}

ch <- &msg

}
ids = append(ids, *event.EventId)
}

if lastPage && !*follow {
time.Sleep(1 * time.Second)
close(ch)
}
return !lastPage
Expand All @@ -103,18 +102,21 @@ func Tail(logGroupName *string, logStreamName *string, follow *bool, startTime *
os.Exit(1)
}
}
go func() {
for *follow || lastSeenTimestamp == startTimeEpoch {
logParam := params(*logGroupName, streams, lastSeenTimestamp, endTimeEpoch, grep, follow)
error := cwl.FilterLogEventsPages(logParam, pageHandler)
if error != nil {
if awsErr, ok := error.(awserr.Error); ok {
fmt.Println(awsErr.Message())
os.Exit(1)
if *follow || lastSeenTimestamp == startTimeEpoch {
ticker := time.NewTicker(time.Millisecond * 50)
go func() {
for range ticker.C {
logParam := params(*logGroupName, streams, lastSeenTimestamp, endTimeEpoch, grep, follow)
error := cwl.FilterLogEventsPages(logParam, pageHandler)
if error != nil {
if awsErr, ok := error.(awserr.Error); ok {
fmt.Println(awsErr.Message())
os.Exit(1)
}
}
}
}
}()
}()
}
return ch
}

Expand Down
5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var (

follow = tailCommand.Flag("follow", "Don't stop when the end of stream is reached, but rather wait for additional data to be appended.").Short('f').Default("false").Bool()
printTimestamp = tailCommand.Flag("timestamp", "Print the event timestamp.").Short('t').Default("false").Bool()
printEventId = tailCommand.Flag("event Id", "Print the event Id").Short('i').Default("false").Bool()
printStreamName = tailCommand.Flag("stream name", "Print the log stream name this event belongs to.").Short('s').Default("false").Bool()
grep = tailCommand.Flag("grep", "Pattern to filter logs by. See http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html for syntax.").Short('g').Default("").String()
logGroupName = tailCommand.Arg("group", "The log group name.").Required().HintAction(groupsCompletion).String()
Expand Down Expand Up @@ -76,7 +77,7 @@ func timestampToUTC(timeStamp *string) time.Time {
}

func main() {
kingpin.Version("1.2.0")
kingpin.Version("1.3.0")
command := kingpin.Parse()

switch command {
Expand All @@ -95,7 +96,7 @@ func main() {
et = timestampToUTC(endTime)
}

for msg := range cloudwatch.Tail(logGroupName, logStreamName, follow, &st, &et, grep, printTimestamp, printStreamName) {
for msg := range cloudwatch.Tail(logGroupName, logStreamName, follow, &st, &et, grep, printTimestamp, printStreamName, printEventId) {
fmt.Println(*msg)
}
}
Expand Down

0 comments on commit f84df72

Please sign in to comment.