From f56d4ada065da854e822946049e6b9be64fddc84 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Mon, 10 Jul 2023 15:25:39 +1200 Subject: [PATCH] Implement Loki client --- .github/.golangci.yml | 4 +- .../overlord/logstate/clienterr/clienterr.go | 81 ++++++ internals/overlord/logstate/gatherer.go | 59 +++-- .../overlord/logstate/loki/export_test.go | 25 ++ internals/overlord/logstate/loki/loki.go | 201 +++++++++++++++ internals/overlord/logstate/loki/loki_test.go | 241 ++++++++++++++++++ 6 files changed, 590 insertions(+), 21 deletions(-) create mode 100644 internals/overlord/logstate/clienterr/clienterr.go create mode 100644 internals/overlord/logstate/loki/export_test.go create mode 100644 internals/overlord/logstate/loki/loki.go create mode 100644 internals/overlord/logstate/loki/loki_test.go diff --git a/.github/.golangci.yml b/.github/.golangci.yml index 2e368060..7d784fdf 100644 --- a/.github/.golangci.yml +++ b/.github/.golangci.yml @@ -12,7 +12,5 @@ issues: # these values ensure that all issues will be surfaced max-issues-per-linter: 0 max-same-issues: 0 - run: - timeout: 5m - + timeout: 5m \ No newline at end of file diff --git a/internals/overlord/logstate/clienterr/clienterr.go b/internals/overlord/logstate/clienterr/clienterr.go new file mode 100644 index 00000000..c756ae13 --- /dev/null +++ b/internals/overlord/logstate/clienterr/clienterr.go @@ -0,0 +1,81 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +/* +Package clienterr contains common error types which are recognised by the log +gatherer. logstate.logClient implementations should return these error types +to communicate with the gatherer. + +Errors in this package should be pattern-matched using errors.As: + + err := client.Flush(ctx) + backoff := &clienterr.Backoff{} + if errors.As(err, &backoff) { + ... + } +*/ +package clienterr + +import ( + "bytes" + "fmt" + "io" + "net/http" + "time" +) + +// Backoff should be returned if the server indicates we are sending too many +// requests (e.g. an HTTP 429 response). +type Backoff struct { + RetryAfter *time.Time +} + +func (e *Backoff) Error() string { + errStr := "too many requests" + if e.RetryAfter != nil { + errStr += ", retry after " + e.RetryAfter.String() + } + return errStr +} + +// ErrorResponse represents an HTTP error response from the server +// (4xx or 5xx). +type ErrorResponse struct { + StatusCode int + Body bytes.Buffer + ReadErr error +} + +func (e *ErrorResponse) Error() string { + errStr := fmt.Sprintf("server returned HTTP %d\n", e.StatusCode) + if e.Body.Len() > 0 { + errStr += fmt.Sprintf(`response body: +%s +`, e.Body.String()) + } + if e.ReadErr != nil { + errStr += "cannot read response body: " + e.ReadErr.Error() + } + return errStr +} + +// ErrorFromResponse generates a *ErrorResponse from a failed *http.Response. +// NB: this function reads the response body. +func ErrorFromResponse(resp *http.Response) *ErrorResponse { + err := &ErrorResponse{} + err.StatusCode = resp.StatusCode + _, readErr := io.CopyN(&err.Body, resp.Body, 1024) + err.ReadErr = readErr + return err +} diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 001b9e2f..f764d7bc 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -16,12 +16,15 @@ package logstate import ( "context" + "errors" "fmt" "time" "gopkg.in/tomb.v2" "github.com/canonical/pebble/internals/logger" + "github.com/canonical/pebble/internals/overlord/logstate/clienterr" + "github.com/canonical/pebble/internals/overlord/logstate/loki" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" ) @@ -71,6 +74,8 @@ type logGatherer struct { pullers *pullerGroup // All pullers send logs on this channel, received by main loop entryCh chan servicelog.Entry + + timer timer } // logGathererArgs allows overriding the newLogClient method and time values @@ -103,6 +108,7 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG client: client, entryCh: make(chan servicelog.Entry), pullers: newPullerGroup(target.Name), + timer: newTimer(), } g.clientCtx, g.clientCancel = context.WithCancel(context.Background()) g.tomb.Go(g.loop) @@ -169,8 +175,7 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R // pullers on entryCh, and writes them to the client. It also flushes the // client periodically, and exits when the gatherer's tomb is killed. func (g *logGatherer) loop() error { - timer := newTimer() - defer timer.Stop() + defer g.timer.Stop() mainLoop: for { @@ -178,20 +183,14 @@ mainLoop: case <-g.tomb.Dying(): break mainLoop - case <-timer.Expired(): + case <-g.timer.Expired(): // Mark timer as unset - timer.Stop() - err := g.client.Flush(g.clientCtx) - if err != nil { - logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) - } + g.timer.Stop() + g.handleClientErr(g.client.Flush(g.clientCtx)) case entry := <-g.entryCh: - err := g.client.Write(g.clientCtx, entry) - if err != nil { - logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err) - } - timer.EnsureSet(g.bufferTimeout) + g.handleClientErr(g.client.Write(g.clientCtx, entry)) + g.timer.EnsureSet(g.bufferTimeout) } } @@ -199,13 +198,25 @@ mainLoop: // We need to create a new context, as the previous one may have been cancelled. ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush) defer cancel() - err := g.client.Flush(ctx) - if err != nil { - logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) - } + g.handleClientErr(g.client.Flush(ctx)) return nil } +func (g *logGatherer) handleClientErr(err error) { + if err == nil { + return + } + + backoff := &clienterr.Backoff{} + if errors.As(err, &backoff) { + logger.Noticef("Target %q: %v", g.targetName, err) + g.timer.retryAfter = backoff.RetryAfter + return + } + + logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) +} + // Stop tears down the gatherer and associated resources (pullers, client). // This method will block until gatherer teardown is complete. // @@ -250,6 +261,8 @@ func (g *logGatherer) Stop() { type timer struct { timer *time.Timer set bool + // If non-nil, the timer won't expire until after this time + retryAfter *time.Time } func newTimer() timer { @@ -279,6 +292,15 @@ func (t *timer) EnsureSet(timeout time.Duration) { return } + if t.retryAfter != nil { + // We've been told to wait before retrying + retryTime := time.Until(*t.retryAfter) + if retryTime > timeout { + timeout = retryTime + } + t.retryAfter = nil + } + t.timer.Reset(timeout) t.set = true } @@ -311,7 +333,8 @@ type logClient interface { func newLogClient(target *plan.LogTarget) (logClient, error) { switch target.Type { - //case plan.LokiTarget: TODO + case plan.LokiTarget: + return loki.NewClient(target), nil //case plan.SyslogTarget: TODO default: return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name) diff --git a/internals/overlord/logstate/loki/export_test.go b/internals/overlord/logstate/loki/export_test.go new file mode 100644 index 00000000..f1616b2f --- /dev/null +++ b/internals/overlord/logstate/loki/export_test.go @@ -0,0 +1,25 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package loki + +import "time" + +func SetRequestTimeout(new time.Duration) (restore func()) { + oldRequestTimeout := requestTimeout + requestTimeout = new + return func() { + requestTimeout = oldRequestTimeout + } +} diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go new file mode 100644 index 00000000..88516730 --- /dev/null +++ b/internals/overlord/logstate/loki/loki.go @@ -0,0 +1,201 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package loki + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/canonical/pebble/cmd" + "github.com/canonical/pebble/internals/overlord/logstate/clienterr" + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" +) + +const maxRequestEntries = 1000 + +var requestTimeout = 10 * time.Second + +type Client struct { + remoteURL string + // buffered entries are "sharded" by service name + entries map[string][]lokiEntry + // Keep track of the number of buffered entries, to avoid having to iterate + // the entries map to get the total number. + numEntries int + + httpClient *http.Client + retryAfter *time.Time +} + +func NewClient(target *plan.LogTarget) *Client { + return &Client{ + remoteURL: target.Location, + entries: map[string][]lokiEntry{}, + httpClient: &http.Client{Timeout: requestTimeout}, + } +} + +func (c *Client) Write(ctx context.Context, entry servicelog.Entry) error { + c.entries[entry.Service] = append(c.entries[entry.Service], encodeEntry(entry)) + c.numEntries++ + + if c.numEntries >= maxRequestEntries { + if c.retryAfter != nil { + if time.Now().Before(*c.retryAfter) { + // Retry-After deadline hasn't passed yet, so we shouldn't flush + return nil + } + c.retryAfter = nil + } + return c.Flush(ctx) + } + return nil +} + +func encodeEntry(entry servicelog.Entry) lokiEntry { + return lokiEntry{ + strconv.FormatInt(entry.Time.UnixNano(), 10), + strings.TrimSuffix(entry.Message, "\n"), + } +} + +func (c *Client) Flush(ctx context.Context) error { + if c.numEntries == 0 { + return nil // no-op + } + defer c.emptyBuffer() + + req := c.buildRequest() + jsonReq, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("encoding request to JSON: %v", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.remoteURL, bytes.NewReader(jsonReq)) + if err != nil { + return fmt.Errorf("creating HTTP request: %v", err) + } + httpReq.Header.Set("Content-Type", "application/json; charset=utf-8") + httpReq.Header.Set("User-Agent", fmt.Sprintf("pebble/%s", cmd.Version)) + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return err + } + return c.handleServerResponse(resp) +} + +func (c *Client) emptyBuffer() { + for svc := range c.entries { + c.entries[svc] = c.entries[svc][:0] + } + c.numEntries = 0 +} + +func (c *Client) buildRequest() request { + // Sort keys to guarantee deterministic output + services := make([]string, 0, len(c.entries)) + for svc, entries := range c.entries { + if len(entries) == 0 { + continue + } + services = append(services, svc) + } + sort.Strings(services) + + var req request + for _, service := range services { + entries := c.entries[service] + stream := stream{ + Labels: map[string]string{ + "pebble_service": service, + }, + Entries: entries, + } + req.Streams = append(req.Streams, stream) + } + return req +} + +type request struct { + Streams []stream `json:"streams"` +} + +type stream struct { + Labels map[string]string `json:"stream"` + Entries []lokiEntry `json:"values"` +} + +type lokiEntry [2]string + +func (c *Client) handleServerResponse(resp *http.Response) error { + defer func() { + // Drain request body to allow connection reuse + // see https://pkg.go.dev/net/http#Response.Body + _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 1024*1024)) + _ = resp.Body.Close() + }() + + code := resp.StatusCode + switch { + case code == http.StatusTooManyRequests: + err := &clienterr.Backoff{} + retryAfter, ok := getRetryAfter(resp) + if ok { + err.RetryAfter = &retryAfter + c.retryAfter = &retryAfter + } + return err + + case code >= 400: + // Request to Loki failed + return clienterr.ErrorFromResponse(resp) + } + + return nil +} + +// Gets the parsed value of Retry-After from HTTP response headers. +func getRetryAfter(resp *http.Response) (time.Time, bool) { + retryAfterRaw := resp.Header.Get("Retry-After") + if retryAfterRaw == "" { + // Header unset + return time.Time{}, false + } + + // The Retry-After value can be a date-time + t, err := http.ParseTime(retryAfterRaw) + if err == nil { + return t, true + } + + // It can also be an integer number of seconds + n, err := strconv.Atoi(retryAfterRaw) + if err == nil && n > 0 { + t := time.Now().Add(time.Duration(n) * time.Second) + return t, true + } + + return time.Time{}, false +} diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go new file mode 100644 index 00000000..29e9ca92 --- /dev/null +++ b/internals/overlord/logstate/loki/loki_test.go @@ -0,0 +1,241 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package loki_test + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + . "gopkg.in/check.v1" + + "github.com/canonical/pebble/internals/overlord/logstate/clienterr" + "github.com/canonical/pebble/internals/overlord/logstate/loki" + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" +) + +type suite struct{} + +var _ = Suite(&suite{}) + +func Test(t *testing.T) { + TestingT(t) +} + +func (*suite) TestRequest(c *C) { + input := []servicelog.Entry{{ + Time: time.Date(2023, 12, 31, 12, 34, 50, 0, time.UTC), + Service: "svc1", + Message: "log line #1\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 51, 0, time.UTC), + Service: "svc2", + Message: "log line #2\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 52, 0, time.UTC), + Service: "svc1", + Message: "log line #3\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 53, 0, time.UTC), + Service: "svc3", + Message: "log line #4\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 54, 0, time.UTC), + Service: "svc1", + Message: "log line #5\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 55, 0, time.UTC), + Service: "svc3", + Message: "log line #6\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 56, 0, time.UTC), + Service: "svc2", + Message: "log line #7\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 57, 0, time.UTC), + Service: "svc1", + Message: "log line #8\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 58, 0, time.UTC), + Service: "svc4", + Message: "log line #9\n", + }} + + expected := ` +{ + "streams": [ + { + "stream": { + "pebble_service": "svc1" + }, + "values": [ + [ "1704026090000000000", "log line #1" ], + [ "1704026092000000000", "log line #3" ], + [ "1704026094000000000", "log line #5" ], + [ "1704026097000000000", "log line #8" ] + ] + }, + { + "stream": { + "pebble_service": "svc2" + }, + "values": [ + [ "1704026091000000000", "log line #2" ], + [ "1704026096000000000", "log line #7" ] + ] + }, + { + "stream": { + "pebble_service": "svc3" + }, + "values": [ + [ "1704026093000000000", "log line #4" ], + [ "1704026095000000000", "log line #6" ] + ] + }, + { + "stream": { + "pebble_service": "svc4" + }, + "values": [ + [ "1704026098000000000", "log line #9" ] + ] + } + ] +} +` + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c.Assert(r.Method, Equals, http.MethodPost) + c.Assert(r.Header.Get("Content-Type"), Equals, "application/json; charset=utf-8") + + reqBody, err := io.ReadAll(r.Body) + c.Assert(err, IsNil) + expFlattened, err := flattenJSON(expected) + c.Assert(err, IsNil) + c.Assert(string(reqBody), Equals, expFlattened) + })) + defer server.Close() + + client := loki.NewClient(&plan.LogTarget{Location: server.URL}) + for _, entry := range input { + err := client.Write(context.Background(), entry) + c.Assert(err, IsNil) + } + + err := client.Flush(context.Background()) + c.Assert(err, IsNil) +} + +func (*suite) TestFlushCancelContext(c *C) { + serverCtx, killServer := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case <-serverCtx.Done(): + // Simulate a slow-responding server + case <-time.After(10 * time.Second): + } + })) + defer server.Close() + defer killServer() + + client := loki.NewClient(&plan.LogTarget{Location: server.URL}) + err := client.Write(context.Background(), servicelog.Entry{ + Time: time.Now(), + Service: "svc1", + Message: "this is a log line\n", + }) + c.Assert(err, IsNil) + + flushReturned := make(chan struct{}) + go func() { + // Cancel the Flush context quickly + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond) + defer cancel() + + err := client.Flush(ctx) + c.Assert(err, ErrorMatches, ".*context deadline exceeded.*") + close(flushReturned) + }() + + // Check Flush returns quickly after context timeout + select { + case <-flushReturned: + case <-time.After(1 * time.Second): + c.Fatal("lokiClient.Flush took too long to return after context timeout") + } +} + +func (*suite) TestServerTimeout(c *C) { + restore := loki.SetRequestTimeout(1 * time.Microsecond) + defer restore() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(2 * time.Microsecond) + })) + defer server.Close() + + client := loki.NewClient(&plan.LogTarget{Location: server.URL}) + err := client.Write(context.Background(), servicelog.Entry{ + Time: time.Now(), + Service: "svc1", + Message: "this is a log line\n", + }) + c.Assert(err, IsNil) + + err = client.Flush(context.Background()) + c.Assert(err, ErrorMatches, ".*context deadline exceeded.*") +} + +func (*suite) TestTooManyRequests(c *C) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Retry-After", "Tue, 15 Aug 2023 08:49:37 GMT") + w.WriteHeader(http.StatusTooManyRequests) + })) + defer server.Close() + + client := loki.NewClient(&plan.LogTarget{Location: server.URL}) + err := client.Write(context.Background(), servicelog.Entry{ + Time: time.Now(), + Service: "svc1", + Message: "this is a log line\n", + }) + c.Assert(err, IsNil) + + err = client.Flush(context.Background()) + backoff := &clienterr.Backoff{} + c.Assert(errors.As(err, &backoff), Equals, true) + + expectedTime := time.Date(2023, 8, 15, 8, 49, 37, 0, time.UTC) + c.Check(backoff.RetryAfter, DeepEquals, &expectedTime) +} + +// Strips all extraneous whitespace from JSON +func flattenJSON(s string) (string, error) { + var v any + err := json.Unmarshal([]byte(s), &v) + if err != nil { + return "", err + } + + b, err := json.Marshal(v) + return string(b), err +}