diff --git a/internals/overlord/logstate/clienterr/err.go b/internals/overlord/logstate/clienterr/err.go
index 7159b8fba..56a5a3237 100644
--- a/internals/overlord/logstate/clienterr/err.go
+++ b/internals/overlord/logstate/clienterr/err.go
@@ -15,18 +15,21 @@
 package clienterr
 
 import (
-	"fmt"
 	"time"
 )
 
 // Backoff should be returned if the server indicates we are sending too many
 // requests (e.g. an HTTP 429 response).
 type Backoff struct {
-	RetryAfter time.Time
+	RetryAfter *time.Time
 }
 
 func (b Backoff) Error() string {
-	return fmt.Sprintf("too many requests, retry after %v", b.RetryAfter)
+	errStr := "too many requests"
+	if b.RetryAfter != nil {
+		errStr += ", retry after " + b.RetryAfter.String()
+	}
+	return errStr
 }
 
 // Timeout should be returned if the client's Flush times out.
diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go
index 4d4b4a84a..18f764492 100644
--- a/internals/overlord/logstate/gatherer.go
+++ b/internals/overlord/logstate/gatherer.go
@@ -16,12 +16,14 @@ package logstate
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"time"
 
 	"gopkg.in/tomb.v2"
 
 	"github.com/canonical/pebble/internals/logger"
+	"github.com/canonical/pebble/internals/overlord/logstate/clienterr"
 	"github.com/canonical/pebble/internals/overlord/logstate/loki"
 	"github.com/canonical/pebble/internals/plan"
 	"github.com/canonical/pebble/internals/servicelog"
@@ -72,6 +74,8 @@ type logGatherer struct {
 	pullers *pullerGroup
 	// All pullers send logs on this channel, received by main loop
 	entryCh chan servicelog.Entry
+
+	timer timer
 }
 
 // logGathererArgs allows overriding the newLogClient method and time values
@@ -104,6 +108,7 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG
 		client:     client,
 		entryCh:    make(chan servicelog.Entry),
 		pullers:    newPullerGroup(target.Name),
+		timer:      newTimer(),
 	}
 	g.clientCtx, g.clientCancel = context.WithCancel(context.Background())
 	g.tomb.Go(g.loop)
@@ -170,8 +175,7 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R
 // pullers on entryCh, and writes them to the client. It also flushes the
 // client periodically, and exits when the gatherer's tomb is killed.
 func (g *logGatherer) loop() error {
-	timer := newTimer()
-	defer timer.Stop()
+	defer g.timer.Stop()
 
 mainLoop:
 	for {
@@ -179,20 +183,14 @@ mainLoop:
 		case <-g.tomb.Dying():
 			break mainLoop
 
-		case <-timer.Expired():
+		case <-g.timer.Expired():
 			// Mark timer as unset
-			timer.Stop()
-			err := g.client.Flush(g.clientCtx)
-			if err != nil {
-				logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
-			}
+			g.timer.Stop()
+			g.handleClientErr("flush", g.client.Flush(g.clientCtx))
 
 		case entry := <-g.entryCh:
-			err := g.client.Write(g.clientCtx, entry)
-			if err != nil {
-				logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err)
-			}
-			timer.EnsureSet(g.bufferTimeout)
+			g.handleClientErr("write", g.client.Write(g.clientCtx, entry))
+			g.timer.EnsureSet(g.bufferTimeout)
 		}
 	}
 
@@ -200,13 +198,31 @@ mainLoop:
 	// We need to create a new context, as the previous one may have been cancelled.
 	ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush)
 	defer cancel()
-	err := g.client.Flush(ctx)
-	if err != nil {
-		logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
-	}
+	g.handleClientErr("flush", g.client.Flush(ctx))
 	return nil
 }
 
