diff --git a/README.md b/README.md index ab9a0cc2..3a94b0fd 100644 --- a/README.md +++ b/README.md @@ -403,30 +403,26 @@ $ pebble run --verbose ... ``` - ## Container usage @@ -721,6 +716,37 @@ checks: # (Optional) Working directory to run command in. By default, the # command is run in the service manager's current directory. working-dir: + +# (Optional) A list of remote log receivers, to which service logs can be sent. +log-targets: + + : + + # (Required) Control how this log target definition is combined with + # other pre-existing definitions with the same name in the Pebble plan. + # + # The value 'merge' will ensure that values in this layer specification + # are merged over existing definitions, whereas 'replace' will entirely + # override the existing target spec in the plan with the same name. + override: merge | replace + + # (Required) The type of log target, which determines the format in + # which logs will be sent. Currently, the only supported type is 'loki', + # but more protocols may be added in the future. + type: loki + + # (Required) The URL of the remote log target. + # For Loki, this needs to be the fully-qualified URL of the push API, + # including the API endpoint, e.g. + # http://:3100/loki/api/v1/push + location: + + # (Optional) A list of services whose logs will be sent to this target. + # Use the special keyword 'all' to match all services in the plan. + # When merging log targets, the 'services' lists are appended. Prefix a + # service name with a minus (e.g. '-svc1') to remove a previously added + # service. '-all' will remove all services. + services: [] ``` ## API and clients @@ -753,7 +779,8 @@ Here are some of the things coming soon: - [x] Automatically restart services that fail - [x] Support for custom health checks (HTTP, TCP, command) - [x] Terminate all services before exiting run command - - [ ] Log forwarding (syslog and Loki) + - [x] Log forwarding to Loki + - [ ] Log forwarding to syslog - [ ] [Other in-progress PRs](https://github.com/canonical/pebble/pulls) - [ ] [Other requested features](https://github.com/canonical/pebble/issues) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 001b9e2f..cde95306 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -22,13 +22,15 @@ import ( "gopkg.in/tomb.v2" "github.com/canonical/pebble/internals/logger" + "github.com/canonical/pebble/internals/overlord/logstate/loki" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" ) const ( - parserSize = 4 * 1024 - bufferTimeout = 1 * time.Second + parserSize = 4 * 1024 + bufferTimeout = 1 * time.Second + maxBufferedEntries = 100 // These constants control the maximum time allowed for each teardown step. timeoutCurrentFlush = 1 * time.Second @@ -55,7 +57,7 @@ const ( // Calling the Stop() method will tear down the logGatherer and all of its // associated logPullers. Stop() can be called from an outside goroutine. type logGatherer struct { - logGathererArgs + *logGathererOptions targetName string // tomb for the main loop @@ -73,31 +75,32 @@ type logGatherer struct { entryCh chan servicelog.Entry } -// logGathererArgs allows overriding the newLogClient method and time values +// logGathererOptions allows overriding the newLogClient method and time values // in testing. -type logGathererArgs struct { - bufferTimeout time.Duration - timeoutFinalFlush time.Duration +type logGathererOptions struct { + bufferTimeout time.Duration + maxBufferedEntries int + timeoutFinalFlush time.Duration // method to get a new client newClient func(*plan.LogTarget) (logClient, error) } func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) { - return newLogGathererInternal(target, logGathererArgs{}) + return newLogGathererInternal(target, &logGathererOptions{}) } // newLogGathererInternal contains the actual creation code for a logGatherer. // This function is used in the real implementation, but also allows overriding // certain configuration values for testing. -func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logGatherer, error) { - args = fillDefaultArgs(args) - client, err := args.newClient(target) +func newLogGathererInternal(target *plan.LogTarget, options *logGathererOptions) (*logGatherer, error) { + options = fillDefaultOptions(options) + client, err := options.newClient(target) if err != nil { return nil, fmt.Errorf("cannot create log client: %w", err) } g := &logGatherer{ - logGathererArgs: args, + logGathererOptions: options, targetName: target.Name, client: client, @@ -111,17 +114,20 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG return g, nil } -func fillDefaultArgs(args logGathererArgs) logGathererArgs { - if args.bufferTimeout == 0 { - args.bufferTimeout = bufferTimeout +func fillDefaultOptions(options *logGathererOptions) *logGathererOptions { + if options.bufferTimeout == 0 { + options.bufferTimeout = bufferTimeout } - if args.timeoutFinalFlush == 0 { - args.timeoutFinalFlush = timeoutFinalFlush + if options.maxBufferedEntries == 0 { + options.maxBufferedEntries = maxBufferedEntries } - if args.newClient == nil { - args.newClient = newLogClient + if options.timeoutFinalFlush == 0 { + options.timeoutFinalFlush = timeoutFinalFlush } - return args + if options.newClient == nil { + options.newClient = newLogClient + } + return options } // PlanChanged is called by the LogManager when the plan is changed, if this @@ -169,8 +175,20 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R // pullers on entryCh, and writes them to the client. It also flushes the // client periodically, and exits when the gatherer's tomb is killed. func (g *logGatherer) loop() error { - timer := newTimer() - defer timer.Stop() + flushTimer := newTimer() + defer flushTimer.Stop() + // Keep track of number of logs written since last flush + numWritten := 0 + + flushClient := func(ctx context.Context) { + // Mark timer as unset + flushTimer.Stop() + err := g.client.Flush(ctx) + if err != nil { + logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) + } + numWritten = 0 + } mainLoop: for { @@ -178,20 +196,23 @@ mainLoop: case <-g.tomb.Dying(): break mainLoop - case <-timer.Expired(): - // Mark timer as unset - timer.Stop() - err := g.client.Flush(g.clientCtx) - if err != nil { - logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) - } + case <-flushTimer.Expired(): + flushClient(g.clientCtx) case entry := <-g.entryCh: - err := g.client.Write(g.clientCtx, entry) + err := g.client.Add(entry) if err != nil { logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err) + continue + } + numWritten++ + // Check if buffer is full + if numWritten >= g.maxBufferedEntries { + flushClient(g.clientCtx) + continue } - timer.EnsureSet(g.bufferTimeout) + // Otherwise, set the timeout + flushTimer.EnsureSet(g.bufferTimeout) } } @@ -199,10 +220,7 @@ mainLoop: // We need to create a new context, as the previous one may have been cancelled. ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush) defer cancel() - err := g.client.Flush(ctx) - if err != nil { - logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) - } + flushClient(ctx) return nil } @@ -288,30 +306,18 @@ func (t *timer) EnsureSet(timeout time.Duration) { // protocol required by that log target. // For example, a logClient for Loki would encode the log messages in the // JSON format expected by Loki, and send them over HTTP(S). -// -// logClient implementations have some freedom about the semantics of these -// methods. For a buffering client (e.g. HTTP): -// - Write could add the log to the client's internal buffer, calling Flush -// when this buffer reaches capacity. -// - Flush would prepare and send a request with the buffered logs. -// -// For a non-buffering client (e.g. TCP), Write could serialise the log -// directly to the open connection, while Flush would be a no-op. type logClient interface { - // Write adds the given log entry to the client. Depending on the - // implementation of the client, this may send the log to the remote target, - // or simply add the log to an internal buffer, flushing that buffer when - // required. - Write(context.Context, servicelog.Entry) error - - // Flush sends buffered logs (if any) to the remote target. For clients which - // don't buffer logs, Flush should be a no-op. + // Add adds the given log entry to the client's buffer. + Add(servicelog.Entry) error + + // Flush sends buffered logs (if any) to the remote target. Flush(context.Context) error } func newLogClient(target *plan.LogTarget) (logClient, error) { switch target.Type { - //case plan.LokiTarget: TODO + case plan.LokiTarget: + return loki.NewClient(target), nil //case plan.SyslogTarget: TODO default: return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name) diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index b7220cbe..956f22f3 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -18,10 +18,13 @@ import ( "context" "fmt" "io" + "net/http" + "net/http/httptest" "time" . "gopkg.in/check.v1" + "github.com/canonical/pebble/internals/overlord/logstate/loki" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" ) @@ -32,7 +35,8 @@ var _ = Suite(&gathererSuite{}) func (s *gathererSuite) TestGatherer(c *C) { received := make(chan []servicelog.Entry, 1) - gathererArgs := logGathererArgs{ + gathererOptions := logGathererOptions{ + maxBufferedEntries: 5, newClient: func(target *plan.LogTarget) (logClient, error) { return &testClient{ bufferSize: 5, @@ -41,7 +45,7 @@ func (s *gathererSuite) TestGatherer(c *C) { }, } - g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, gathererArgs) + g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, &gathererOptions) c.Assert(err, IsNil) testSvc := newTestService("svc1") @@ -59,7 +63,7 @@ func (s *gathererSuite) TestGatherer(c *C) { testSvc.writeLog("log line #5") select { - case <-time.After(5 * time.Millisecond): + case <-time.After(1 * time.Second): c.Fatalf("timeout waiting for logs") case logs := <-received: checkLogs(c, logs, []string{"log line #1", "log line #2", "log line #3", "log line #4", "log line #5"}) @@ -68,7 +72,7 @@ func (s *gathererSuite) TestGatherer(c *C) { func (s *gathererSuite) TestGathererTimeout(c *C) { received := make(chan []servicelog.Entry, 1) - gathererArgs := logGathererArgs{ + gathererOptions := logGathererOptions{ bufferTimeout: 1 * time.Millisecond, newClient: func(target *plan.LogTarget) (logClient, error) { return &testClient{ @@ -78,7 +82,7 @@ func (s *gathererSuite) TestGathererTimeout(c *C) { }, } - g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, gathererArgs) + g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, &gathererOptions) c.Assert(err, IsNil) testSvc := newTestService("svc1") @@ -95,7 +99,7 @@ func (s *gathererSuite) TestGathererTimeout(c *C) { func (s *gathererSuite) TestGathererShutdown(c *C) { received := make(chan []servicelog.Entry, 1) - gathererArgs := logGathererArgs{ + gathererOptions := logGathererOptions{ bufferTimeout: 1 * time.Microsecond, newClient: func(target *plan.LogTarget) (logClient, error) { return &testClient{ @@ -105,7 +109,7 @@ func (s *gathererSuite) TestGathererShutdown(c *C) { }, } - g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, gathererArgs) + g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, &gathererOptions) c.Assert(err, IsNil) testSvc := newTestService("svc1") @@ -122,7 +126,7 @@ func (s *gathererSuite) TestGathererShutdown(c *C) { }() select { - case <-time.After(100 * time.Millisecond): + case <-time.After(1 * time.Second): c.Fatalf("timeout waiting for gatherer to tear down") case <-hasShutdown: } @@ -137,6 +141,83 @@ logs in client buffer: %v`, len(g.client.(*testClient).buffered)) } } +func (s *gathererSuite) TestRetryLoki(c *C) { + var handler func(http.ResponseWriter, *http.Request) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handler(w, r) + })) + defer server.Close() + + g, err := newLogGathererInternal( + &plan.LogTarget{ + Name: "tgt1", + Location: server.URL, + }, + &logGathererOptions{ + bufferTimeout: 1 * time.Millisecond, + maxBufferedEntries: 5, + newClient: func(target *plan.LogTarget) (logClient, error) { + return loki.NewClientWithOptions(target, &loki.ClientOptions{ + MaxRequestEntries: 5, + }), nil + }, + }, + ) + c.Assert(err, IsNil) + + testSvc := newTestService("svc1") + g.ServiceStarted(testSvc.config, testSvc.ringBuffer) + + reqReceived := make(chan struct{}) + // First attempt: server should return a retryable error + handler = func(w http.ResponseWriter, _ *http.Request) { + close(reqReceived) + w.WriteHeader(http.StatusTooManyRequests) + } + + testSvc.writeLog("log line #1") + testSvc.writeLog("log line #2") + testSvc.writeLog("log line #3") + testSvc.writeLog("log line #4") + testSvc.writeLog("log line #5") + + // Check that request was received + select { + case <-reqReceived: + case <-time.After(1 * time.Second): + c.Fatalf("timed out waiting for request") + } + + reqReceived = make(chan struct{}) + // Second attempt: check that logs were held over from last time + handler = func(w http.ResponseWriter, r *http.Request) { + close(reqReceived) + reqBody, err := io.ReadAll(r.Body) + c.Assert(err, IsNil) + + expected := `{"streams":\[{"stream":{"pebble_service":"svc1"},"values":\[` + + // First two log lines should have been truncated + `\["\d+","log line #3"\],` + + `\["\d+","log line #4"\],` + + `\["\d+","log line #5"\],` + + `\["\d+","log line #6"\],` + + `\["\d+","log line #7"\]` + + `\]}\]}` + c.Assert(string(reqBody), Matches, expected) + } + + testSvc.writeLog("log line #6") + testSvc.writeLog("log line #7") + // Wait for flush timeout to elapse + + // Check that request was received + select { + case <-reqReceived: + case <-time.After(1 * time.Second): + c.Fatalf("timed out waiting for request") + } +} + func checkLogs(c *C, received []servicelog.Entry, expected []string) { c.Assert(received, HasLen, len(expected)) for i, entry := range received { @@ -151,11 +232,8 @@ type testClient struct { sendCh chan []servicelog.Entry } -func (c *testClient) Write(ctx context.Context, entry servicelog.Entry) error { +func (c *testClient) Add(entry servicelog.Entry) error { c.buffered = append(c.buffered, entry) - if len(c.buffered) >= c.bufferSize { - return c.Flush(ctx) - } return nil } diff --git a/internals/overlord/logstate/loki/export_test.go b/internals/overlord/logstate/loki/export_test.go new file mode 100644 index 00000000..8343c361 --- /dev/null +++ b/internals/overlord/logstate/loki/export_test.go @@ -0,0 +1,25 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package loki + +type LokiEntryWithService = lokiEntryWithService + +func GetBuffer(c *Client) []LokiEntryWithService { + return c.buffer +} + +func GetMessage(e LokiEntryWithService) string { + return e.entry[1] +} diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go new file mode 100644 index 00000000..d3f37bfd --- /dev/null +++ b/internals/overlord/logstate/loki/loki.go @@ -0,0 +1,249 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package loki + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/canonical/pebble/cmd" + "github.com/canonical/pebble/internals/logger" + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" +) + +const ( + requestTimeout = 10 * time.Second + maxRequestEntries = 100 +) + +type Client struct { + options *ClientOptions + + targetName string + remoteURL string + httpClient *http.Client + + // To store log entries, keep a buffer of size 2*MaxRequestEntries with a + // sliding window 'entries' of size MaxRequestEntries + buffer []lokiEntryWithService + entries []lokiEntryWithService +} + +func NewClient(target *plan.LogTarget) *Client { + return NewClientWithOptions(target, &ClientOptions{}) +} + +// ClientOptions allows overriding default parameters (e.g. for testing) +type ClientOptions struct { + RequestTimeout time.Duration + MaxRequestEntries int +} + +func NewClientWithOptions(target *plan.LogTarget, options *ClientOptions) *Client { + options = fillDefaultOptions(options) + c := &Client{ + options: options, + targetName: target.Name, + remoteURL: target.Location, + httpClient: &http.Client{Timeout: options.RequestTimeout}, + buffer: make([]lokiEntryWithService, 2*options.MaxRequestEntries), + } + // c.entries should be backed by the same array as c.buffer + c.entries = c.buffer[0:0:len(c.buffer)] + return c +} + +func fillDefaultOptions(options *ClientOptions) *ClientOptions { + if options.RequestTimeout == 0 { + options.RequestTimeout = requestTimeout + } + if options.MaxRequestEntries == 0 { + options.MaxRequestEntries = maxRequestEntries + } + return options +} + +func (c *Client) Add(entry servicelog.Entry) error { + if n := len(c.entries); n >= c.options.MaxRequestEntries { + // 'entries' is full - remove the first element to make room + // Zero the removed element to allow garbage collection + c.entries[0] = lokiEntryWithService{} + c.entries = c.entries[1:] + } + + if len(c.entries) >= cap(c.entries) { + // Copy all the elements to the start of the buffer + copy(c.buffer, c.entries) + + // Reset the view into the buffer + c.entries = c.buffer[0:len(c.entries):len(c.buffer)] + + // Zero removed elements to allow garbage collection + for i := len(c.entries); i < len(c.buffer); i++ { + c.buffer[i] = lokiEntryWithService{} + } + } + + c.entries = append(c.entries, lokiEntryWithService{ + entry: encodeEntry(entry), + service: entry.Service, + }) + return nil +} + +func encodeEntry(entry servicelog.Entry) lokiEntry { + return lokiEntry{ + strconv.FormatInt(entry.Time.UnixNano(), 10), + strings.TrimSuffix(entry.Message, "\n"), + } +} + +func (c *Client) Flush(ctx context.Context) error { + if len(c.entries) == 0 { + return nil // no-op + } + + req := c.buildRequest() + jsonReq, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("encoding request to JSON: %v", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.remoteURL, bytes.NewReader(jsonReq)) + if err != nil { + return fmt.Errorf("creating HTTP request: %v", err) + } + httpReq.Header.Set("Content-Type", "application/json; charset=utf-8") + httpReq.Header.Set("User-Agent", fmt.Sprintf("pebble/%s", cmd.Version)) + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return err + } + + return c.handleServerResponse(resp) +} + +// resetBuffer drops all buffered logs (in the case of a successful send, or an +// unrecoverable error). +func (c *Client) resetBuffer() { + c.entries = c.entries[:0] +} + +func (c *Client) buildRequest() lokiRequest { + // Put entries into service "buckets" + bucketedEntries := map[string][]lokiEntry{} + for _, data := range c.entries { + bucketedEntries[data.service] = append(bucketedEntries[data.service], data.entry) + } + + // Sort service names to guarantee deterministic output + var services []string + for service := range bucketedEntries { + services = append(services, service) + } + sort.Strings(services) + + var req lokiRequest + for _, service := range services { + entries := bucketedEntries[service] + stream := lokiStream{ + Labels: map[string]string{ + "pebble_service": service, + }, + Entries: entries, + } + req.Streams = append(req.Streams, stream) + } + return req +} + +type lokiRequest struct { + Streams []lokiStream `json:"streams"` +} + +type lokiStream struct { + Labels map[string]string `json:"stream"` + Entries []lokiEntry `json:"values"` +} + +type lokiEntry [2]string + +type lokiEntryWithService struct { + entry lokiEntry + service string +} + +// handleServerResponse determines what to do based on the response from the +// Loki server. 4xx and 5xx responses indicate errors, so in this case, we will +// bubble up the error to the caller. +func (c *Client) handleServerResponse(resp *http.Response) error { + defer func() { + // Drain request body to allow connection reuse + // see https://pkg.go.dev/net/http#Response.Body + _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 1024*1024)) + _ = resp.Body.Close() + }() + + code := resp.StatusCode + switch { + case code == http.StatusOK || code == http.StatusNoContent: + // Success - safe to drop logs + c.resetBuffer() + return nil + + case code == http.StatusTooManyRequests: + // For 429, don't drop logs - just retry later + return errFromResponse(resp) + + case 400 <= code && code < 500: + // Other 4xx codes indicate a client problem, so drop the logs (retrying won't help) + logger.Noticef("Target %q: request failed with status %d, dropping %d logs", + c.targetName, code, len(c.entries)) + c.resetBuffer() + return errFromResponse(resp) + + case 500 <= code && code < 600: + // 5xx indicates a problem with the server, so don't drop logs (retry later) + return errFromResponse(resp) + + default: + // Unexpected response - don't drop logs to be safe + return fmt.Errorf("unexpected response from server: %v", resp.Status) + } +} + +// errFromResponse generates an error from a failed *http.Response. +// Note: this function reads the response body. +func errFromResponse(resp *http.Response) error { + // Read response body to get more context + body, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) + if err == nil { + logger.Debugf("HTTP %d error, response %q", resp.StatusCode, body) + } else { + logger.Debugf("HTTP %d error, but cannot read response: %v", resp.StatusCode, err) + } + + return fmt.Errorf("server returned HTTP %v", resp.Status) +} diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go new file mode 100644 index 00000000..3baef17e --- /dev/null +++ b/internals/overlord/logstate/loki/loki_test.go @@ -0,0 +1,256 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package loki_test + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + . "gopkg.in/check.v1" + + "github.com/canonical/pebble/internals/overlord/logstate/loki" + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" +) + +type suite struct{} + +var _ = Suite(&suite{}) + +func Test(t *testing.T) { + TestingT(t) +} + +func (*suite) TestRequest(c *C) { + input := []servicelog.Entry{{ + Time: time.Date(2023, 12, 31, 12, 34, 50, 0, time.UTC), + Service: "svc1", + Message: "log line #1\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 51, 0, time.UTC), + Service: "svc2", + Message: "log line #2\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 52, 0, time.UTC), + Service: "svc1", + Message: "log line #3\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 53, 0, time.UTC), + Service: "svc3", + Message: "log line #4\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 54, 0, time.UTC), + Service: "svc1", + Message: "log line #5\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 55, 0, time.UTC), + Service: "svc3", + Message: "log line #6\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 56, 0, time.UTC), + Service: "svc2", + Message: "log line #7\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 57, 0, time.UTC), + Service: "svc1", + Message: "log line #8\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 58, 0, time.UTC), + Service: "svc4", + Message: "log line #9\n", + }} + + expected := compactJSON(` +{"streams": [{ + "stream": {"pebble_service": "svc1"}, + "values": [ + [ "1704026090000000000", "log line #1" ], + [ "1704026092000000000", "log line #3" ], + [ "1704026094000000000", "log line #5" ], + [ "1704026097000000000", "log line #8" ] + ] +}, { + "stream": {"pebble_service": "svc2"}, + "values": [ + [ "1704026091000000000", "log line #2" ], + [ "1704026096000000000", "log line #7" ] + ] +}, { + "stream": {"pebble_service": "svc3"}, + "values": [ + [ "1704026093000000000", "log line #4" ], + [ "1704026095000000000", "log line #6" ] + ] +}, { + "stream": {"pebble_service": "svc4"}, + "values": [ + [ "1704026098000000000", "log line #9" ] + ] +}]}`) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c.Assert(r.Method, Equals, http.MethodPost) + c.Assert(r.Header.Get("Content-Type"), Equals, "application/json; charset=utf-8") + + reqBody, err := io.ReadAll(r.Body) + c.Assert(err, IsNil) + c.Assert(reqBody, DeepEquals, expected) + })) + defer server.Close() + + client := loki.NewClient(&plan.LogTarget{Location: server.URL}) + for _, entry := range input { + err := client.Add(entry) + c.Assert(err, IsNil) + } + + err := client.Flush(context.Background()) + c.Assert(err, IsNil) +} + +func (*suite) TestFlushCancelContext(c *C) { + serverCtx, killServer := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case <-serverCtx.Done(): + // Simulate a slow-responding server + case <-time.After(10 * time.Second): + } + })) + defer server.Close() + defer killServer() + + client := loki.NewClient(&plan.LogTarget{Location: server.URL}) + err := client.Add(servicelog.Entry{ + Time: time.Now(), + Service: "svc1", + Message: "this is a log line\n", + }) + c.Assert(err, IsNil) + + flushReturned := make(chan struct{}) + go func() { + // Cancel the Flush context quickly + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond) + defer cancel() + + err := client.Flush(ctx) + c.Assert(err, ErrorMatches, ".*context deadline exceeded.*") + close(flushReturned) + }() + + // Check Flush returns quickly after context timeout + select { + case <-flushReturned: + case <-time.After(1 * time.Second): + c.Fatal("lokiClient.Flush took too long to return after context timeout") + } +} + +func (*suite) TestServerTimeout(c *C) { + stopRequest := make(chan struct{}) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-stopRequest + })) + defer server.Close() + defer close(stopRequest) + + client := loki.NewClientWithOptions( + &plan.LogTarget{Location: server.URL}, + &loki.ClientOptions{ + RequestTimeout: 1 * time.Microsecond, + }, + ) + err := client.Add(servicelog.Entry{ + Time: time.Now(), + Service: "svc1", + Message: "this is a log line\n", + }) + c.Assert(err, IsNil) + + err = client.Flush(context.Background()) + c.Assert(err, ErrorMatches, ".*context deadline exceeded.*") +} + +func (*suite) TestBufferFull(c *C) { + client := loki.NewClientWithOptions( + &plan.LogTarget{ + Name: "tgt1", + Location: "fake", + }, + &loki.ClientOptions{ + MaxRequestEntries: 3, + }, + ) + + addEntry := func(s string) { + err := client.Add(servicelog.Entry{Message: s}) + c.Assert(err, IsNil) + } + + // Check that the client's buffer is as expected + buffer := loki.GetBuffer(client) + checkBuffer := func(expected []any) { + if len(buffer) != len(expected) { + c.Fatalf("buffer length is %v, expected %v", len(buffer), len(expected)) + } + + for i := range expected { + // 'nil' means c.buffer[i] should be zero + if expected[i] == nil { + c.Assert(buffer[i], DeepEquals, loki.LokiEntryWithService{}, + Commentf("buffer[%d] should be zero, obtained %v", i, buffer[i])) + continue + } + + // Otherwise, check buffer message matches string + msg := expected[i].(string) + c.Assert(loki.GetMessage(buffer[i]), Equals, msg) + } + } + + checkBuffer([]any{nil, nil, nil, nil, nil, nil}) + addEntry("1") + checkBuffer([]any{"1", nil, nil, nil, nil, nil}) + addEntry("2") + checkBuffer([]any{"1", "2", nil, nil, nil, nil}) + addEntry("3") + checkBuffer([]any{"1", "2", "3", nil, nil, nil}) + addEntry("4") + checkBuffer([]any{nil, "2", "3", "4", nil, nil}) + addEntry("5") + checkBuffer([]any{nil, nil, "3", "4", "5", nil}) + addEntry("6") + checkBuffer([]any{nil, nil, nil, "4", "5", "6"}) + addEntry("7") + checkBuffer([]any{"5", "6", "7", nil, nil, nil}) +} + +// Strips all extraneous whitespace from JSON +func compactJSON(s string) []byte { + var buf bytes.Buffer + err := json.Compact(&buf, []byte(s)) + if err != nil { + panic(fmt.Sprintf("error compacting JSON: %v", err)) + } + return buf.Bytes() +} diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go index 366250be..73ef8dfb 100644 --- a/internals/overlord/logstate/manager_test.go +++ b/internals/overlord/logstate/manager_test.go @@ -30,14 +30,14 @@ type managerSuite struct{} var _ = Suite(&managerSuite{}) func (s *managerSuite) TestPlanChange(c *C) { - gathererArgs := logGathererArgs{ + gathererOptions := logGathererOptions{ newClient: func(target *plan.LogTarget) (logClient, error) { return &testClient{}, nil }, } m := NewLogManager() m.newGatherer = func(t *plan.LogTarget) (*logGatherer, error) { - return newLogGathererInternal(t, gathererArgs) + return newLogGathererInternal(t, &gathererOptions) } svc1 := newTestService("svc1") @@ -116,7 +116,7 @@ func checkBuffers(c *C, buffers map[string]*servicelog.RingBuffer, expected []st } func (s *managerSuite) TestTimelyShutdown(c *C) { - gathererArgs := logGathererArgs{ + gathererOptions := logGathererOptions{ timeoutFinalFlush: 5 * time.Millisecond, newClient: func(target *plan.LogTarget) (logClient, error) { return &slowFlushingClient{ @@ -127,7 +127,7 @@ func (s *managerSuite) TestTimelyShutdown(c *C) { m := NewLogManager() m.newGatherer = func(t *plan.LogTarget) (*logGatherer, error) { - return newLogGathererInternal(t, gathererArgs) + return newLogGathererInternal(t, &gathererOptions) } svc1 := newTestService("svc1") @@ -162,7 +162,7 @@ func (s *managerSuite) TestTimelyShutdown(c *C) { }() select { case <-done: - case <-time.After(50 * time.Millisecond): + case <-time.After(1 * time.Second): c.Fatal("LogManager.Stop() took too long") } } @@ -171,7 +171,7 @@ type slowFlushingClient struct { flushTime time.Duration } -func (c *slowFlushingClient) Write(_ context.Context, _ servicelog.Entry) error { +func (c *slowFlushingClient) Add(_ servicelog.Entry) error { // no-op return nil }