From f56d4ada065da854e822946049e6b9be64fddc84 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Mon, 10 Jul 2023 15:25:39 +1200 Subject: [PATCH 01/39] Implement Loki client --- .github/.golangci.yml | 4 +- .../overlord/logstate/clienterr/clienterr.go | 81 ++++++ internals/overlord/logstate/gatherer.go | 59 +++-- .../overlord/logstate/loki/export_test.go | 25 ++ internals/overlord/logstate/loki/loki.go | 201 +++++++++++++++ internals/overlord/logstate/loki/loki_test.go | 241 ++++++++++++++++++ 6 files changed, 590 insertions(+), 21 deletions(-) create mode 100644 internals/overlord/logstate/clienterr/clienterr.go create mode 100644 internals/overlord/logstate/loki/export_test.go create mode 100644 internals/overlord/logstate/loki/loki.go create mode 100644 internals/overlord/logstate/loki/loki_test.go diff --git a/.github/.golangci.yml b/.github/.golangci.yml index 2e368060..7d784fdf 100644 --- a/.github/.golangci.yml +++ b/.github/.golangci.yml @@ -12,7 +12,5 @@ issues: # these values ensure that all issues will be surfaced max-issues-per-linter: 0 max-same-issues: 0 - run: - timeout: 5m - + timeout: 5m \ No newline at end of file diff --git a/internals/overlord/logstate/clienterr/clienterr.go b/internals/overlord/logstate/clienterr/clienterr.go new file mode 100644 index 00000000..c756ae13 --- /dev/null +++ b/internals/overlord/logstate/clienterr/clienterr.go @@ -0,0 +1,81 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +/* +Package clienterr contains common error types which are recognised by the log +gatherer. logstate.logClient implementations should return these error types +to communicate with the gatherer. + +Errors in this package should be pattern-matched using errors.As: + + err := client.Flush(ctx) + backoff := &clienterr.Backoff{} + if errors.As(err, &backoff) { + ... + } +*/ +package clienterr + +import ( + "bytes" + "fmt" + "io" + "net/http" + "time" +) + +// Backoff should be returned if the server indicates we are sending too many +// requests (e.g. an HTTP 429 response). +type Backoff struct { + RetryAfter *time.Time +} + +func (e *Backoff) Error() string { + errStr := "too many requests" + if e.RetryAfter != nil { + errStr += ", retry after " + e.RetryAfter.String() + } + return errStr +} + +// ErrorResponse represents an HTTP error response from the server +// (4xx or 5xx). +type ErrorResponse struct { + StatusCode int + Body bytes.Buffer + ReadErr error +} + +func (e *ErrorResponse) Error() string { + errStr := fmt.Sprintf("server returned HTTP %d\n", e.StatusCode) + if e.Body.Len() > 0 { + errStr += fmt.Sprintf(`response body: +%s +`, e.Body.String()) + } + if e.ReadErr != nil { + errStr += "cannot read response body: " + e.ReadErr.Error() + } + return errStr +} + +// ErrorFromResponse generates a *ErrorResponse from a failed *http.Response. +// NB: this function reads the response body. +func ErrorFromResponse(resp *http.Response) *ErrorResponse { + err := &ErrorResponse{} + err.StatusCode = resp.StatusCode + _, readErr := io.CopyN(&err.Body, resp.Body, 1024) + err.ReadErr = readErr + return err +} diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 001b9e2f..f764d7bc 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -16,12 +16,15 @@ package logstate import ( "context" + "errors" "fmt" "time" "gopkg.in/tomb.v2" "github.com/canonical/pebble/internals/logger" + "github.com/canonical/pebble/internals/overlord/logstate/clienterr" + "github.com/canonical/pebble/internals/overlord/logstate/loki" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" ) @@ -71,6 +74,8 @@ type logGatherer struct { pullers *pullerGroup // All pullers send logs on this channel, received by main loop entryCh chan servicelog.Entry + + timer timer } // logGathererArgs allows overriding the newLogClient method and time values @@ -103,6 +108,7 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG client: client, entryCh: make(chan servicelog.Entry), pullers: newPullerGroup(target.Name), + timer: newTimer(), } g.clientCtx, g.clientCancel = context.WithCancel(context.Background()) g.tomb.Go(g.loop) @@ -169,8 +175,7 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R // pullers on entryCh, and writes them to the client. It also flushes the // client periodically, and exits when the gatherer's tomb is killed. func (g *logGatherer) loop() error { - timer := newTimer() - defer timer.Stop() + defer g.timer.Stop() mainLoop: for { @@ -178,20 +183,14 @@ mainLoop: case <-g.tomb.Dying(): break mainLoop - case <-timer.Expired(): + case <-g.timer.Expired(): // Mark timer as unset - timer.Stop() - err := g.client.Flush(g.clientCtx) - if err != nil { - logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) - } + g.timer.Stop() + g.handleClientErr(g.client.Flush(g.clientCtx)) case entry := <-g.entryCh: - err := g.client.Write(g.clientCtx, entry) - if err != nil { - logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err) - } - timer.EnsureSet(g.bufferTimeout) + g.handleClientErr(g.client.Write(g.clientCtx, entry)) + g.timer.EnsureSet(g.bufferTimeout) } } @@ -199,13 +198,25 @@ mainLoop: // We need to create a new context, as the previous one may have been cancelled. ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush) defer cancel() - err := g.client.Flush(ctx) - if err != nil { - logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) - } + g.handleClientErr(g.client.Flush(ctx)) return nil } +func (g *logGatherer) handleClientErr(err error) { + if err == nil { + return + } + + backoff := &clienterr.Backoff{} + if errors.As(err, &backoff) { + logger.Noticef("Target %q: %v", g.targetName, err) + g.timer.retryAfter = backoff.RetryAfter + return + } + + logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) +} + // Stop tears down the gatherer and associated resources (pullers, client). // This method will block until gatherer teardown is complete. // @@ -250,6 +261,8 @@ func (g *logGatherer) Stop() { type timer struct { timer *time.Timer set bool + // If non-nil, the timer won't expire until after this time + retryAfter *time.Time } func newTimer() timer { @@ -279,6 +292,15 @@ func (t *timer) EnsureSet(timeout time.Duration) { return } + if t.retryAfter != nil { + // We've been told to wait before retrying + retryTime := time.Until(*t.retryAfter) + if retryTime > timeout { + timeout = retryTime + } + t.retryAfter = nil + } + t.timer.Reset(timeout) t.set = true } @@ -311,7 +333,8 @@ type logClient interface { func newLogClient(target *plan.LogTarget) (logClient, error) { switch target.Type { - //case plan.LokiTarget: TODO + case plan.LokiTarget: + return loki.NewClient(target), nil //case plan.SyslogTarget: TODO default: return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name) diff --git a/internals/overlord/logstate/loki/export_test.go b/internals/overlord/logstate/loki/export_test.go new file mode 100644 index 00000000..f1616b2f --- /dev/null +++ b/internals/overlord/logstate/loki/export_test.go @@ -0,0 +1,25 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package loki + +import "time" + +func SetRequestTimeout(new time.Duration) (restore func()) { + oldRequestTimeout := requestTimeout + requestTimeout = new + return func() { + requestTimeout = oldRequestTimeout + } +} diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go new file mode 100644 index 00000000..88516730 --- /dev/null +++ b/internals/overlord/logstate/loki/loki.go @@ -0,0 +1,201 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package loki + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/canonical/pebble/cmd" + "github.com/canonical/pebble/internals/overlord/logstate/clienterr" + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" +) + +const maxRequestEntries = 1000 + +var requestTimeout = 10 * time.Second + +type Client struct { + remoteURL string + // buffered entries are "sharded" by service name + entries map[string][]lokiEntry + // Keep track of the number of buffered entries, to avoid having to iterate + // the entries map to get the total number. + numEntries int + + httpClient *http.Client + retryAfter *time.Time +} + +func NewClient(target *plan.LogTarget) *Client { + return &Client{ + remoteURL: target.Location, + entries: map[string][]lokiEntry{}, + httpClient: &http.Client{Timeout: requestTimeout}, + } +} + +func (c *Client) Write(ctx context.Context, entry servicelog.Entry) error { + c.entries[entry.Service] = append(c.entries[entry.Service], encodeEntry(entry)) + c.numEntries++ + + if c.numEntries >= maxRequestEntries { + if c.retryAfter != nil { + if time.Now().Before(*c.retryAfter) { + // Retry-After deadline hasn't passed yet, so we shouldn't flush + return nil + } + c.retryAfter = nil + } + return c.Flush(ctx) + } + return nil +} + +func encodeEntry(entry servicelog.Entry) lokiEntry { + return lokiEntry{ + strconv.FormatInt(entry.Time.UnixNano(), 10), + strings.TrimSuffix(entry.Message, "\n"), + } +} + +func (c *Client) Flush(ctx context.Context) error { + if c.numEntries == 0 { + return nil // no-op + } + defer c.emptyBuffer() + + req := c.buildRequest() + jsonReq, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("encoding request to JSON: %v", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.remoteURL, bytes.NewReader(jsonReq)) + if err != nil { + return fmt.Errorf("creating HTTP request: %v", err) + } + httpReq.Header.Set("Content-Type", "application/json; charset=utf-8") + httpReq.Header.Set("User-Agent", fmt.Sprintf("pebble/%s", cmd.Version)) + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return err + } + return c.handleServerResponse(resp) +} + +func (c *Client) emptyBuffer() { + for svc := range c.entries { + c.entries[svc] = c.entries[svc][:0] + } + c.numEntries = 0 +} + +func (c *Client) buildRequest() request { + // Sort keys to guarantee deterministic output + services := make([]string, 0, len(c.entries)) + for svc, entries := range c.entries { + if len(entries) == 0 { + continue + } + services = append(services, svc) + } + sort.Strings(services) + + var req request + for _, service := range services { + entries := c.entries[service] + stream := stream{ + Labels: map[string]string{ + "pebble_service": service, + }, + Entries: entries, + } + req.Streams = append(req.Streams, stream) + } + return req +} + +type request struct { + Streams []stream `json:"streams"` +} + +type stream struct { + Labels map[string]string `json:"stream"` + Entries []lokiEntry `json:"values"` +} + +type lokiEntry [2]string + +func (c *Client) handleServerResponse(resp *http.Response) error { + defer func() { + // Drain request body to allow connection reuse + // see https://pkg.go.dev/net/http#Response.Body + _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 1024*1024)) + _ = resp.Body.Close() + }() + + code := resp.StatusCode + switch { + case code == http.StatusTooManyRequests: + err := &clienterr.Backoff{} + retryAfter, ok := getRetryAfter(resp) + if ok { + err.RetryAfter = &retryAfter + c.retryAfter = &retryAfter + } + return err + + case code >= 400: + // Request to Loki failed + return clienterr.ErrorFromResponse(resp) + } + + return nil +} + +// Gets the parsed value of Retry-After from HTTP response headers. +func getRetryAfter(resp *http.Response) (time.Time, bool) { + retryAfterRaw := resp.Header.Get("Retry-After") + if retryAfterRaw == "" { + // Header unset + return time.Time{}, false + } + + // The Retry-After value can be a date-time + t, err := http.ParseTime(retryAfterRaw) + if err == nil { + return t, true + } + + // It can also be an integer number of seconds + n, err := strconv.Atoi(retryAfterRaw) + if err == nil && n > 0 { + t := time.Now().Add(time.Duration(n) * time.Second) + return t, true + } + + return time.Time{}, false +} diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go new file mode 100644 index 00000000..29e9ca92 --- /dev/null +++ b/internals/overlord/logstate/loki/loki_test.go @@ -0,0 +1,241 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package loki_test + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + . "gopkg.in/check.v1" + + "github.com/canonical/pebble/internals/overlord/logstate/clienterr" + "github.com/canonical/pebble/internals/overlord/logstate/loki" + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" +) + +type suite struct{} + +var _ = Suite(&suite{}) + +func Test(t *testing.T) { + TestingT(t) +} + +func (*suite) TestRequest(c *C) { + input := []servicelog.Entry{{ + Time: time.Date(2023, 12, 31, 12, 34, 50, 0, time.UTC), + Service: "svc1", + Message: "log line #1\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 51, 0, time.UTC), + Service: "svc2", + Message: "log line #2\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 52, 0, time.UTC), + Service: "svc1", + Message: "log line #3\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 53, 0, time.UTC), + Service: "svc3", + Message: "log line #4\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 54, 0, time.UTC), + Service: "svc1", + Message: "log line #5\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 55, 0, time.UTC), + Service: "svc3", + Message: "log line #6\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 56, 0, time.UTC), + Service: "svc2", + Message: "log line #7\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 57, 0, time.UTC), + Service: "svc1", + Message: "log line #8\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 58, 0, time.UTC), + Service: "svc4", + Message: "log line #9\n", + }} + + expected := ` +{ + "streams": [ + { + "stream": { + "pebble_service": "svc1" + }, + "values": [ + [ "1704026090000000000", "log line #1" ], + [ "1704026092000000000", "log line #3" ], + [ "1704026094000000000", "log line #5" ], + [ "1704026097000000000", "log line #8" ] + ] + }, + { + "stream": { + "pebble_service": "svc2" + }, + "values": [ + [ "1704026091000000000", "log line #2" ], + [ "1704026096000000000", "log line #7" ] + ] + }, + { + "stream": { + "pebble_service": "svc3" + }, + "values": [ + [ "1704026093000000000", "log line #4" ], + [ "1704026095000000000", "log line #6" ] + ] + }, + { + "stream": { + "pebble_service": "svc4" + }, + "values": [ + [ "1704026098000000000", "log line #9" ] + ] + } + ] +} +` + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c.Assert(r.Method, Equals, http.MethodPost) + c.Assert(r.Header.Get("Content-Type"), Equals, "application/json; charset=utf-8") + + reqBody, err := io.ReadAll(r.Body) + c.Assert(err, IsNil) + expFlattened, err := flattenJSON(expected) + c.Assert(err, IsNil) + c.Assert(string(reqBody), Equals, expFlattened) + })) + defer server.Close() + + client := loki.NewClient(&plan.LogTarget{Location: server.URL}) + for _, entry := range input { + err := client.Write(context.Background(), entry) + c.Assert(err, IsNil) + } + + err := client.Flush(context.Background()) + c.Assert(err, IsNil) +} + +func (*suite) TestFlushCancelContext(c *C) { + serverCtx, killServer := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case <-serverCtx.Done(): + // Simulate a slow-responding server + case <-time.After(10 * time.Second): + } + })) + defer server.Close() + defer killServer() + + client := loki.NewClient(&plan.LogTarget{Location: server.URL}) + err := client.Write(context.Background(), servicelog.Entry{ + Time: time.Now(), + Service: "svc1", + Message: "this is a log line\n", + }) + c.Assert(err, IsNil) + + flushReturned := make(chan struct{}) + go func() { + // Cancel the Flush context quickly + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond) + defer cancel() + + err := client.Flush(ctx) + c.Assert(err, ErrorMatches, ".*context deadline exceeded.*") + close(flushReturned) + }() + + // Check Flush returns quickly after context timeout + select { + case <-flushReturned: + case <-time.After(1 * time.Second): + c.Fatal("lokiClient.Flush took too long to return after context timeout") + } +} + +func (*suite) TestServerTimeout(c *C) { + restore := loki.SetRequestTimeout(1 * time.Microsecond) + defer restore() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(2 * time.Microsecond) + })) + defer server.Close() + + client := loki.NewClient(&plan.LogTarget{Location: server.URL}) + err := client.Write(context.Background(), servicelog.Entry{ + Time: time.Now(), + Service: "svc1", + Message: "this is a log line\n", + }) + c.Assert(err, IsNil) + + err = client.Flush(context.Background()) + c.Assert(err, ErrorMatches, ".*context deadline exceeded.*") +} + +func (*suite) TestTooManyRequests(c *C) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Retry-After", "Tue, 15 Aug 2023 08:49:37 GMT") + w.WriteHeader(http.StatusTooManyRequests) + })) + defer server.Close() + + client := loki.NewClient(&plan.LogTarget{Location: server.URL}) + err := client.Write(context.Background(), servicelog.Entry{ + Time: time.Now(), + Service: "svc1", + Message: "this is a log line\n", + }) + c.Assert(err, IsNil) + + err = client.Flush(context.Background()) + backoff := &clienterr.Backoff{} + c.Assert(errors.As(err, &backoff), Equals, true) + + expectedTime := time.Date(2023, 8, 15, 8, 49, 37, 0, time.UTC) + c.Check(backoff.RetryAfter, DeepEquals, &expectedTime) +} + +// Strips all extraneous whitespace from JSON +func flattenJSON(s string) (string, error) { + var v any + err := json.Unmarshal([]byte(s), &v) + if err != nil { + return "", err + } + + b, err := json.Marshal(v) + return string(b), err +} From 8ff49d8d860f42f32e06e51e8d3e7cc8985ef193 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Mon, 28 Aug 2023 12:43:50 +0700 Subject: [PATCH 02/39] update README --- README.md | 53 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index ab9a0cc2..b10e1548 100644 --- a/README.md +++ b/README.md @@ -403,30 +403,26 @@ $ pebble run --verbose ... ``` - ## Container usage @@ -721,6 +716,37 @@ checks: # (Optional) Working directory to run command in. By default, the # command is run in the service manager's current directory. working-dir: + +# (Optional) A list of remote log receivers, to which service logs can be sent. +log-targets: + + : + + # (Required) Control how this log target definition is combined with + # other pre-existing definitions with the same name in the Pebble plan. + # + # The value 'merge' will ensure that values in this layer specification + # are merged over existing definitions, whereas 'replace' will entirely + # override the existing target spec in the plan with the same name. + override: merge | replace + + # (Required) The type of log target, which determines the format in + # which logs will be sent. Currently, the only supported type is 'loki', + # but more protocols may be added in the future. + type: loki + + # (Required) The URL of the remote log target. + # For Loki, this needs to be the fully-qualified URL of the push API, + # including the API endpoint, e.g. + # http://:3100/loki/api/v1/push + location: + + # (Optional) A list of services whose logs will be sent to this target. + # Use the special keyword 'all' to match all services in the plan. + # When merging log targets, the 'services' lists are appended. Prefix a + # service name with a minus (e.g. '-svc1') to remove a previously added + # service. '-all' will remove all services. + services: [] ``` ## API and clients @@ -753,7 +779,8 @@ Here are some of the things coming soon: - [x] Automatically restart services that fail - [x] Support for custom health checks (HTTP, TCP, command) - [x] Terminate all services before exiting run command - - [ ] Log forwarding (syslog and Loki) + - [x] Log forwarding to Loki + - [ ] Log forwarding to syslog - [ ] [Other in-progress PRs](https://github.com/canonical/pebble/pulls) - [ ] [Other requested features](https://github.com/canonical/pebble/issues) From 72942ad308cc1112781bcc31b0548de1fb3d3b15 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Mon, 28 Aug 2023 14:58:27 +0700 Subject: [PATCH 03/39] don't need to allocate for errors.As --- internals/overlord/logstate/clienterr/clienterr.go | 4 +++- internals/overlord/logstate/gatherer.go | 2 +- internals/overlord/logstate/loki/loki_test.go | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/internals/overlord/logstate/clienterr/clienterr.go b/internals/overlord/logstate/clienterr/clienterr.go index c756ae13..7c886875 100644 --- a/internals/overlord/logstate/clienterr/clienterr.go +++ b/internals/overlord/logstate/clienterr/clienterr.go @@ -20,7 +20,7 @@ to communicate with the gatherer. Errors in this package should be pattern-matched using errors.As: err := client.Flush(ctx) - backoff := &clienterr.Backoff{} + var backoff *clienterr.Backoff if errors.As(err, &backoff) { ... } @@ -51,6 +51,8 @@ func (e *Backoff) Error() string { // ErrorResponse represents an HTTP error response from the server // (4xx or 5xx). +// This is a generic, catch-all error type - clients should use a more refined +// type when appropriate (e.g. Backoff for a 429 response). type ErrorResponse struct { StatusCode int Body bytes.Buffer diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index f764d7bc..16e89827 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -207,7 +207,7 @@ func (g *logGatherer) handleClientErr(err error) { return } - backoff := &clienterr.Backoff{} + var backoff *clienterr.Backoff if errors.As(err, &backoff) { logger.Noticef("Target %q: %v", g.targetName, err) g.timer.retryAfter = backoff.RetryAfter diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go index 29e9ca92..dfbaf25e 100644 --- a/internals/overlord/logstate/loki/loki_test.go +++ b/internals/overlord/logstate/loki/loki_test.go @@ -221,7 +221,7 @@ func (*suite) TestTooManyRequests(c *C) { c.Assert(err, IsNil) err = client.Flush(context.Background()) - backoff := &clienterr.Backoff{} + var backoff *clienterr.Backoff c.Assert(errors.As(err, &backoff), Equals, true) expectedTime := time.Date(2023, 8, 15, 8, 49, 37, 0, time.UTC) From e91b3d92a467bd2125bb96c394c90e1d91be9cc5 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 29 Aug 2023 10:37:29 +0700 Subject: [PATCH 04/39] remove clienterr package --- .../overlord/logstate/clienterr/clienterr.go | 83 ------------------- internals/overlord/logstate/gatherer.go | 55 ++++-------- internals/overlord/logstate/loki/loki.go | 52 ++++-------- internals/overlord/logstate/loki/loki_test.go | 25 ------ 4 files changed, 32 insertions(+), 183 deletions(-) delete mode 100644 internals/overlord/logstate/clienterr/clienterr.go diff --git a/internals/overlord/logstate/clienterr/clienterr.go b/internals/overlord/logstate/clienterr/clienterr.go deleted file mode 100644 index 7c886875..00000000 --- a/internals/overlord/logstate/clienterr/clienterr.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright (c) 2023 Canonical Ltd -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License version 3 as -// published by the Free Software Foundation. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -/* -Package clienterr contains common error types which are recognised by the log -gatherer. logstate.logClient implementations should return these error types -to communicate with the gatherer. - -Errors in this package should be pattern-matched using errors.As: - - err := client.Flush(ctx) - var backoff *clienterr.Backoff - if errors.As(err, &backoff) { - ... - } -*/ -package clienterr - -import ( - "bytes" - "fmt" - "io" - "net/http" - "time" -) - -// Backoff should be returned if the server indicates we are sending too many -// requests (e.g. an HTTP 429 response). -type Backoff struct { - RetryAfter *time.Time -} - -func (e *Backoff) Error() string { - errStr := "too many requests" - if e.RetryAfter != nil { - errStr += ", retry after " + e.RetryAfter.String() - } - return errStr -} - -// ErrorResponse represents an HTTP error response from the server -// (4xx or 5xx). -// This is a generic, catch-all error type - clients should use a more refined -// type when appropriate (e.g. Backoff for a 429 response). -type ErrorResponse struct { - StatusCode int - Body bytes.Buffer - ReadErr error -} - -func (e *ErrorResponse) Error() string { - errStr := fmt.Sprintf("server returned HTTP %d\n", e.StatusCode) - if e.Body.Len() > 0 { - errStr += fmt.Sprintf(`response body: -%s -`, e.Body.String()) - } - if e.ReadErr != nil { - errStr += "cannot read response body: " + e.ReadErr.Error() - } - return errStr -} - -// ErrorFromResponse generates a *ErrorResponse from a failed *http.Response. -// NB: this function reads the response body. -func ErrorFromResponse(resp *http.Response) *ErrorResponse { - err := &ErrorResponse{} - err.StatusCode = resp.StatusCode - _, readErr := io.CopyN(&err.Body, resp.Body, 1024) - err.ReadErr = readErr - return err -} diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 16e89827..0b4955d5 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -16,14 +16,12 @@ package logstate import ( "context" - "errors" "fmt" "time" "gopkg.in/tomb.v2" "github.com/canonical/pebble/internals/logger" - "github.com/canonical/pebble/internals/overlord/logstate/clienterr" "github.com/canonical/pebble/internals/overlord/logstate/loki" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" @@ -74,8 +72,6 @@ type logGatherer struct { pullers *pullerGroup // All pullers send logs on this channel, received by main loop entryCh chan servicelog.Entry - - timer timer } // logGathererArgs allows overriding the newLogClient method and time values @@ -108,7 +104,6 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG client: client, entryCh: make(chan servicelog.Entry), pullers: newPullerGroup(target.Name), - timer: newTimer(), } g.clientCtx, g.clientCancel = context.WithCancel(context.Background()) g.tomb.Go(g.loop) @@ -175,7 +170,8 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R // pullers on entryCh, and writes them to the client. It also flushes the // client periodically, and exits when the gatherer's tomb is killed. func (g *logGatherer) loop() error { - defer g.timer.Stop() + timer := newTimer() + defer timer.Stop() mainLoop: for { @@ -183,14 +179,20 @@ mainLoop: case <-g.tomb.Dying(): break mainLoop - case <-g.timer.Expired(): + case <-timer.Expired(): // Mark timer as unset - g.timer.Stop() - g.handleClientErr(g.client.Flush(g.clientCtx)) + timer.Stop() + err := g.client.Flush(g.clientCtx) + if err != nil { + logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) + } case entry := <-g.entryCh: - g.handleClientErr(g.client.Write(g.clientCtx, entry)) - g.timer.EnsureSet(g.bufferTimeout) + err := g.client.Write(g.clientCtx, entry) + if err != nil { + logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err) + } + timer.EnsureSet(g.bufferTimeout) } } @@ -198,23 +200,11 @@ mainLoop: // We need to create a new context, as the previous one may have been cancelled. ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush) defer cancel() - g.handleClientErr(g.client.Flush(ctx)) - return nil -} - -func (g *logGatherer) handleClientErr(err error) { - if err == nil { - return - } - - var backoff *clienterr.Backoff - if errors.As(err, &backoff) { - logger.Noticef("Target %q: %v", g.targetName, err) - g.timer.retryAfter = backoff.RetryAfter - return + err := g.client.Flush(ctx) + if err != nil { + logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) } - - logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) + return nil } // Stop tears down the gatherer and associated resources (pullers, client). @@ -261,8 +251,6 @@ func (g *logGatherer) Stop() { type timer struct { timer *time.Timer set bool - // If non-nil, the timer won't expire until after this time - retryAfter *time.Time } func newTimer() timer { @@ -292,15 +280,6 @@ func (t *timer) EnsureSet(timeout time.Duration) { return } - if t.retryAfter != nil { - // We've been told to wait before retrying - retryTime := time.Until(*t.retryAfter) - if retryTime > timeout { - timeout = retryTime - } - t.retryAfter = nil - } - t.timer.Reset(timeout) t.set = true } diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index 88516730..19e87513 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -27,7 +28,6 @@ import ( "time" "github.com/canonical/pebble/cmd" - "github.com/canonical/pebble/internals/overlord/logstate/clienterr" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" ) @@ -157,45 +157,23 @@ func (c *Client) handleServerResponse(resp *http.Response) error { _ = resp.Body.Close() }() - code := resp.StatusCode - switch { - case code == http.StatusTooManyRequests: - err := &clienterr.Backoff{} - retryAfter, ok := getRetryAfter(resp) - if ok { - err.RetryAfter = &retryAfter - c.retryAfter = &retryAfter + if code := resp.StatusCode; code >= 400 { + // Request to Loki failed + errStr := fmt.Sprintf("server returned HTTP %d\n", code) + + // Read response body to get more context + body := make([]byte, 0, 1024) + _, err := resp.Body.Read(body) + if err == nil { + errStr += fmt.Sprintf(`response body: +%s +`, body) + } else { + errStr += "cannot read response body: " + err.Error() } - return err - case code >= 400: - // Request to Loki failed - return clienterr.ErrorFromResponse(resp) + return errors.New(errStr) } return nil } - -// Gets the parsed value of Retry-After from HTTP response headers. -func getRetryAfter(resp *http.Response) (time.Time, bool) { - retryAfterRaw := resp.Header.Get("Retry-After") - if retryAfterRaw == "" { - // Header unset - return time.Time{}, false - } - - // The Retry-After value can be a date-time - t, err := http.ParseTime(retryAfterRaw) - if err == nil { - return t, true - } - - // It can also be an integer number of seconds - n, err := strconv.Atoi(retryAfterRaw) - if err == nil && n > 0 { - t := time.Now().Add(time.Duration(n) * time.Second) - return t, true - } - - return time.Time{}, false -} diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go index dfbaf25e..8175c421 100644 --- a/internals/overlord/logstate/loki/loki_test.go +++ b/internals/overlord/logstate/loki/loki_test.go @@ -17,7 +17,6 @@ package loki_test import ( "context" "encoding/json" - "errors" "io" "net/http" "net/http/httptest" @@ -26,7 +25,6 @@ import ( . "gopkg.in/check.v1" - "github.com/canonical/pebble/internals/overlord/logstate/clienterr" "github.com/canonical/pebble/internals/overlord/logstate/loki" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" @@ -205,29 +203,6 @@ func (*suite) TestServerTimeout(c *C) { c.Assert(err, ErrorMatches, ".*context deadline exceeded.*") } -func (*suite) TestTooManyRequests(c *C) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Retry-After", "Tue, 15 Aug 2023 08:49:37 GMT") - w.WriteHeader(http.StatusTooManyRequests) - })) - defer server.Close() - - client := loki.NewClient(&plan.LogTarget{Location: server.URL}) - err := client.Write(context.Background(), servicelog.Entry{ - Time: time.Now(), - Service: "svc1", - Message: "this is a log line\n", - }) - c.Assert(err, IsNil) - - err = client.Flush(context.Background()) - var backoff *clienterr.Backoff - c.Assert(errors.As(err, &backoff), Equals, true) - - expectedTime := time.Date(2023, 8, 15, 8, 49, 37, 0, time.UTC) - c.Check(backoff.RetryAfter, DeepEquals, &expectedTime) -} - // Strips all extraneous whitespace from JSON func flattenJSON(s string) (string, error) { var v any From 4d8e66fa1ab4bb56479a1d6b0dfb9a398d8ae09a Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 29 Aug 2023 11:03:17 +0700 Subject: [PATCH 05/39] don't drop logs on 429 / 5xx - warn user when dropping logs --- internals/overlord/logstate/loki/loki.go | 68 +++++++++++++++++------- 1 file changed, 50 insertions(+), 18 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index 19e87513..268caead 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -28,6 +28,7 @@ import ( "time" "github.com/canonical/pebble/cmd" + "github.com/canonical/pebble/internals/logger" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" ) @@ -37,7 +38,8 @@ const maxRequestEntries = 1000 var requestTimeout = 10 * time.Second type Client struct { - remoteURL string + targetName string + remoteURL string // buffered entries are "sharded" by service name entries map[string][]lokiEntry // Keep track of the number of buffered entries, to avoid having to iterate @@ -50,6 +52,7 @@ type Client struct { func NewClient(target *plan.LogTarget) *Client { return &Client{ + targetName: target.Name, remoteURL: target.Location, entries: map[string][]lokiEntry{}, httpClient: &http.Client{Timeout: requestTimeout}, @@ -84,7 +87,6 @@ func (c *Client) Flush(ctx context.Context) error { if c.numEntries == 0 { return nil // no-op } - defer c.emptyBuffer() req := c.buildRequest() jsonReq, err := json.Marshal(req) @@ -103,7 +105,16 @@ func (c *Client) Flush(ctx context.Context) error { if err != nil { return err } - return c.handleServerResponse(resp) + + err, emptyBuffer := c.handleServerResponse(resp) + if emptyBuffer { + if err != nil { + logger.Noticef("Target %q: request failed, dropping %d logs", + c.targetName, c.numEntries) + } + c.emptyBuffer() + } + return err } func (c *Client) emptyBuffer() { @@ -149,7 +160,11 @@ type stream struct { type lokiEntry [2]string -func (c *Client) handleServerResponse(resp *http.Response) error { +// handleServerResponse determines what to do based on the response from the +// Loki server. 4xx and 5xx responses indicate errors, so in this case, we will +// bubble up and error to the caller. The returned boolean indicates whether +// the buffer should be emptied. +func (c *Client) handleServerResponse(resp *http.Response) (err error, emptyBuffer bool) { defer func() { // Drain request body to allow connection reuse // see https://pkg.go.dev/net/http#Response.Body @@ -157,23 +172,40 @@ func (c *Client) handleServerResponse(resp *http.Response) error { _ = resp.Body.Close() }() - if code := resp.StatusCode; code >= 400 { - // Request to Loki failed - errStr := fmt.Sprintf("server returned HTTP %d\n", code) + code := resp.StatusCode + switch { + case code == http.StatusTooManyRequests: + // For 429, don't drop logs - just retry later + return errFromResponse(resp), false + + case code >= 500: + // 5xx indicates a problem with the server, so don't drop logs + return errFromResponse(resp), false + + case code >= 400: + // 4xx indicates a client problem, so drop the logs + return errFromResponse(resp), true + } - // Read response body to get more context - body := make([]byte, 0, 1024) - _, err := resp.Body.Read(body) - if err == nil { - errStr += fmt.Sprintf(`response body: + return nil, true +} + +// errFromResponse generates an error from a failed *http.Response. +// NB: this function reads the response body. +func errFromResponse(resp *http.Response) error { + // Request to Loki failed + errStr := fmt.Sprintf("server returned HTTP %d\n", resp.StatusCode) + + // Read response body to get more context + body := make([]byte, 0, 1024) + _, err := resp.Body.Read(body) + if err == nil { + errStr += fmt.Sprintf(`response body: %s `, body) - } else { - errStr += "cannot read response body: " + err.Error() - } - - return errors.New(errStr) + } else { + errStr += "cannot read response body: " + err.Error() } - return nil + return errors.New(errStr) } From a702d3705a3dfec7e60200b1ec5e5e63317d72b1 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 29 Aug 2023 11:13:15 +0700 Subject: [PATCH 06/39] remove retryAfter from loki.Client - use channel to synchronise server in loki test --- internals/overlord/logstate/loki/loki.go | 8 -------- internals/overlord/logstate/loki/loki_test.go | 4 +++- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index 268caead..251c267d 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -47,7 +47,6 @@ type Client struct { numEntries int httpClient *http.Client - retryAfter *time.Time } func NewClient(target *plan.LogTarget) *Client { @@ -64,13 +63,6 @@ func (c *Client) Write(ctx context.Context, entry servicelog.Entry) error { c.numEntries++ if c.numEntries >= maxRequestEntries { - if c.retryAfter != nil { - if time.Now().Before(*c.retryAfter) { - // Retry-After deadline hasn't passed yet, so we shouldn't flush - return nil - } - c.retryAfter = nil - } return c.Flush(ctx) } return nil diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go index 8175c421..a81bb93f 100644 --- a/internals/overlord/logstate/loki/loki_test.go +++ b/internals/overlord/logstate/loki/loki_test.go @@ -186,10 +186,12 @@ func (*suite) TestServerTimeout(c *C) { restore := loki.SetRequestTimeout(1 * time.Microsecond) defer restore() + stopRequest := make(chan struct{}) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - time.Sleep(2 * time.Microsecond) + <-stopRequest })) defer server.Close() + defer close(stopRequest) client := loki.NewClient(&plan.LogTarget{Location: server.URL}) err := client.Write(context.Background(), servicelog.Entry{ From a8b0c3e8d98263fc37cdfbb7372421e5ecc9c54e Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 29 Aug 2023 11:16:41 +0700 Subject: [PATCH 07/39] revert changes to .golangci.yml --- .github/.golangci.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/.golangci.yml b/.github/.golangci.yml index 7d784fdf..2e368060 100644 --- a/.github/.golangci.yml +++ b/.github/.golangci.yml @@ -12,5 +12,7 @@ issues: # these values ensure that all issues will be surfaced max-issues-per-linter: 0 max-same-issues: 0 + run: - timeout: 5m \ No newline at end of file + timeout: 5m + From 2c427e41d4e415e035014a9c32857cb89253f6cb Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 29 Aug 2023 11:19:59 +0700 Subject: [PATCH 08/39] rename gatherer timer -> flushTimer --- internals/overlord/logstate/gatherer.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 0b4955d5..1b432f26 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -170,8 +170,8 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R // pullers on entryCh, and writes them to the client. It also flushes the // client periodically, and exits when the gatherer's tomb is killed. func (g *logGatherer) loop() error { - timer := newTimer() - defer timer.Stop() + flushTimer := newTimer() + defer flushTimer.Stop() mainLoop: for { @@ -179,9 +179,9 @@ mainLoop: case <-g.tomb.Dying(): break mainLoop - case <-timer.Expired(): + case <-flushTimer.Expired(): // Mark timer as unset - timer.Stop() + flushTimer.Stop() err := g.client.Flush(g.clientCtx) if err != nil { logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) @@ -192,7 +192,7 @@ mainLoop: if err != nil { logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err) } - timer.EnsureSet(g.bufferTimeout) + flushTimer.EnsureSet(g.bufferTimeout) } } From 2d625655d663688ed859c4eace3a8ba6b8872d8a Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 29 Aug 2023 11:21:21 +0700 Subject: [PATCH 09/39] rename SetRequestTimeout -> FakeRequestTimeout --- internals/overlord/logstate/loki/export_test.go | 2 +- internals/overlord/logstate/loki/loki_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internals/overlord/logstate/loki/export_test.go b/internals/overlord/logstate/loki/export_test.go index f1616b2f..17cd1e4d 100644 --- a/internals/overlord/logstate/loki/export_test.go +++ b/internals/overlord/logstate/loki/export_test.go @@ -16,7 +16,7 @@ package loki import "time" -func SetRequestTimeout(new time.Duration) (restore func()) { +func FakeRequestTimeout(new time.Duration) (restore func()) { oldRequestTimeout := requestTimeout requestTimeout = new return func() { diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go index a81bb93f..654af98a 100644 --- a/internals/overlord/logstate/loki/loki_test.go +++ b/internals/overlord/logstate/loki/loki_test.go @@ -183,7 +183,7 @@ func (*suite) TestFlushCancelContext(c *C) { } func (*suite) TestServerTimeout(c *C) { - restore := loki.SetRequestTimeout(1 * time.Microsecond) + restore := loki.FakeRequestTimeout(1 * time.Microsecond) defer restore() stopRequest := make(chan struct{}) From 8a77df6464040b3add786bb3210f1d775e204f31 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 29 Aug 2023 11:24:35 +0700 Subject: [PATCH 10/39] [README] better names for example targets --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index b10e1548..3a94b0fd 100644 --- a/README.md +++ b/README.md @@ -408,21 +408,21 @@ $ pebble run --verbose Pebble supports forwarding its services' logs to a remote Loki server. In the `log-targets` section of the plan, you can specify destinations for log forwarding, for example: ```yaml log-targets: - example1: + staging-logs: override: merge type: loki location: http://10.1.77.205:3100/loki/api/v1/push services: [all] - example2: + production-logs: override: merge type: loki location: http://my.loki.server.com/loki/api/v1/push services: [svc1, svc2] ``` -For each log target, use the `services` key to specify a list of services to collect logs from. In the above example, the `example2` target will collect logs from `svc1` and `svc2`. +For each log target, use the `services` key to specify a list of services to collect logs from. In the above example, the `production-logs` target will collect logs from `svc1` and `svc2`. -Use the special keyword `all` to match all services, including services that might be added in future layers. In the above example, `example1` will collect logs from all services. +Use the special keyword `all` to match all services, including services that might be added in future layers. In the above example, `staging-logs` will collect logs from all services. To remove a service from a log target when merging, prefix the service name with a minus `-`. For example, if we have a base layer with ```yaml From 0af606a91f5dc89edf61302cef766539e11ccd16 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 30 Aug 2023 16:06:45 +0700 Subject: [PATCH 11/39] empty buffer inside handleServerResponse --- internals/overlord/logstate/loki/loki.go | 27 ++++++++++-------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index 251c267d..85192b83 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -98,15 +98,7 @@ func (c *Client) Flush(ctx context.Context) error { return err } - err, emptyBuffer := c.handleServerResponse(resp) - if emptyBuffer { - if err != nil { - logger.Noticef("Target %q: request failed, dropping %d logs", - c.targetName, c.numEntries) - } - c.emptyBuffer() - } - return err + return c.handleServerResponse(resp) } func (c *Client) emptyBuffer() { @@ -154,9 +146,8 @@ type lokiEntry [2]string // handleServerResponse determines what to do based on the response from the // Loki server. 4xx and 5xx responses indicate errors, so in this case, we will -// bubble up and error to the caller. The returned boolean indicates whether -// the buffer should be emptied. -func (c *Client) handleServerResponse(resp *http.Response) (err error, emptyBuffer bool) { +// bubble up the error to the caller. +func (c *Client) handleServerResponse(resp *http.Response) error { defer func() { // Drain request body to allow connection reuse // see https://pkg.go.dev/net/http#Response.Body @@ -168,18 +159,22 @@ func (c *Client) handleServerResponse(resp *http.Response) (err error, emptyBuff switch { case code == http.StatusTooManyRequests: // For 429, don't drop logs - just retry later - return errFromResponse(resp), false + return errFromResponse(resp) case code >= 500: // 5xx indicates a problem with the server, so don't drop logs - return errFromResponse(resp), false + return errFromResponse(resp) case code >= 400: // 4xx indicates a client problem, so drop the logs - return errFromResponse(resp), true + logger.Noticef("Target %q: request failed with status %d, dropping %d logs", + c.targetName, code, c.numEntries) + c.emptyBuffer() + return errFromResponse(resp) } - return nil, true + c.emptyBuffer() + return nil } // errFromResponse generates an error from a failed *http.Response. From 7e00ccd0538cd4e35a350723e7b3bbfafc9f2ec5 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 30 Aug 2023 16:08:45 +0700 Subject: [PATCH 12/39] fix errFromResponse --- internals/overlord/logstate/loki/loki.go | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index 85192b83..cc10d1ad 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -18,7 +18,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" "net/http" @@ -180,19 +179,10 @@ func (c *Client) handleServerResponse(resp *http.Response) error { // errFromResponse generates an error from a failed *http.Response. // NB: this function reads the response body. func errFromResponse(resp *http.Response) error { - // Request to Loki failed - errStr := fmt.Sprintf("server returned HTTP %d\n", resp.StatusCode) - // Read response body to get more context - body := make([]byte, 0, 1024) - _, err := resp.Body.Read(body) - if err == nil { - errStr += fmt.Sprintf(`response body: -%s -`, body) - } else { - errStr += "cannot read response body: " + err.Error() + body, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) + if err != nil { + return fmt.Errorf("HTTP %d error, but cannot read response: %v", resp.StatusCode, err) } - - return errors.New(errStr) + return fmt.Errorf("HTTP %d error, response %q", resp.StatusCode, body) } From 5d21565361718f3311bd181649e34957436445e9 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 30 Aug 2023 16:53:37 +0700 Subject: [PATCH 13/39] elaborate on retry comments --- internals/overlord/logstate/loki/loki.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index cc10d1ad..eeefa4da 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -161,11 +161,11 @@ func (c *Client) handleServerResponse(resp *http.Response) error { return errFromResponse(resp) case code >= 500: - // 5xx indicates a problem with the server, so don't drop logs + // 5xx indicates a problem with the server, so don't drop logs (retry later) return errFromResponse(resp) case code >= 400: - // 4xx indicates a client problem, so drop the logs + // 4xx indicates a client problem, so drop the logs (retrying won't help) logger.Noticef("Target %q: request failed with status %d, dropping %d logs", c.targetName, code, c.numEntries) c.emptyBuffer() From 2b78abefbfcafcb35f51ff491bd6fbc695754c73 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 19 Sep 2023 08:45:45 +0700 Subject: [PATCH 14/39] address Gustavo's review comments - emptyBuffer -> resetBuffer - request -> lokiRequest - stream -> lokiStream - remove unnecessary whitespace from inline JSON --- internals/overlord/logstate/loki/loki.go | 18 ++--- internals/overlord/logstate/loki/loki_test.go | 66 +++++++------------ 2 files changed, 34 insertions(+), 50 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index eeefa4da..e6a221ec 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -100,14 +100,14 @@ func (c *Client) Flush(ctx context.Context) error { return c.handleServerResponse(resp) } -func (c *Client) emptyBuffer() { +func (c *Client) resetBuffer() { for svc := range c.entries { c.entries[svc] = c.entries[svc][:0] } c.numEntries = 0 } -func (c *Client) buildRequest() request { +func (c *Client) buildRequest() lokiRequest { // Sort keys to guarantee deterministic output services := make([]string, 0, len(c.entries)) for svc, entries := range c.entries { @@ -118,10 +118,10 @@ func (c *Client) buildRequest() request { } sort.Strings(services) - var req request + var req lokiRequest for _, service := range services { entries := c.entries[service] - stream := stream{ + stream := lokiStream{ Labels: map[string]string{ "pebble_service": service, }, @@ -132,11 +132,11 @@ func (c *Client) buildRequest() request { return req } -type request struct { - Streams []stream `json:"streams"` +type lokiRequest struct { + Streams []lokiStream `json:"streams"` } -type stream struct { +type lokiStream struct { Labels map[string]string `json:"stream"` Entries []lokiEntry `json:"values"` } @@ -168,11 +168,11 @@ func (c *Client) handleServerResponse(resp *http.Response) error { // 4xx indicates a client problem, so drop the logs (retrying won't help) logger.Noticef("Target %q: request failed with status %d, dropping %d logs", c.targetName, code, c.numEntries) - c.emptyBuffer() + c.resetBuffer() return errFromResponse(resp) } - c.emptyBuffer() + c.resetBuffer() return nil } diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go index 654af98a..1076a69a 100644 --- a/internals/overlord/logstate/loki/loki_test.go +++ b/internals/overlord/logstate/loki/loki_test.go @@ -78,48 +78,32 @@ func (*suite) TestRequest(c *C) { }} expected := ` -{ - "streams": [ - { - "stream": { - "pebble_service": "svc1" - }, - "values": [ - [ "1704026090000000000", "log line #1" ], - [ "1704026092000000000", "log line #3" ], - [ "1704026094000000000", "log line #5" ], - [ "1704026097000000000", "log line #8" ] - ] - }, - { - "stream": { - "pebble_service": "svc2" - }, - "values": [ - [ "1704026091000000000", "log line #2" ], - [ "1704026096000000000", "log line #7" ] - ] - }, - { - "stream": { - "pebble_service": "svc3" - }, - "values": [ - [ "1704026093000000000", "log line #4" ], - [ "1704026095000000000", "log line #6" ] - ] - }, - { - "stream": { - "pebble_service": "svc4" - }, - "values": [ - [ "1704026098000000000", "log line #9" ] - ] - } +{"streams": [{ + "stream": {"pebble_service": "svc1"}, + "values": [ + [ "1704026090000000000", "log line #1" ], + [ "1704026092000000000", "log line #3" ], + [ "1704026094000000000", "log line #5" ], + [ "1704026097000000000", "log line #8" ] ] -} -` +}, { + "stream": {"pebble_service": "svc2"}, + "values": [ + [ "1704026091000000000", "log line #2" ], + [ "1704026096000000000", "log line #7" ] + ] +}, { + "stream": {"pebble_service": "svc3"}, + "values": [ + [ "1704026093000000000", "log line #4" ], + [ "1704026095000000000", "log line #6" ] + ] +}, { + "stream": {"pebble_service": "svc4"}, + "values": [ + [ "1704026098000000000", "log line #9" ] + ] +}]}` server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { c.Assert(r.Method, Equals, http.MethodPost) From 0142e7e1041ee97bb9d026856af3af1aff0a9b17 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 19 Sep 2023 12:13:13 +0700 Subject: [PATCH 15/39] rejig loki error handling - send response body to debug logs --- internals/overlord/logstate/loki/loki.go | 28 +++++++++++++++--------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index e6a221ec..6ce1e915 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -156,24 +156,30 @@ func (c *Client) handleServerResponse(resp *http.Response) error { code := resp.StatusCode switch { + case 200 <= code && code < 300: + // 2xx is a success - safe to drop logs + c.resetBuffer() + return nil + case code == http.StatusTooManyRequests: // For 429, don't drop logs - just retry later return errFromResponse(resp) - case code >= 500: - // 5xx indicates a problem with the server, so don't drop logs (retry later) - return errFromResponse(resp) - - case code >= 400: + case 400 <= code && code < 500: // 4xx indicates a client problem, so drop the logs (retrying won't help) logger.Noticef("Target %q: request failed with status %d, dropping %d logs", c.targetName, code, c.numEntries) c.resetBuffer() return errFromResponse(resp) - } - c.resetBuffer() - return nil + case 500 <= code && code < 600: + // 5xx indicates a problem with the server, so don't drop logs (retry later) + return errFromResponse(resp) + + default: + // Unexpected response + return fmt.Errorf("unexpected response from server: %v", resp.Status) + } } // errFromResponse generates an error from a failed *http.Response. @@ -182,7 +188,9 @@ func errFromResponse(resp *http.Response) error { // Read response body to get more context body, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) if err != nil { - return fmt.Errorf("HTTP %d error, but cannot read response: %v", resp.StatusCode, err) + logger.Debugf("HTTP %d error, but cannot read response: %v", resp.StatusCode, err) } - return fmt.Errorf("HTTP %d error, response %q", resp.StatusCode, body) + logger.Debugf("HTTP %d error, response %q", resp.StatusCode, body) + + return fmt.Errorf("server returned HTTP %v", resp.Status) } From 79187af6d71ecf82f18344e005876e9029508a84 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 19 Sep 2023 13:05:06 +0700 Subject: [PATCH 16/39] if-else for debug logging error --- internals/overlord/logstate/loki/loki.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index 6ce1e915..788ee78e 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -187,10 +187,11 @@ func (c *Client) handleServerResponse(resp *http.Response) error { func errFromResponse(resp *http.Response) error { // Read response body to get more context body, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) - if err != nil { + if err == nil { + logger.Debugf("HTTP %d error, response %q", resp.StatusCode, body) + } else { logger.Debugf("HTTP %d error, but cannot read response: %v", resp.StatusCode, err) } - logger.Debugf("HTTP %d error, response %q", resp.StatusCode, body) return fmt.Errorf("server returned HTTP %v", resp.Status) } From 1c694b34b018c6f894ac5292e56f19626531e903 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 20 Sep 2023 10:53:12 +0700 Subject: [PATCH 17/39] success is only 200 or 204 --- internals/overlord/logstate/loki/loki.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index 788ee78e..e73bceba 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -156,8 +156,8 @@ func (c *Client) handleServerResponse(resp *http.Response) error { code := resp.StatusCode switch { - case 200 <= code && code < 300: - // 2xx is a success - safe to drop logs + case code == 200 || code == 204: + // Success - safe to drop logs c.resetBuffer() return nil From 0ea5f3487b3afae175b7c65be2bb23e15dce79d6 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 20 Sep 2023 11:03:43 +0700 Subject: [PATCH 18/39] pull log-counting / flushing logic into gatherer - modify logClient interface --- internals/overlord/logstate/gatherer.go | 59 ++++++++++++------------ internals/overlord/logstate/loki/loki.go | 13 +++--- 2 files changed, 35 insertions(+), 37 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 1b432f26..aeb469f6 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -28,8 +28,9 @@ import ( ) const ( - parserSize = 4 * 1024 - bufferTimeout = 1 * time.Second + parserSize = 4 * 1024 + bufferTimeout = 1 * time.Second + maxBufferedEntries = 100 // These constants control the maximum time allowed for each teardown step. timeoutCurrentFlush = 1 * time.Second @@ -173,6 +174,15 @@ func (g *logGatherer) loop() error { flushTimer := newTimer() defer flushTimer.Stop() + flushClient := func(ctx context.Context) { + // Mark timer as unset + flushTimer.Stop() + err := g.client.Flush(ctx) + if err != nil { + logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) + } + } + mainLoop: for { select { @@ -180,18 +190,19 @@ mainLoop: break mainLoop case <-flushTimer.Expired(): - // Mark timer as unset - flushTimer.Stop() - err := g.client.Flush(g.clientCtx) - if err != nil { - logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) - } + flushClient(g.clientCtx) case entry := <-g.entryCh: - err := g.client.Write(g.clientCtx, entry) + err := g.client.AddLog(entry) if err != nil { logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err) + continue + } + // Check if buffer is full + if g.client.NumBuffered() > maxBufferedEntries { + flushClient(g.clientCtx) } + // Otherwise, set the timeout flushTimer.EnsureSet(g.bufferTimeout) } } @@ -200,10 +211,7 @@ mainLoop: // We need to create a new context, as the previous one may have been cancelled. ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush) defer cancel() - err := g.client.Flush(ctx) - if err != nil { - logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) - } + flushClient(ctx) return nil } @@ -289,24 +297,15 @@ func (t *timer) EnsureSet(timeout time.Duration) { // protocol required by that log target. // For example, a logClient for Loki would encode the log messages in the // JSON format expected by Loki, and send them over HTTP(S). -// -// logClient implementations have some freedom about the semantics of these -// methods. For a buffering client (e.g. HTTP): -// - Write could add the log to the client's internal buffer, calling Flush -// when this buffer reaches capacity. -// - Flush would prepare and send a request with the buffered logs. -// -// For a non-buffering client (e.g. TCP), Write could serialise the log -// directly to the open connection, while Flush would be a no-op. type logClient interface { - // Write adds the given log entry to the client. Depending on the - // implementation of the client, this may send the log to the remote target, - // or simply add the log to an internal buffer, flushing that buffer when - // required. - Write(context.Context, servicelog.Entry) error - - // Flush sends buffered logs (if any) to the remote target. For clients which - // don't buffer logs, Flush should be a no-op. + // AddLog adds the given log entry to the client's buffer. + AddLog(servicelog.Entry) error + + // NumBuffered returns the number of log entries currently buffered in the + // client. + NumBuffered() int + + // Flush sends buffered logs (if any) to the remote target. Flush(context.Context) error } diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index e73bceba..54633d90 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -32,7 +32,7 @@ import ( "github.com/canonical/pebble/internals/servicelog" ) -const maxRequestEntries = 1000 +const maxRequestEntries = 100 var requestTimeout = 10 * time.Second @@ -57,13 +57,8 @@ func NewClient(target *plan.LogTarget) *Client { } } -func (c *Client) Write(ctx context.Context, entry servicelog.Entry) error { +func (c *Client) AddLog(entry servicelog.Entry) error { c.entries[entry.Service] = append(c.entries[entry.Service], encodeEntry(entry)) - c.numEntries++ - - if c.numEntries >= maxRequestEntries { - return c.Flush(ctx) - } return nil } @@ -74,6 +69,10 @@ func encodeEntry(entry servicelog.Entry) lokiEntry { } } +func (c *Client) NumBuffered() int { + return c.numEntries +} + func (c *Client) Flush(ctx context.Context) error { if c.numEntries == 0 { return nil // no-op From f7ef7ff02ccd0571cd6785fcda680d09a4f2dc7a Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 20 Sep 2023 11:15:58 +0700 Subject: [PATCH 19/39] store entries in slice - allows easy truncation --- internals/overlord/logstate/loki/loki.go | 56 +++++++++++++----------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index 54633d90..d5ceb6ee 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -21,7 +21,6 @@ import ( "fmt" "io" "net/http" - "sort" "strconv" "strings" "time" @@ -39,11 +38,7 @@ var requestTimeout = 10 * time.Second type Client struct { targetName string remoteURL string - // buffered entries are "sharded" by service name - entries map[string][]lokiEntry - // Keep track of the number of buffered entries, to avoid having to iterate - // the entries map to get the total number. - numEntries int + entries []lokiEntryWithService httpClient *http.Client } @@ -52,13 +47,16 @@ func NewClient(target *plan.LogTarget) *Client { return &Client{ targetName: target.Name, remoteURL: target.Location, - entries: map[string][]lokiEntry{}, httpClient: &http.Client{Timeout: requestTimeout}, } } func (c *Client) AddLog(entry servicelog.Entry) error { - c.entries[entry.Service] = append(c.entries[entry.Service], encodeEntry(entry)) + c.entries = append(c.entries, lokiEntryWithService{ + entry: encodeEntry(entry), + service: entry.Service, + }) + c.truncateBuffer() return nil } @@ -70,11 +68,11 @@ func encodeEntry(entry servicelog.Entry) lokiEntry { } func (c *Client) NumBuffered() int { - return c.numEntries + return len(c.entries) } func (c *Client) Flush(ctx context.Context) error { - if c.numEntries == 0 { + if c.NumBuffered() == 0 { return nil // no-op } @@ -99,27 +97,30 @@ func (c *Client) Flush(ctx context.Context) error { return c.handleServerResponse(resp) } +// resetBuffer drops all buffered logs (in the case of a successful send, or an +// unrecoverable error). func (c *Client) resetBuffer() { - for svc := range c.entries { - c.entries[svc] = c.entries[svc][:0] + c.entries = c.entries[:0] +} + +// truncateBuffer truncates the buffer to maxRequestEntries, removing the +// oldest logs first. +func (c *Client) truncateBuffer() { + l := len(c.entries) + if l > maxRequestEntries { + c.entries = c.entries[(l - maxRequestEntries):] } - c.numEntries = 0 } func (c *Client) buildRequest() lokiRequest { - // Sort keys to guarantee deterministic output - services := make([]string, 0, len(c.entries)) - for svc, entries := range c.entries { - if len(entries) == 0 { - continue - } - services = append(services, svc) + // Put entries into service "buckets" + bucketedEntries := map[string][]lokiEntry{} + for _, data := range c.entries { + bucketedEntries[data.service] = append(bucketedEntries[data.service], data.entry) } - sort.Strings(services) var req lokiRequest - for _, service := range services { - entries := c.entries[service] + for service, entries := range bucketedEntries { stream := lokiStream{ Labels: map[string]string{ "pebble_service": service, @@ -142,6 +143,11 @@ type lokiStream struct { type lokiEntry [2]string +type lokiEntryWithService struct { + entry lokiEntry + service string +} + // handleServerResponse determines what to do based on the response from the // Loki server. 4xx and 5xx responses indicate errors, so in this case, we will // bubble up the error to the caller. @@ -167,7 +173,7 @@ func (c *Client) handleServerResponse(resp *http.Response) error { case 400 <= code && code < 500: // 4xx indicates a client problem, so drop the logs (retrying won't help) logger.Noticef("Target %q: request failed with status %d, dropping %d logs", - c.targetName, code, c.numEntries) + c.targetName, code, c.NumBuffered()) c.resetBuffer() return errFromResponse(resp) @@ -176,7 +182,7 @@ func (c *Client) handleServerResponse(resp *http.Response) error { return errFromResponse(resp) default: - // Unexpected response + // Unexpected response - don't drop logs to be safe return fmt.Errorf("unexpected response from server: %v", resp.Status) } } From f9fea04aa7c079ac03f77b833d0e51c6ad4fef31 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 20 Sep 2023 11:22:29 +0700 Subject: [PATCH 20/39] fix up the tests - allow overriding maxBufferedEntries in test --- internals/overlord/logstate/gatherer.go | 10 +++++++--- internals/overlord/logstate/gatherer_test.go | 10 ++++++---- internals/overlord/logstate/loki/loki_test.go | 6 +++--- internals/overlord/logstate/manager_test.go | 6 +++++- 4 files changed, 21 insertions(+), 11 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index aeb469f6..e697dbbb 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -78,8 +78,9 @@ type logGatherer struct { // logGathererArgs allows overriding the newLogClient method and time values // in testing. type logGathererArgs struct { - bufferTimeout time.Duration - timeoutFinalFlush time.Duration + bufferTimeout time.Duration + maxBufferedEntries int + timeoutFinalFlush time.Duration // method to get a new client newClient func(*plan.LogTarget) (logClient, error) } @@ -117,6 +118,9 @@ func fillDefaultArgs(args logGathererArgs) logGathererArgs { if args.bufferTimeout == 0 { args.bufferTimeout = bufferTimeout } + if args.maxBufferedEntries == 0 { + args.maxBufferedEntries = maxBufferedEntries + } if args.timeoutFinalFlush == 0 { args.timeoutFinalFlush = timeoutFinalFlush } @@ -199,7 +203,7 @@ mainLoop: continue } // Check if buffer is full - if g.client.NumBuffered() > maxBufferedEntries { + if g.client.NumBuffered() >= g.maxBufferedEntries { flushClient(g.clientCtx) } // Otherwise, set the timeout diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index b7220cbe..a9f28856 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -33,6 +33,7 @@ var _ = Suite(&gathererSuite{}) func (s *gathererSuite) TestGatherer(c *C) { received := make(chan []servicelog.Entry, 1) gathererArgs := logGathererArgs{ + maxBufferedEntries: 5, newClient: func(target *plan.LogTarget) (logClient, error) { return &testClient{ bufferSize: 5, @@ -151,14 +152,15 @@ type testClient struct { sendCh chan []servicelog.Entry } -func (c *testClient) Write(ctx context.Context, entry servicelog.Entry) error { +func (c *testClient) AddLog(entry servicelog.Entry) error { c.buffered = append(c.buffered, entry) - if len(c.buffered) >= c.bufferSize { - return c.Flush(ctx) - } return nil } +func (c *testClient) NumBuffered() int { + return len(c.buffered) +} + func (c *testClient) Flush(ctx context.Context) (err error) { if len(c.buffered) == 0 { return diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go index 1076a69a..f2b79446 100644 --- a/internals/overlord/logstate/loki/loki_test.go +++ b/internals/overlord/logstate/loki/loki_test.go @@ -119,7 +119,7 @@ func (*suite) TestRequest(c *C) { client := loki.NewClient(&plan.LogTarget{Location: server.URL}) for _, entry := range input { - err := client.Write(context.Background(), entry) + err := client.AddLog(entry) c.Assert(err, IsNil) } @@ -140,7 +140,7 @@ func (*suite) TestFlushCancelContext(c *C) { defer killServer() client := loki.NewClient(&plan.LogTarget{Location: server.URL}) - err := client.Write(context.Background(), servicelog.Entry{ + err := client.AddLog(servicelog.Entry{ Time: time.Now(), Service: "svc1", Message: "this is a log line\n", @@ -178,7 +178,7 @@ func (*suite) TestServerTimeout(c *C) { defer close(stopRequest) client := loki.NewClient(&plan.LogTarget{Location: server.URL}) - err := client.Write(context.Background(), servicelog.Entry{ + err := client.AddLog(servicelog.Entry{ Time: time.Now(), Service: "svc1", Message: "this is a log line\n", diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go index 366250be..fc68772d 100644 --- a/internals/overlord/logstate/manager_test.go +++ b/internals/overlord/logstate/manager_test.go @@ -171,11 +171,15 @@ type slowFlushingClient struct { flushTime time.Duration } -func (c *slowFlushingClient) Write(_ context.Context, _ servicelog.Entry) error { +func (c *slowFlushingClient) AddLog(_ servicelog.Entry) error { // no-op return nil } +func (c *slowFlushingClient) NumBuffered() int { + return 0 +} + func (c *slowFlushingClient) Flush(ctx context.Context) error { select { case <-ctx.Done(): From 892c037422bc8934e5c9bc3ba71a150b06a503bb Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 20 Sep 2023 12:50:20 +0700 Subject: [PATCH 21/39] rename AddLog to Add --- internals/overlord/logstate/gatherer.go | 6 +++--- internals/overlord/logstate/gatherer_test.go | 2 +- internals/overlord/logstate/loki/loki.go | 2 +- internals/overlord/logstate/loki/loki_test.go | 6 +++--- internals/overlord/logstate/manager_test.go | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index e697dbbb..f70a9602 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -197,7 +197,7 @@ mainLoop: flushClient(g.clientCtx) case entry := <-g.entryCh: - err := g.client.AddLog(entry) + err := g.client.Add(entry) if err != nil { logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err) continue @@ -302,8 +302,8 @@ func (t *timer) EnsureSet(timeout time.Duration) { // For example, a logClient for Loki would encode the log messages in the // JSON format expected by Loki, and send them over HTTP(S). type logClient interface { - // AddLog adds the given log entry to the client's buffer. - AddLog(servicelog.Entry) error + // Add adds the given log entry to the client's buffer. + Add(servicelog.Entry) error // NumBuffered returns the number of log entries currently buffered in the // client. diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index a9f28856..e6f9f489 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -152,7 +152,7 @@ type testClient struct { sendCh chan []servicelog.Entry } -func (c *testClient) AddLog(entry servicelog.Entry) error { +func (c *testClient) Add(entry servicelog.Entry) error { c.buffered = append(c.buffered, entry) return nil } diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index d5ceb6ee..ed00399a 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -51,7 +51,7 @@ func NewClient(target *plan.LogTarget) *Client { } } -func (c *Client) AddLog(entry servicelog.Entry) error { +func (c *Client) Add(entry servicelog.Entry) error { c.entries = append(c.entries, lokiEntryWithService{ entry: encodeEntry(entry), service: entry.Service, diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go index f2b79446..970b5b9c 100644 --- a/internals/overlord/logstate/loki/loki_test.go +++ b/internals/overlord/logstate/loki/loki_test.go @@ -119,7 +119,7 @@ func (*suite) TestRequest(c *C) { client := loki.NewClient(&plan.LogTarget{Location: server.URL}) for _, entry := range input { - err := client.AddLog(entry) + err := client.Add(entry) c.Assert(err, IsNil) } @@ -140,7 +140,7 @@ func (*suite) TestFlushCancelContext(c *C) { defer killServer() client := loki.NewClient(&plan.LogTarget{Location: server.URL}) - err := client.AddLog(servicelog.Entry{ + err := client.Add(servicelog.Entry{ Time: time.Now(), Service: "svc1", Message: "this is a log line\n", @@ -178,7 +178,7 @@ func (*suite) TestServerTimeout(c *C) { defer close(stopRequest) client := loki.NewClient(&plan.LogTarget{Location: server.URL}) - err := client.AddLog(servicelog.Entry{ + err := client.Add(servicelog.Entry{ Time: time.Now(), Service: "svc1", Message: "this is a log line\n", diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go index fc68772d..aeb90866 100644 --- a/internals/overlord/logstate/manager_test.go +++ b/internals/overlord/logstate/manager_test.go @@ -171,7 +171,7 @@ type slowFlushingClient struct { flushTime time.Duration } -func (c *slowFlushingClient) AddLog(_ servicelog.Entry) error { +func (c *slowFlushingClient) Add(_ servicelog.Entry) error { // no-op return nil } From bce2a862cd83b3cea4b8bd5db7be3df33ca1dc5f Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 20 Sep 2023 12:57:51 +0700 Subject: [PATCH 22/39] continue stmt after flush - some minor naming/comment improvements --- internals/overlord/logstate/gatherer.go | 1 + internals/overlord/logstate/loki/loki.go | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index f70a9602..1e89b6ab 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -205,6 +205,7 @@ mainLoop: // Check if buffer is full if g.client.NumBuffered() >= g.maxBufferedEntries { flushClient(g.clientCtx) + continue } // Otherwise, set the timeout flushTimer.EnsureSet(g.bufferTimeout) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index ed00399a..a96ce2b4 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -106,9 +106,9 @@ func (c *Client) resetBuffer() { // truncateBuffer truncates the buffer to maxRequestEntries, removing the // oldest logs first. func (c *Client) truncateBuffer() { - l := len(c.entries) - if l > maxRequestEntries { - c.entries = c.entries[(l - maxRequestEntries):] + numEntries := len(c.entries) + if numEntries > maxRequestEntries { + c.entries = c.entries[(numEntries - maxRequestEntries):] } } @@ -161,7 +161,7 @@ func (c *Client) handleServerResponse(resp *http.Response) error { code := resp.StatusCode switch { - case code == 200 || code == 204: + case code == http.StatusOK || code == http.StatusNoContent: // Success - safe to drop logs c.resetBuffer() return nil @@ -188,7 +188,7 @@ func (c *Client) handleServerResponse(resp *http.Response) error { } // errFromResponse generates an error from a failed *http.Response. -// NB: this function reads the response body. +// Note: this function reads the response body. func errFromResponse(resp *http.Response) error { // Read response body to get more context body, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) From 2dbbfb676303f4a17ffb3d7285dfc3c9e48fa8c4 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 20 Sep 2023 13:12:51 +0700 Subject: [PATCH 23/39] count # of written logs in gatherer - remove NumBuffered method --- internals/overlord/logstate/gatherer.go | 10 +++++----- internals/overlord/logstate/gatherer_test.go | 4 ---- internals/overlord/logstate/loki/loki.go | 8 ++------ internals/overlord/logstate/manager_test.go | 4 ---- 4 files changed, 7 insertions(+), 19 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 1e89b6ab..4379a257 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -177,6 +177,8 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R func (g *logGatherer) loop() error { flushTimer := newTimer() defer flushTimer.Stop() + // Keep track of number of logs written since last flush + numWritten := 0 flushClient := func(ctx context.Context) { // Mark timer as unset @@ -185,6 +187,7 @@ func (g *logGatherer) loop() error { if err != nil { logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) } + numWritten = 0 } mainLoop: @@ -202,8 +205,9 @@ mainLoop: logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err) continue } + numWritten++ // Check if buffer is full - if g.client.NumBuffered() >= g.maxBufferedEntries { + if numWritten >= g.maxBufferedEntries { flushClient(g.clientCtx) continue } @@ -306,10 +310,6 @@ type logClient interface { // Add adds the given log entry to the client's buffer. Add(servicelog.Entry) error - // NumBuffered returns the number of log entries currently buffered in the - // client. - NumBuffered() int - // Flush sends buffered logs (if any) to the remote target. Flush(context.Context) error } diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index e6f9f489..5e769860 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -157,10 +157,6 @@ func (c *testClient) Add(entry servicelog.Entry) error { return nil } -func (c *testClient) NumBuffered() int { - return len(c.buffered) -} - func (c *testClient) Flush(ctx context.Context) (err error) { if len(c.buffered) == 0 { return diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index a96ce2b4..3077974e 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -67,12 +67,8 @@ func encodeEntry(entry servicelog.Entry) lokiEntry { } } -func (c *Client) NumBuffered() int { - return len(c.entries) -} - func (c *Client) Flush(ctx context.Context) error { - if c.NumBuffered() == 0 { + if len(c.entries) == 0 { return nil // no-op } @@ -173,7 +169,7 @@ func (c *Client) handleServerResponse(resp *http.Response) error { case 400 <= code && code < 500: // 4xx indicates a client problem, so drop the logs (retrying won't help) logger.Noticef("Target %q: request failed with status %d, dropping %d logs", - c.targetName, code, c.NumBuffered()) + c.targetName, code, len(c.entries)) c.resetBuffer() return errFromResponse(resp) diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go index aeb90866..9d6ff3b8 100644 --- a/internals/overlord/logstate/manager_test.go +++ b/internals/overlord/logstate/manager_test.go @@ -176,10 +176,6 @@ func (c *slowFlushingClient) Add(_ servicelog.Entry) error { return nil } -func (c *slowFlushingClient) NumBuffered() int { - return 0 -} - func (c *slowFlushingClient) Flush(ctx context.Context) error { select { case <-ctx.Done(): From 9b72978e4cab3a6d1fca57eeb874d9d75f244d88 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 20 Sep 2023 13:41:13 +0700 Subject: [PATCH 24/39] reallocate buffer periodically to avoid memory leaks --- internals/overlord/logstate/loki/loki.go | 27 ++++++++++++++---------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index 3077974e..c9f59283 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -31,7 +31,13 @@ import ( "github.com/canonical/pebble/internals/servicelog" ) -const maxRequestEntries = 100 +const ( + // maxRequestEntries is the size of the sliding window of entries in the buffer. + maxRequestEntries = 100 + + // reallocBufferThreshold is the size of the buffer's memory. + reallocBufferThreshold = maxRequestEntries * 2 +) var requestTimeout = 10 * time.Second @@ -52,11 +58,19 @@ func NewClient(target *plan.LogTarget) *Client { } func (c *Client) Add(entry servicelog.Entry) error { + if N := len(c.entries); N >= maxRequestEntries { + // make room for 1 entry + c.entries = c.entries[(N - maxRequestEntries + 1):] + } + if cap(c.entries)-len(c.entries) == 0 { + // There is no room left in the slice + // Reallocate the entire buffer to avoid memory leaking over time + c.entries = append(make([]lokiEntryWithService, 0, reallocBufferThreshold), c.entries...) + } c.entries = append(c.entries, lokiEntryWithService{ entry: encodeEntry(entry), service: entry.Service, }) - c.truncateBuffer() return nil } @@ -99,15 +113,6 @@ func (c *Client) resetBuffer() { c.entries = c.entries[:0] } -// truncateBuffer truncates the buffer to maxRequestEntries, removing the -// oldest logs first. -func (c *Client) truncateBuffer() { - numEntries := len(c.entries) - if numEntries > maxRequestEntries { - c.entries = c.entries[(numEntries - maxRequestEntries):] - } -} - func (c *Client) buildRequest() lokiRequest { // Put entries into service "buckets" bucketedEntries := map[string][]lokiEntry{} From fabf8a5c64f66f086719a52aba1adf9875ec0a32 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 20 Sep 2023 15:09:43 +0700 Subject: [PATCH 25/39] add test for retry/truncate logic --- internals/overlord/logstate/gatherer_test.go | 82 +++++++++++++++++++ .../overlord/logstate/loki/export_test.go | 8 ++ internals/overlord/logstate/loki/loki.go | 7 +- 3 files changed, 94 insertions(+), 3 deletions(-) diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index 5e769860..d047804b 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "io" + "net/http" + "net/http/httptest" "time" . "gopkg.in/check.v1" @@ -138,6 +140,86 @@ logs in client buffer: %v`, len(g.client.(*testClient).buffered)) } } +func (s *gathererSuite) TestRetryLoki(c *C) { + var handler *func(http.ResponseWriter, *http.Request) + patchHandler := func(f func(http.ResponseWriter, *http.Request)) { + handler = &f + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + (*handler)(w, r) + })) + defer server.Close() + + g, err := newLogGathererInternal( + &plan.LogTarget{ + Name: "tgt1", + Type: plan.LokiTarget, + Location: server.URL, + }, + logGathererArgs{ + bufferTimeout: 1 * time.Millisecond, + maxBufferedEntries: 5, + }, + ) + c.Assert(err, IsNil) + + testSvc := newTestService("svc1") + g.ServiceStarted(testSvc.config, testSvc.ringBuffer) + + reqReceived := make(chan struct{}) + // First attempt: server should return a retryable error + patchHandler(func(w http.ResponseWriter, _ *http.Request) { + close(reqReceived) + w.WriteHeader(http.StatusTooManyRequests) + }) + + testSvc.writeLog("log line #1") + testSvc.writeLog("log line #2") + testSvc.writeLog("log line #3") + testSvc.writeLog("log line #4") + testSvc.writeLog("log line #5") + + // Check that request was received + select { + case <-reqReceived: + case <-time.After(1 * time.Millisecond): + c.Fatalf("timed out waiting for request") + } + + reqReceived = make(chan struct{}) + // Second attempt: check that logs were held over from last time + patchHandler(func(w http.ResponseWriter, r *http.Request) { + close(reqReceived) + reqBody, err := io.ReadAll(r.Body) + c.Assert(err, IsNil) + fmt.Println(string(reqBody)) + + expected := `{"streams":\[{"stream":{"pebble_service":"svc1"},"values":\[` + + //`\["\d+","log line #1"\],` + + //`\["\d+","log line #2"\],` + + `\["\d+","log line #3"\],` + + `\["\d+","log line #4"\],` + + `\["\d+","log line #5"\],` + + `\["\d+","log line #6"\],` + + `\["\d+","log line #7"\]` + + `\]}\]}` + c.Assert(string(reqBody), Matches, expected) + // TODO: lower loki.maxRequestEntries and check truncation + }) + + testSvc.writeLog("log line #6") + testSvc.writeLog("log line #7") + // Wait for flush timeout to elapse + + // Check that request was received + select { + case <-reqReceived: + case <-time.After(5 * time.Millisecond): + c.Fatalf("timed out waiting for request") + } +} + func checkLogs(c *C, received []servicelog.Entry, expected []string) { c.Assert(received, HasLen, len(expected)) for i, entry := range received { diff --git a/internals/overlord/logstate/loki/export_test.go b/internals/overlord/logstate/loki/export_test.go index 17cd1e4d..3d9d458c 100644 --- a/internals/overlord/logstate/loki/export_test.go +++ b/internals/overlord/logstate/loki/export_test.go @@ -23,3 +23,11 @@ func FakeRequestTimeout(new time.Duration) (restore func()) { requestTimeout = oldRequestTimeout } } + +func FakeMaxRequestEntries(new int) (restore func()) { + oldMaxRequestEntries := maxRequestEntries + maxRequestEntries = new + return func() { + maxRequestEntries = oldMaxRequestEntries + } +} diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index c9f59283..2e75d914 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -31,7 +31,10 @@ import ( "github.com/canonical/pebble/internals/servicelog" ) -const ( +// These should be consts, but we need to patch them for testing. +var ( + requestTimeout = 10 * time.Second + // maxRequestEntries is the size of the sliding window of entries in the buffer. maxRequestEntries = 100 @@ -39,8 +42,6 @@ const ( reallocBufferThreshold = maxRequestEntries * 2 ) -var requestTimeout = 10 * time.Second - type Client struct { targetName string remoteURL string From 1c0ed80cf199adb7af418c5da49ed3cea1e384bb Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Thu, 21 Sep 2023 07:38:16 +0700 Subject: [PATCH 26/39] fix gathererSuite.TestRetryLoki - introduce loki.ClientArgs - remove loki testing patch functions --- internals/overlord/logstate/gatherer_test.go | 7 ++- .../overlord/logstate/loki/export_test.go | 33 ------------ internals/overlord/logstate/loki/loki.go | 52 ++++++++++++++----- internals/overlord/logstate/loki/loki_test.go | 10 ++-- 4 files changed, 50 insertions(+), 52 deletions(-) delete mode 100644 internals/overlord/logstate/loki/export_test.go diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index d047804b..807db490 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -22,6 +22,7 @@ import ( "net/http/httptest" "time" + "github.com/canonical/pebble/internals/overlord/logstate/loki" . "gopkg.in/check.v1" "github.com/canonical/pebble/internals/plan" @@ -154,12 +155,16 @@ func (s *gathererSuite) TestRetryLoki(c *C) { g, err := newLogGathererInternal( &plan.LogTarget{ Name: "tgt1", - Type: plan.LokiTarget, Location: server.URL, }, logGathererArgs{ bufferTimeout: 1 * time.Millisecond, maxBufferedEntries: 5, + newClient: func(target *plan.LogTarget) (logClient, error) { + return loki.NewClientWithArgs(target, loki.ClientArgs{ + MaxRequestEntries: 5, + }), nil + }, }, ) c.Assert(err, IsNil) diff --git a/internals/overlord/logstate/loki/export_test.go b/internals/overlord/logstate/loki/export_test.go deleted file mode 100644 index 3d9d458c..00000000 --- a/internals/overlord/logstate/loki/export_test.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (c) 2023 Canonical Ltd -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License version 3 as -// published by the Free Software Foundation. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package loki - -import "time" - -func FakeRequestTimeout(new time.Duration) (restore func()) { - oldRequestTimeout := requestTimeout - requestTimeout = new - return func() { - requestTimeout = oldRequestTimeout - } -} - -func FakeMaxRequestEntries(new int) (restore func()) { - oldMaxRequestEntries := maxRequestEntries - maxRequestEntries = new - return func() { - maxRequestEntries = oldMaxRequestEntries - } -} diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index 2e75d914..dd1f6090 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "net/http" + "sort" "strconv" "strings" "time" @@ -31,18 +32,16 @@ import ( "github.com/canonical/pebble/internals/servicelog" ) -// These should be consts, but we need to patch them for testing. -var ( +const ( requestTimeout = 10 * time.Second // maxRequestEntries is the size of the sliding window of entries in the buffer. maxRequestEntries = 100 - - // reallocBufferThreshold is the size of the buffer's memory. - reallocBufferThreshold = maxRequestEntries * 2 ) type Client struct { + ClientArgs + targetName string remoteURL string entries []lokiEntryWithService @@ -51,22 +50,39 @@ type Client struct { } func NewClient(target *plan.LogTarget) *Client { + return NewClientWithArgs(target, ClientArgs{}) +} + +// ClientArgs allows overriding default parameters (e.g. for testing) +type ClientArgs struct { + RequestTimeout time.Duration + MaxRequestEntries int +} + +func NewClientWithArgs(target *plan.LogTarget, args ClientArgs) *Client { + args = fillDefaultArgs(args) return &Client{ + ClientArgs: args, targetName: target.Name, remoteURL: target.Location, - httpClient: &http.Client{Timeout: requestTimeout}, + httpClient: &http.Client{Timeout: args.RequestTimeout}, } } +func fillDefaultArgs(args ClientArgs) ClientArgs { + if args.RequestTimeout == 0 { + args.RequestTimeout = requestTimeout + } + if args.MaxRequestEntries == 0 { + args.MaxRequestEntries = maxRequestEntries + } + return args +} + func (c *Client) Add(entry servicelog.Entry) error { - if N := len(c.entries); N >= maxRequestEntries { + if N := len(c.entries); N >= c.MaxRequestEntries { // make room for 1 entry - c.entries = c.entries[(N - maxRequestEntries + 1):] - } - if cap(c.entries)-len(c.entries) == 0 { - // There is no room left in the slice - // Reallocate the entire buffer to avoid memory leaking over time - c.entries = append(make([]lokiEntryWithService, 0, reallocBufferThreshold), c.entries...) + c.entries = c.entries[(N - c.MaxRequestEntries + 1):] } c.entries = append(c.entries, lokiEntryWithService{ entry: encodeEntry(entry), @@ -121,8 +137,16 @@ func (c *Client) buildRequest() lokiRequest { bucketedEntries[data.service] = append(bucketedEntries[data.service], data.entry) } + // Sort service names to guarantee deterministic output + var services []string + for service := range bucketedEntries { + services = append(services, service) + } + sort.Strings(services) + var req lokiRequest - for service, entries := range bucketedEntries { + for _, service := range services { + entries := bucketedEntries[service] stream := lokiStream{ Labels: map[string]string{ "pebble_service": service, diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go index 970b5b9c..e2401fc5 100644 --- a/internals/overlord/logstate/loki/loki_test.go +++ b/internals/overlord/logstate/loki/loki_test.go @@ -167,9 +167,6 @@ func (*suite) TestFlushCancelContext(c *C) { } func (*suite) TestServerTimeout(c *C) { - restore := loki.FakeRequestTimeout(1 * time.Microsecond) - defer restore() - stopRequest := make(chan struct{}) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { <-stopRequest @@ -177,7 +174,12 @@ func (*suite) TestServerTimeout(c *C) { defer server.Close() defer close(stopRequest) - client := loki.NewClient(&plan.LogTarget{Location: server.URL}) + client := loki.NewClientWithArgs( + &plan.LogTarget{Location: server.URL}, + loki.ClientArgs{ + RequestTimeout: 1 * time.Microsecond, + }, + ) err := client.Add(servicelog.Entry{ Time: time.Now(), Service: "svc1", From 8ea3930fd99d668c79fea41251b9542810d136b0 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Thu, 21 Sep 2023 07:40:42 +0700 Subject: [PATCH 27/39] fix imports --- internals/overlord/logstate/gatherer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index 807db490..8cef67f6 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -22,9 +22,9 @@ import ( "net/http/httptest" "time" - "github.com/canonical/pebble/internals/overlord/logstate/loki" . "gopkg.in/check.v1" + "github.com/canonical/pebble/internals/overlord/logstate/loki" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" ) From 6a733c0f4d6eb86469182379dbd9bdcc5ed38e3c Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Thu, 21 Sep 2023 08:42:02 +0700 Subject: [PATCH 28/39] increase timeouts to improve test reliability --- internals/overlord/logstate/gatherer_test.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index 8cef67f6..9872e6a8 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -63,7 +63,7 @@ func (s *gathererSuite) TestGatherer(c *C) { testSvc.writeLog("log line #5") select { - case <-time.After(5 * time.Millisecond): + case <-time.After(10 * time.Millisecond): c.Fatalf("timeout waiting for logs") case logs := <-received: checkLogs(c, logs, []string{"log line #1", "log line #2", "log line #3", "log line #4", "log line #5"}) @@ -188,7 +188,7 @@ func (s *gathererSuite) TestRetryLoki(c *C) { // Check that request was received select { case <-reqReceived: - case <-time.After(1 * time.Millisecond): + case <-time.After(100 * time.Millisecond): c.Fatalf("timed out waiting for request") } @@ -198,11 +198,9 @@ func (s *gathererSuite) TestRetryLoki(c *C) { close(reqReceived) reqBody, err := io.ReadAll(r.Body) c.Assert(err, IsNil) - fmt.Println(string(reqBody)) expected := `{"streams":\[{"stream":{"pebble_service":"svc1"},"values":\[` + - //`\["\d+","log line #1"\],` + - //`\["\d+","log line #2"\],` + + // First two log lines should have been truncated `\["\d+","log line #3"\],` + `\["\d+","log line #4"\],` + `\["\d+","log line #5"\],` + @@ -210,7 +208,6 @@ func (s *gathererSuite) TestRetryLoki(c *C) { `\["\d+","log line #7"\]` + `\]}\]}` c.Assert(string(reqBody), Matches, expected) - // TODO: lower loki.maxRequestEntries and check truncation }) testSvc.writeLog("log line #6") @@ -220,7 +217,7 @@ func (s *gathererSuite) TestRetryLoki(c *C) { // Check that request was received select { case <-reqReceived: - case <-time.After(5 * time.Millisecond): + case <-time.After(100 * time.Millisecond): c.Fatalf("timed out waiting for request") } } From 3c6e17250521bda934ec52c3d6af169c6da51665 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Thu, 21 Sep 2023 08:58:54 +0700 Subject: [PATCH 29/39] remove unnecessary comment --- internals/overlord/logstate/loki/loki.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index dd1f6090..fe323810 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -33,9 +33,7 @@ import ( ) const ( - requestTimeout = 10 * time.Second - - // maxRequestEntries is the size of the sliding window of entries in the buffer. + requestTimeout = 10 * time.Second maxRequestEntries = 100 ) From 4dab1dd342f103f147bf771162dadfef91afd669 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Thu, 21 Sep 2023 09:02:23 +0700 Subject: [PATCH 30/39] bump test timeouts to 1 sec --- internals/overlord/logstate/gatherer_test.go | 8 ++++---- internals/overlord/logstate/manager_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index 9872e6a8..bba6fc8a 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -63,7 +63,7 @@ func (s *gathererSuite) TestGatherer(c *C) { testSvc.writeLog("log line #5") select { - case <-time.After(10 * time.Millisecond): + case <-time.After(1 * time.Second): c.Fatalf("timeout waiting for logs") case logs := <-received: checkLogs(c, logs, []string{"log line #1", "log line #2", "log line #3", "log line #4", "log line #5"}) @@ -126,7 +126,7 @@ func (s *gathererSuite) TestGathererShutdown(c *C) { }() select { - case <-time.After(100 * time.Millisecond): + case <-time.After(1 * time.Second): c.Fatalf("timeout waiting for gatherer to tear down") case <-hasShutdown: } @@ -188,7 +188,7 @@ func (s *gathererSuite) TestRetryLoki(c *C) { // Check that request was received select { case <-reqReceived: - case <-time.After(100 * time.Millisecond): + case <-time.After(1 * time.Second): c.Fatalf("timed out waiting for request") } @@ -217,7 +217,7 @@ func (s *gathererSuite) TestRetryLoki(c *C) { // Check that request was received select { case <-reqReceived: - case <-time.After(100 * time.Millisecond): + case <-time.After(1 * time.Second): c.Fatalf("timed out waiting for request") } } diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go index 9d6ff3b8..c1dbf791 100644 --- a/internals/overlord/logstate/manager_test.go +++ b/internals/overlord/logstate/manager_test.go @@ -162,7 +162,7 @@ func (s *managerSuite) TestTimelyShutdown(c *C) { }() select { case <-done: - case <-time.After(50 * time.Millisecond): + case <-time.After(1 * time.Second): c.Fatal("LogManager.Stop() took too long") } } From 084544bc4c1a0a1ef10a97c5a41e25bced72f3e8 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Thu, 21 Sep 2023 11:01:37 +0700 Subject: [PATCH 31/39] use json.Compact in tests --- internals/overlord/logstate/loki/loki_test.go | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go index e2401fc5..6037f2da 100644 --- a/internals/overlord/logstate/loki/loki_test.go +++ b/internals/overlord/logstate/loki/loki_test.go @@ -15,8 +15,10 @@ package loki_test import ( + "bytes" "context" "encoding/json" + "fmt" "io" "net/http" "net/http/httptest" @@ -77,7 +79,7 @@ func (*suite) TestRequest(c *C) { Message: "log line #9\n", }} - expected := ` + expected := compactJSON(` {"streams": [{ "stream": {"pebble_service": "svc1"}, "values": [ @@ -103,7 +105,7 @@ func (*suite) TestRequest(c *C) { "values": [ [ "1704026098000000000", "log line #9" ] ] -}]}` +}]}`) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { c.Assert(r.Method, Equals, http.MethodPost) @@ -111,9 +113,7 @@ func (*suite) TestRequest(c *C) { reqBody, err := io.ReadAll(r.Body) c.Assert(err, IsNil) - expFlattened, err := flattenJSON(expected) - c.Assert(err, IsNil) - c.Assert(string(reqBody), Equals, expFlattened) + c.Assert(reqBody, DeepEquals, expected) })) defer server.Close() @@ -192,13 +192,11 @@ func (*suite) TestServerTimeout(c *C) { } // Strips all extraneous whitespace from JSON -func flattenJSON(s string) (string, error) { - var v any - err := json.Unmarshal([]byte(s), &v) +func compactJSON(s string) []byte { + var buf bytes.Buffer + err := json.Compact(&buf, []byte(s)) if err != nil { - return "", err + panic(fmt.Sprintf("error compacting JSON: %v", err)) } - - b, err := json.Marshal(v) - return string(b), err + return buf.Bytes() } From a5ab88c78796ff396404c0ffe6e64019740629ec Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Thu, 21 Sep 2023 11:51:16 +0700 Subject: [PATCH 32/39] loki test: handler doesn't need to be pointer --- internals/overlord/logstate/gatherer_test.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index bba6fc8a..b2800ca6 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -142,13 +142,9 @@ logs in client buffer: %v`, len(g.client.(*testClient).buffered)) } func (s *gathererSuite) TestRetryLoki(c *C) { - var handler *func(http.ResponseWriter, *http.Request) - patchHandler := func(f func(http.ResponseWriter, *http.Request)) { - handler = &f - } - + var handler func(http.ResponseWriter, *http.Request) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - (*handler)(w, r) + handler(w, r) })) defer server.Close() @@ -174,10 +170,10 @@ func (s *gathererSuite) TestRetryLoki(c *C) { reqReceived := make(chan struct{}) // First attempt: server should return a retryable error - patchHandler(func(w http.ResponseWriter, _ *http.Request) { + handler = func(w http.ResponseWriter, _ *http.Request) { close(reqReceived) w.WriteHeader(http.StatusTooManyRequests) - }) + } testSvc.writeLog("log line #1") testSvc.writeLog("log line #2") @@ -194,7 +190,7 @@ func (s *gathererSuite) TestRetryLoki(c *C) { reqReceived = make(chan struct{}) // Second attempt: check that logs were held over from last time - patchHandler(func(w http.ResponseWriter, r *http.Request) { + handler = func(w http.ResponseWriter, r *http.Request) { close(reqReceived) reqBody, err := io.ReadAll(r.Body) c.Assert(err, IsNil) @@ -208,7 +204,7 @@ func (s *gathererSuite) TestRetryLoki(c *C) { `\["\d+","log line #7"\]` + `\]}\]}` c.Assert(string(reqBody), Matches, expected) - }) + } testSvc.writeLog("log line #6") testSvc.writeLog("log line #7") From c4e153c399cf8c584be4a379716d2fa245094c66 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Fri, 22 Sep 2023 07:35:57 +0700 Subject: [PATCH 33/39] Address Gustavo's review comments - ClientArgs -> options *ClientOptions - rename N -> n - explicitly reallocate buffer when full --- internals/overlord/logstate/loki/loki.go | 39 ++++++++++++++---------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index fe323810..9f48781e 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -38,7 +38,7 @@ const ( ) type Client struct { - ClientArgs + options *ClientOptions targetName string remoteURL string @@ -48,39 +48,46 @@ type Client struct { } func NewClient(target *plan.LogTarget) *Client { - return NewClientWithArgs(target, ClientArgs{}) + return NewClientWithArgs(target, &ClientOptions{}) } -// ClientArgs allows overriding default parameters (e.g. for testing) -type ClientArgs struct { +// ClientOptions allows overriding default parameters (e.g. for testing) +type ClientOptions struct { RequestTimeout time.Duration MaxRequestEntries int } -func NewClientWithArgs(target *plan.LogTarget, args ClientArgs) *Client { - args = fillDefaultArgs(args) +func NewClientWithArgs(target *plan.LogTarget, options *ClientOptions) *Client { + options = fillDefaultOptions(options) return &Client{ - ClientArgs: args, + options: options, targetName: target.Name, remoteURL: target.Location, - httpClient: &http.Client{Timeout: args.RequestTimeout}, + httpClient: &http.Client{Timeout: options.RequestTimeout}, } } -func fillDefaultArgs(args ClientArgs) ClientArgs { - if args.RequestTimeout == 0 { - args.RequestTimeout = requestTimeout +func fillDefaultOptions(options *ClientOptions) *ClientOptions { + if options.RequestTimeout == 0 { + options.RequestTimeout = requestTimeout } - if args.MaxRequestEntries == 0 { - args.MaxRequestEntries = maxRequestEntries + if options.MaxRequestEntries == 0 { + options.MaxRequestEntries = maxRequestEntries } - return args + return options } func (c *Client) Add(entry servicelog.Entry) error { - if N := len(c.entries); N >= c.MaxRequestEntries { + if n := len(c.entries); n >= c.options.MaxRequestEntries { // make room for 1 entry - c.entries = c.entries[(N - c.MaxRequestEntries + 1):] + c.entries = c.entries[(n - c.options.MaxRequestEntries + 1):] + } + if cap(c.entries)-len(c.entries) == 0 { + // Allocate a new slice with capacity equal to double the buffer size, and + // copy over the currently buffered logs + c.entries = append( + make([]lokiEntryWithService, 0, 2*c.options.MaxRequestEntries), + c.entries...) } c.entries = append(c.entries, lokiEntryWithService{ entry: encodeEntry(entry), From 164f9ad2bb60211c812f837c9b91e1f42671596e Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Fri, 22 Sep 2023 07:55:31 +0700 Subject: [PATCH 34/39] Rename logGathererArgs -> *logGathererOptions --- internals/overlord/logstate/gatherer.go | 36 +++++++++---------- internals/overlord/logstate/gatherer_test.go | 16 ++++----- internals/overlord/logstate/loki/loki.go | 4 +-- internals/overlord/logstate/loki/loki_test.go | 4 +-- internals/overlord/logstate/manager_test.go | 8 ++--- 5 files changed, 34 insertions(+), 34 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 4379a257..cde95306 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -57,7 +57,7 @@ const ( // Calling the Stop() method will tear down the logGatherer and all of its // associated logPullers. Stop() can be called from an outside goroutine. type logGatherer struct { - logGathererArgs + *logGathererOptions targetName string // tomb for the main loop @@ -75,9 +75,9 @@ type logGatherer struct { entryCh chan servicelog.Entry } -// logGathererArgs allows overriding the newLogClient method and time values +// logGathererOptions allows overriding the newLogClient method and time values // in testing. -type logGathererArgs struct { +type logGathererOptions struct { bufferTimeout time.Duration maxBufferedEntries int timeoutFinalFlush time.Duration @@ -86,21 +86,21 @@ type logGathererArgs struct { } func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) { - return newLogGathererInternal(target, logGathererArgs{}) + return newLogGathererInternal(target, &logGathererOptions{}) } // newLogGathererInternal contains the actual creation code for a logGatherer. // This function is used in the real implementation, but also allows overriding // certain configuration values for testing. -func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logGatherer, error) { - args = fillDefaultArgs(args) - client, err := args.newClient(target) +func newLogGathererInternal(target *plan.LogTarget, options *logGathererOptions) (*logGatherer, error) { + options = fillDefaultOptions(options) + client, err := options.newClient(target) if err != nil { return nil, fmt.Errorf("cannot create log client: %w", err) } g := &logGatherer{ - logGathererArgs: args, + logGathererOptions: options, targetName: target.Name, client: client, @@ -114,20 +114,20 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG return g, nil } -func fillDefaultArgs(args logGathererArgs) logGathererArgs { - if args.bufferTimeout == 0 { - args.bufferTimeout = bufferTimeout +func fillDefaultOptions(options *logGathererOptions) *logGathererOptions { + if options.bufferTimeout == 0 { + options.bufferTimeout = bufferTimeout } - if args.maxBufferedEntries == 0 { - args.maxBufferedEntries = maxBufferedEntries + if options.maxBufferedEntries == 0 { + options.maxBufferedEntries = maxBufferedEntries } - if args.timeoutFinalFlush == 0 { - args.timeoutFinalFlush = timeoutFinalFlush + if options.timeoutFinalFlush == 0 { + options.timeoutFinalFlush = timeoutFinalFlush } - if args.newClient == nil { - args.newClient = newLogClient + if options.newClient == nil { + options.newClient = newLogClient } - return args + return options } // PlanChanged is called by the LogManager when the plan is changed, if this diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index b2800ca6..956f22f3 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -35,7 +35,7 @@ var _ = Suite(&gathererSuite{}) func (s *gathererSuite) TestGatherer(c *C) { received := make(chan []servicelog.Entry, 1) - gathererArgs := logGathererArgs{ + gathererOptions := logGathererOptions{ maxBufferedEntries: 5, newClient: func(target *plan.LogTarget) (logClient, error) { return &testClient{ @@ -45,7 +45,7 @@ func (s *gathererSuite) TestGatherer(c *C) { }, } - g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, gathererArgs) + g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, &gathererOptions) c.Assert(err, IsNil) testSvc := newTestService("svc1") @@ -72,7 +72,7 @@ func (s *gathererSuite) TestGatherer(c *C) { func (s *gathererSuite) TestGathererTimeout(c *C) { received := make(chan []servicelog.Entry, 1) - gathererArgs := logGathererArgs{ + gathererOptions := logGathererOptions{ bufferTimeout: 1 * time.Millisecond, newClient: func(target *plan.LogTarget) (logClient, error) { return &testClient{ @@ -82,7 +82,7 @@ func (s *gathererSuite) TestGathererTimeout(c *C) { }, } - g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, gathererArgs) + g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, &gathererOptions) c.Assert(err, IsNil) testSvc := newTestService("svc1") @@ -99,7 +99,7 @@ func (s *gathererSuite) TestGathererTimeout(c *C) { func (s *gathererSuite) TestGathererShutdown(c *C) { received := make(chan []servicelog.Entry, 1) - gathererArgs := logGathererArgs{ + gathererOptions := logGathererOptions{ bufferTimeout: 1 * time.Microsecond, newClient: func(target *plan.LogTarget) (logClient, error) { return &testClient{ @@ -109,7 +109,7 @@ func (s *gathererSuite) TestGathererShutdown(c *C) { }, } - g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, gathererArgs) + g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, &gathererOptions) c.Assert(err, IsNil) testSvc := newTestService("svc1") @@ -153,11 +153,11 @@ func (s *gathererSuite) TestRetryLoki(c *C) { Name: "tgt1", Location: server.URL, }, - logGathererArgs{ + &logGathererOptions{ bufferTimeout: 1 * time.Millisecond, maxBufferedEntries: 5, newClient: func(target *plan.LogTarget) (logClient, error) { - return loki.NewClientWithArgs(target, loki.ClientArgs{ + return loki.NewClientWithOptions(target, &loki.ClientOptions{ MaxRequestEntries: 5, }), nil }, diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index 9f48781e..49d51b57 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -48,7 +48,7 @@ type Client struct { } func NewClient(target *plan.LogTarget) *Client { - return NewClientWithArgs(target, &ClientOptions{}) + return NewClientWithOptions(target, &ClientOptions{}) } // ClientOptions allows overriding default parameters (e.g. for testing) @@ -57,7 +57,7 @@ type ClientOptions struct { MaxRequestEntries int } -func NewClientWithArgs(target *plan.LogTarget, options *ClientOptions) *Client { +func NewClientWithOptions(target *plan.LogTarget, options *ClientOptions) *Client { options = fillDefaultOptions(options) return &Client{ options: options, diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go index 6037f2da..41aa44a6 100644 --- a/internals/overlord/logstate/loki/loki_test.go +++ b/internals/overlord/logstate/loki/loki_test.go @@ -174,9 +174,9 @@ func (*suite) TestServerTimeout(c *C) { defer server.Close() defer close(stopRequest) - client := loki.NewClientWithArgs( + client := loki.NewClientWithOptions( &plan.LogTarget{Location: server.URL}, - loki.ClientArgs{ + &loki.ClientOptions{ RequestTimeout: 1 * time.Microsecond, }, ) diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go index c1dbf791..73ef8dfb 100644 --- a/internals/overlord/logstate/manager_test.go +++ b/internals/overlord/logstate/manager_test.go @@ -30,14 +30,14 @@ type managerSuite struct{} var _ = Suite(&managerSuite{}) func (s *managerSuite) TestPlanChange(c *C) { - gathererArgs := logGathererArgs{ + gathererOptions := logGathererOptions{ newClient: func(target *plan.LogTarget) (logClient, error) { return &testClient{}, nil }, } m := NewLogManager() m.newGatherer = func(t *plan.LogTarget) (*logGatherer, error) { - return newLogGathererInternal(t, gathererArgs) + return newLogGathererInternal(t, &gathererOptions) } svc1 := newTestService("svc1") @@ -116,7 +116,7 @@ func checkBuffers(c *C, buffers map[string]*servicelog.RingBuffer, expected []st } func (s *managerSuite) TestTimelyShutdown(c *C) { - gathererArgs := logGathererArgs{ + gathererOptions := logGathererOptions{ timeoutFinalFlush: 5 * time.Millisecond, newClient: func(target *plan.LogTarget) (logClient, error) { return &slowFlushingClient{ @@ -127,7 +127,7 @@ func (s *managerSuite) TestTimelyShutdown(c *C) { m := NewLogManager() m.newGatherer = func(t *plan.LogTarget) (*logGatherer, error) { - return newLogGathererInternal(t, gathererArgs) + return newLogGathererInternal(t, &gathererOptions) } svc1 := newTestService("svc1") From c6b27b03c6589231696ee76fee345461fdbe0029 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Fri, 22 Sep 2023 10:17:40 +0700 Subject: [PATCH 35/39] tweak 4xx comment --- internals/overlord/logstate/loki/loki.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index 49d51b57..f8b61e92 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -202,7 +202,7 @@ func (c *Client) handleServerResponse(resp *http.Response) error { return errFromResponse(resp) case 400 <= code && code < 500: - // 4xx indicates a client problem, so drop the logs (retrying won't help) + // Other 4xx codes indicate a client problem, so drop the logs (retrying won't help) logger.Noticef("Target %q: request failed with status %d, dropping %d logs", c.targetName, code, len(c.entries)) c.resetBuffer() From e80cfcd7ec03486fc3b3b16dd87a0825fef51be2 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Fri, 22 Sep 2023 16:11:16 +0700 Subject: [PATCH 36/39] when truncating, zero the element to allow GC --- internals/overlord/logstate/loki/loki.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index f8b61e92..abbc71c5 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -79,8 +79,10 @@ func fillDefaultOptions(options *ClientOptions) *ClientOptions { func (c *Client) Add(entry servicelog.Entry) error { if n := len(c.entries); n >= c.options.MaxRequestEntries { - // make room for 1 entry - c.entries = c.entries[(n - c.options.MaxRequestEntries + 1):] + // Buffer full - remove the first element to make room + // Zero the removed element to allow garbage collection + c.entries[0] = lokiEntryWithService{} + c.entries = c.entries[1:] } if cap(c.entries)-len(c.entries) == 0 { // Allocate a new slice with capacity equal to double the buffer size, and From 159bbe92723d51998838668eda4be99da1d96dba Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Mon, 25 Sep 2023 09:20:17 +0700 Subject: [PATCH 37/39] reuse same buffer instead of reallocating --- internals/overlord/logstate/loki/loki.go | 34 +++++++++++++++++------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go index abbc71c5..d3f37bfd 100644 --- a/internals/overlord/logstate/loki/loki.go +++ b/internals/overlord/logstate/loki/loki.go @@ -42,9 +42,12 @@ type Client struct { targetName string remoteURL string - entries []lokiEntryWithService - httpClient *http.Client + + // To store log entries, keep a buffer of size 2*MaxRequestEntries with a + // sliding window 'entries' of size MaxRequestEntries + buffer []lokiEntryWithService + entries []lokiEntryWithService } func NewClient(target *plan.LogTarget) *Client { @@ -59,12 +62,16 @@ type ClientOptions struct { func NewClientWithOptions(target *plan.LogTarget, options *ClientOptions) *Client { options = fillDefaultOptions(options) - return &Client{ + c := &Client{ options: options, targetName: target.Name, remoteURL: target.Location, httpClient: &http.Client{Timeout: options.RequestTimeout}, + buffer: make([]lokiEntryWithService, 2*options.MaxRequestEntries), } + // c.entries should be backed by the same array as c.buffer + c.entries = c.buffer[0:0:len(c.buffer)] + return c } func fillDefaultOptions(options *ClientOptions) *ClientOptions { @@ -79,18 +86,25 @@ func fillDefaultOptions(options *ClientOptions) *ClientOptions { func (c *Client) Add(entry servicelog.Entry) error { if n := len(c.entries); n >= c.options.MaxRequestEntries { - // Buffer full - remove the first element to make room + // 'entries' is full - remove the first element to make room // Zero the removed element to allow garbage collection c.entries[0] = lokiEntryWithService{} c.entries = c.entries[1:] } - if cap(c.entries)-len(c.entries) == 0 { - // Allocate a new slice with capacity equal to double the buffer size, and - // copy over the currently buffered logs - c.entries = append( - make([]lokiEntryWithService, 0, 2*c.options.MaxRequestEntries), - c.entries...) + + if len(c.entries) >= cap(c.entries) { + // Copy all the elements to the start of the buffer + copy(c.buffer, c.entries) + + // Reset the view into the buffer + c.entries = c.buffer[0:len(c.entries):len(c.buffer)] + + // Zero removed elements to allow garbage collection + for i := len(c.entries); i < len(c.buffer); i++ { + c.buffer[i] = lokiEntryWithService{} + } } + c.entries = append(c.entries, lokiEntryWithService{ entry: encodeEntry(entry), service: entry.Service, From a94d8ff02d142f2dd9a268b97f7be8ff958c5059 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Mon, 25 Sep 2023 10:12:43 +0700 Subject: [PATCH 38/39] add test for buffer recycling --- .../overlord/logstate/loki/export_test.go | 25 ++++++++ internals/overlord/logstate/loki/loki_test.go | 59 +++++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 internals/overlord/logstate/loki/export_test.go diff --git a/internals/overlord/logstate/loki/export_test.go b/internals/overlord/logstate/loki/export_test.go new file mode 100644 index 00000000..8343c361 --- /dev/null +++ b/internals/overlord/logstate/loki/export_test.go @@ -0,0 +1,25 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package loki + +type LokiEntryWithService = lokiEntryWithService + +func GetBuffer(c *Client) []LokiEntryWithService { + return c.buffer +} + +func GetMessage(e LokiEntryWithService) string { + return e.entry[1] +} diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go index 41aa44a6..a3e73d2f 100644 --- a/internals/overlord/logstate/loki/loki_test.go +++ b/internals/overlord/logstate/loki/loki_test.go @@ -191,6 +191,65 @@ func (*suite) TestServerTimeout(c *C) { c.Assert(err, ErrorMatches, ".*context deadline exceeded.*") } +func (*suite) TestBufferFull(c *C) { + client := loki.NewClientWithOptions( + &plan.LogTarget{ + Name: "tgt1", + Location: "fake", + }, + &loki.ClientOptions{ + MaxRequestEntries: 3, + }, + ) + + checkBuffer := func(expected []any) { + checkBuffer(c, client, expected) + } + addEntry := func(s string) { + err := client.Add(servicelog.Entry{Message: s}) + c.Assert(err, IsNil) + } + + checkBuffer([]any{nil, nil, nil, nil, nil, nil}) + addEntry("1") + checkBuffer([]any{"1", nil, nil, nil, nil, nil}) + addEntry("2") + checkBuffer([]any{"1", "2", nil, nil, nil, nil}) + addEntry("3") + checkBuffer([]any{"1", "2", "3", nil, nil, nil}) + addEntry("4") + checkBuffer([]any{nil, "2", "3", "4", nil, nil}) + addEntry("5") + checkBuffer([]any{nil, nil, "3", "4", "5", nil}) + addEntry("6") + checkBuffer([]any{nil, nil, nil, "4", "5", "6"}) + addEntry("7") + checkBuffer([]any{"5", "6", "7", nil, nil, nil}) +} + +// Check if the client's buffer is as expected +func checkBuffer(c *C, client *loki.Client, expected []any) { + buffer := loki.GetBuffer(client) + if len(buffer) != len(expected) { + c.Fatalf("buffer length is %v, expected %v", len(buffer), len(expected)) + } + + for i := range expected { + // 'nil' means c.buffer[i] should be zero + if expected[i] == nil { + c.Assert(buffer[i], DeepEquals, loki.LokiEntryWithService{}, + Commentf("buffer[%d] should be zero, obtained %v", i, buffer[i])) + + } else if msg, ok := expected[i].(string); ok { + // Check buffer message matches + c.Assert(loki.GetMessage(buffer[i]), Equals, msg) + + } else { + c.Fatalf("invalid value expected[%d] = %v %T", i, expected[i], expected[i]) + } + } +} + // Strips all extraneous whitespace from JSON func compactJSON(s string) []byte { var buf bytes.Buffer From 20c76b2f6e9d4eba178043e19aa6b068c27d8266 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Mon, 25 Sep 2023 11:14:48 +0700 Subject: [PATCH 39/39] Address Ben's comments on testing - inline checkBuffer - coerce to string w/o check --- internals/overlord/logstate/loki/loki_test.go | 47 +++++++++---------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go index a3e73d2f..3baef17e 100644 --- a/internals/overlord/logstate/loki/loki_test.go +++ b/internals/overlord/logstate/loki/loki_test.go @@ -202,14 +202,32 @@ func (*suite) TestBufferFull(c *C) { }, ) - checkBuffer := func(expected []any) { - checkBuffer(c, client, expected) - } addEntry := func(s string) { err := client.Add(servicelog.Entry{Message: s}) c.Assert(err, IsNil) } + // Check that the client's buffer is as expected + buffer := loki.GetBuffer(client) + checkBuffer := func(expected []any) { + if len(buffer) != len(expected) { + c.Fatalf("buffer length is %v, expected %v", len(buffer), len(expected)) + } + + for i := range expected { + // 'nil' means c.buffer[i] should be zero + if expected[i] == nil { + c.Assert(buffer[i], DeepEquals, loki.LokiEntryWithService{}, + Commentf("buffer[%d] should be zero, obtained %v", i, buffer[i])) + continue + } + + // Otherwise, check buffer message matches string + msg := expected[i].(string) + c.Assert(loki.GetMessage(buffer[i]), Equals, msg) + } + } + checkBuffer([]any{nil, nil, nil, nil, nil, nil}) addEntry("1") checkBuffer([]any{"1", nil, nil, nil, nil, nil}) @@ -227,29 +245,6 @@ func (*suite) TestBufferFull(c *C) { checkBuffer([]any{"5", "6", "7", nil, nil, nil}) } -// Check if the client's buffer is as expected -func checkBuffer(c *C, client *loki.Client, expected []any) { - buffer := loki.GetBuffer(client) - if len(buffer) != len(expected) { - c.Fatalf("buffer length is %v, expected %v", len(buffer), len(expected)) - } - - for i := range expected { - // 'nil' means c.buffer[i] should be zero - if expected[i] == nil { - c.Assert(buffer[i], DeepEquals, loki.LokiEntryWithService{}, - Commentf("buffer[%d] should be zero, obtained %v", i, buffer[i])) - - } else if msg, ok := expected[i].(string); ok { - // Check buffer message matches - c.Assert(loki.GetMessage(buffer[i]), Equals, msg) - - } else { - c.Fatalf("invalid value expected[%d] = %v %T", i, expected[i], expected[i]) - } - } -} - // Strips all extraneous whitespace from JSON func compactJSON(s string) []byte { var buf bytes.Buffer