+// handleClientErr logs a non-nil error returned by the client's Write or Flush.
+// action names the failed operation ("write" or "flush") for the log message.
+// Each error is logged exactly once. A Backoff error also stores its
+// RetryAfter on the gatherer's timer, which EnsureSet consults on the next set.
+func (g *logGatherer) handleClientErr(action string, err error) {
+	if err == nil {
+		return
+	}
+
+	var backoff clienterr.Backoff
+	switch {
+	case errors.As(err, &backoff):
+		logger.Noticef("Target %q: too many requests, backing off", g.targetName)
+		g.timer.retryAfter = backoff.RetryAfter
+	case errors.As(err, &clienterr.Timeout{}):
+		logger.Noticef("Timeout flushing logs for target %q", g.targetName)
+	default:
+		logger.Noticef("Cannot %s logs to target %q: %v", action, g.targetName, err)
+	}
+}
+
 // Stop tears down the gatherer and associated resources (pullers, client).
 // This method will block until gatherer teardown is complete.
 //
@@ -251,6 +267,8 @@ func (g *logGatherer) Stop() {
 type timer struct {
 	timer *time.Timer
 	set   bool
+	// If non-nil, the timer won't expire until after this time
+	retryAfter *time.Time
 }
 
 func newTimer() timer {
@@ -280,6 +298,15 @@ func (t *timer) EnsureSet(timeout time.Duration) {
 		return
 	}
 
+	if t.retryAfter != nil {
+		// We've been told to wait before retrying
+		retryTime := time.Until(*t.retryAfter)
+		if retryTime > timeout {
+			timeout = retryTime
+		}
+		t.retryAfter = nil
+	}
+
 	t.timer.Reset(timeout)
 	t.set = true
 }
diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go
index 2c0fe4d6f..17ca3f56e 100644
--- a/internals/overlord/logstate/loki/loki.go
+++ b/internals/overlord/logstate/loki/loki.go
@@ -118,19 +118,24 @@ func handleServerResponse(resp *http.Response) error {
 	switch {
 	case code == 429:
 		// Try to get Retry-After value from response headers
-		retryAfter := resp.Header.Get("Retry-After")
+		var retryAfter *time.Time
+		retryAfterRaw := resp.Header.Get("Retry-After")
+
 		// The Retry-After value can be a date-time
-		t, err := http.ParseTime(retryAfter)
-		if err != nil {
+		t, err := http.ParseTime(retryAfterRaw)
+		if err == nil {
+			retryAfter = &t
+		} else {
 			// It can also be an integer number of seconds
-			n, err := strconv.Atoi(retryAfter)
+			n, err := strconv.Atoi(retryAfterRaw)
 			if err == nil && n > 0 {
-				t = time.Now().Add(time.Duration(n) * time.Second)
+				t := time.Now().Add(time.Duration(n) * time.Second)
+				retryAfter = &t
 			}
 		}
 
 		return clienterr.Backoff{
-			RetryAfter: t,
+			RetryAfter: retryAfter,
 		}
 
 	case code < 200 || code >= 300:
diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go
index 00b01e2b6..540369525 100644
--- a/internals/overlord/logstate/loki/loki_test.go
+++ b/internals/overlord/logstate/loki/loki_test.go
@@ -220,11 +220,12 @@ func (*suite) TestTooManyRequests(c *C) {
 	})
 	c.Assert(err, IsNil)
 
-	expectedErr := clienterr.Backoff{
-		RetryAfter: time.Date(2023, 8, 15, 8, 49, 37, 0, time.UTC),
-	}
 	err = client.Flush(context.Background())
-	c.Assert(errors.Is(err, expectedErr), Equals, true)
+	backoffErr := clienterr.Backoff{}
+	c.Assert(errors.As(err, &backoffErr), Equals, true)
+
+	expectedTime := time.Date(2023, 8, 15, 8, 49, 37, 0, time.UTC)
+	c.Check(backoffErr.RetryAfter, DeepEquals, &expectedTime)
 }
 
 // Strips all extraneous whitespace from JSON