Commit

add backoff logic to client
- make clienterr.Backoff.RetryAfter a pointer
barrettj12 committed Aug 15, 2023
1 parent 81fc9a0 commit 3e47aa7
Showing 4 changed files with 62 additions and 30 deletions.
9 changes: 6 additions & 3 deletions internals/overlord/logstate/clienterr/err.go
@@ -15,18 +15,21 @@
 package clienterr

 import (
-	"fmt"
 	"time"
 )

 // Backoff should be returned if the server indicates we are sending too many
 // requests (e.g. an HTTP 429 response).
 type Backoff struct {
-	RetryAfter time.Time
+	RetryAfter *time.Time
 }

 func (b Backoff) Error() string {
-	return fmt.Sprintf("too many requests, retry after %v", b.RetryAfter)
+	errStr := "too many requests"
+	if b.RetryAfter != nil {
+		errStr += ", retry after " + b.RetryAfter.String()
+	}
+	return errStr
 }

 // Timeout should be returned if the client's Flush times out.
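
Note: with RetryAfter now a pointer, a nil value means "back off, but the server gave no usable Retry-After hint", and Error() simply omits the retry time. A minimal standalone sketch of the two outputs (the type is redefined here for illustration only, since clienterr is an internal package):

package main

import (
	"fmt"
	"time"
)

// backoff mirrors clienterr.Backoff for illustration only.
type backoff struct {
	RetryAfter *time.Time
}

func (b backoff) Error() string {
	errStr := "too many requests"
	if b.RetryAfter != nil {
		errStr += ", retry after " + b.RetryAfter.String()
	}
	return errStr
}

func main() {
	fmt.Println(backoff{}) // too many requests
	t := time.Now().Add(30 * time.Second)
	fmt.Println(backoff{RetryAfter: &t}) // too many requests, retry after <time>
}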
57 changes: 40 additions & 17 deletions internals/overlord/logstate/gatherer.go
@@ -16,12 +16,14 @@ package logstate

 import (
 	"context"
+	"errors"
 	"fmt"
 	"time"

 	"gopkg.in/tomb.v2"

 	"github.com/canonical/pebble/internals/logger"
+	"github.com/canonical/pebble/internals/overlord/logstate/clienterr"
 	"github.com/canonical/pebble/internals/overlord/logstate/loki"
 	"github.com/canonical/pebble/internals/plan"
 	"github.com/canonical/pebble/internals/servicelog"
@@ -72,6 +74,8 @@ type logGatherer struct {
 	pullers *pullerGroup
 	// All pullers send logs on this channel, received by main loop
 	entryCh chan servicelog.Entry
+
+	timer timer
 }

 // logGathererArgs allows overriding the newLogClient method and time values
@@ -104,6 +108,7 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG
 		client:  client,
 		entryCh: make(chan servicelog.Entry),
 		pullers: newPullerGroup(target.Name),
+		timer:   newTimer(),
 	}
 	g.clientCtx, g.clientCancel = context.WithCancel(context.Background())
 	g.tomb.Go(g.loop)
@@ -170,43 +175,50 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R
 // pullers on entryCh, and writes them to the client. It also flushes the
 // client periodically, and exits when the gatherer's tomb is killed.
 func (g *logGatherer) loop() error {
-	timer := newTimer()
-	defer timer.Stop()
+	defer g.timer.Stop()

 mainLoop:
 	for {
 		select {
 		case <-g.tomb.Dying():
 			break mainLoop

-		case <-timer.Expired():
+		case <-g.timer.Expired():
 			// Mark timer as unset
-			timer.Stop()
-			err := g.client.Flush(g.clientCtx)
-			if err != nil {
-				logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
-			}
+			g.timer.Stop()
+			g.handleClientErr(g.client.Flush(g.clientCtx))

 		case entry := <-g.entryCh:
-			err := g.client.Write(g.clientCtx, entry)
-			if err != nil {
-				logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err)
-			}
-			timer.EnsureSet(g.bufferTimeout)
+			g.handleClientErr(g.client.Write(g.clientCtx, entry))
+			g.timer.EnsureSet(g.bufferTimeout)
 		}
 	}

 	// Final flush to send any remaining logs buffered in the client
 	// We need to create a new context, as the previous one may have been cancelled.
 	ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush)
 	defer cancel()
-	err := g.client.Flush(ctx)
-	if err != nil {
-		logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
-	}
+	g.handleClientErr(g.client.Flush(ctx))
 	return nil
 }

+func (g *logGatherer) handleClientErr(err error) {
+	if err == nil {
+		return
+	}
+
+	if e := (clienterr.Backoff{}); errors.As(err, &e) {
+		logger.Noticef("Target %q: too many requests, backing off", g.targetName)
+		g.timer.retryAfter = e.RetryAfter
+	}
+
+	if errors.As(err, &clienterr.Timeout{}) {
+		logger.Noticef("Timeout flushing logs for target %q", g.targetName)
+	}
+
+	logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
+}
+
 // Stop tears down the gatherer and associated resources (pullers, client).
 // This method will block until gatherer teardown is complete.
 //
@@ -251,6 +263,8 @@ func (g *logGatherer) Stop() {
 type timer struct {
 	timer *time.Timer
 	set   bool
+	// If non-nil, the timer won't expire until after this time
+	retryAfter *time.Time
 }

 func newTimer() timer {
@@ -280,6 +294,15 @@ func (t *timer) EnsureSet(timeout time.Duration) {
 		return
 	}

+	if t.retryAfter != nil {
+		// We've been told to wait before retrying
+		retryTime := time.Until(*t.retryAfter)
+		if retryTime > timeout {
+			timeout = retryTime
+		}
+		t.retryAfter = nil
+	}
+
 	t.timer.Reset(timeout)
 	t.set = true
 }
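
Note: EnsureSet now takes the server-requested backoff into account, so the next flush is delayed by whichever is longer: the normal buffer timeout or the time remaining until retryAfter. A sketch of that interaction (nextTimeout is a hypothetical helper mirroring the EnsureSet logic above, not part of the commit):

package main

import (
	"fmt"
	"time"
)

// nextTimeout returns the delay EnsureSet would arm the timer with:
// the larger of the usual buffer timeout and the remaining backoff.
func nextTimeout(bufferTimeout time.Duration, retryAfter *time.Time) time.Duration {
	timeout := bufferTimeout
	if retryAfter != nil {
		if wait := time.Until(*retryAfter); wait > timeout {
			timeout = wait
		}
	}
	return timeout
}

func main() {
	fmt.Println(nextTimeout(time.Second, nil)) // 1s: no backoff requested
	later := time.Now().Add(5 * time.Second)
	fmt.Println(nextTimeout(time.Second, &later)) // ~5s: honour Retry-After
}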
17 changes: 11 additions & 6 deletions internals/overlord/logstate/loki/loki.go
@@ -118,19 +118,24 @@ func handleServerResponse(resp *http.Response) error {
 	switch {
 	case code == 429:
 		// Try to get Retry-After value from response headers
-		retryAfter := resp.Header.Get("Retry-After")
+		var retryAfter *time.Time
+		retryAfterRaw := resp.Header.Get("Retry-After")
+
 		// The Retry-After value can be a date-time
-		t, err := http.ParseTime(retryAfter)
-		if err != nil {
+		t, err := http.ParseTime(retryAfterRaw)
+		if err == nil {
+			retryAfter = &t
+		} else {
 			// It can also be an integer number of seconds
-			n, err := strconv.Atoi(retryAfter)
+			n, err := strconv.Atoi(retryAfterRaw)
 			if err == nil && n > 0 {
-				t = time.Now().Add(time.Duration(n) * time.Second)
+				t := time.Now().Add(time.Duration(n) * time.Second)
+				retryAfter = &t
 			}
 		}

 		return clienterr.Backoff{
-			RetryAfter: t,
+			RetryAfter: retryAfter,
 		}

 	case code < 200 || code >= 300:
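
Note: per RFC 9110, a Retry-After header can hold either an HTTP-date or a delay in seconds, which is why the code tries http.ParseTime first and falls back to strconv.Atoi. A standalone sketch of the same strategy (parseRetryAfter is a hypothetical helper, not part of the commit):

package main

import (
	"fmt"
	"net/http"
	"strconv"
	"time"
)

// parseRetryAfter tries the HTTP-date form first, then delay-seconds.
// It returns nil for a missing or malformed header: no backoff hint.
func parseRetryAfter(raw string) *time.Time {
	if t, err := http.ParseTime(raw); err == nil {
		return &t
	}
	if n, err := strconv.Atoi(raw); err == nil && n > 0 {
		t := time.Now().Add(time.Duration(n) * time.Second)
		return &t
	}
	return nil
}

func main() {
	fmt.Println(parseRetryAfter("Tue, 15 Aug 2023 08:49:37 GMT"))
	fmt.Println(parseRetryAfter("120"))     // ~2 minutes from now
	fmt.Println(parseRetryAfter("garbage")) // <nil>
}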
9 changes: 5 additions & 4 deletions internals/overlord/logstate/loki/loki_test.go
@@ -220,11 +220,12 @@ func (*suite) TestTooManyRequests(c *C) {
 	})
 	c.Assert(err, IsNil)

-	expectedErr := clienterr.Backoff{
-		RetryAfter: time.Date(2023, 8, 15, 8, 49, 37, 0, time.UTC),
-	}
 	err = client.Flush(context.Background())
-	c.Assert(errors.Is(err, expectedErr), Equals, true)
+	backoffErr := clienterr.Backoff{}
+	c.Assert(errors.As(err, &backoffErr), Equals, true)
+
+	expectedTime := time.Date(2023, 8, 15, 8, 49, 37, 0, time.UTC)
+	c.Check(backoffErr.RetryAfter, DeepEquals, &expectedTime)
 }

 // Strips all extraneous whitespace from JSON
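
Note: the test switches from errors.Is to errors.As because RetryAfter is now a pointer. errors.Is falls back to ==, which for a freshly constructed expected Backoff would compare pointer identity and never match; errors.As instead extracts the Backoff from the error chain, and the pointed-to time is compared with DeepEquals.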
