diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index fff775819..0d1d96f6b 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -311,7 +311,8 @@ type logClient interface { func newLogClient(target *plan.LogTarget) (logClient, error) { switch target.Type { - //case plan.LokiTarget: TODO + case plan.LokiTarget: + return newLokiClient(target), nil //case plan.SyslogTarget: TODO default: return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name) diff --git a/internals/overlord/logstate/loki.go b/internals/overlord/logstate/loki.go new file mode 100644 index 000000000..826aa4de4 --- /dev/null +++ b/internals/overlord/logstate/loki.go @@ -0,0 +1,157 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logstate + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" +) + +const ( + maxLokiRequestSize = 10 + lokiRequestTimeout = 10 * time.Second +) + +type lokiClient struct { + target *plan.LogTarget + remoteURL string + // buffered entries are "sharded" by service name + entries map[string][]lokiEntry + numEntries int + + httpClient http.Client +} + +func newLokiClient(target *plan.LogTarget) logClient { + return &lokiClient{ + target: target, + remoteURL: target.Location, + entries: map[string][]lokiEntry{}, + httpClient: http.Client{Timeout: lokiRequestTimeout}, + } +} + +func (c *lokiClient) Write(ctx context.Context, entry servicelog.Entry) error { + c.entries[entry.Service] = append(c.entries[entry.Service], asLokiEntry(entry)) + c.numEntries++ + if c.numEntries >= maxLokiRequestSize { + return c.Flush(ctx) + } + return nil +} + +func asLokiEntry(entry servicelog.Entry) lokiEntry { + return lokiEntry{ + strconv.FormatInt(entry.Time.UnixNano(), 10), + strings.TrimSuffix(entry.Message, "\n"), + } +} + +func (c *lokiClient) Flush(ctx context.Context) error { + if c.numEntries == 0 { + return nil // no-op + } + defer c.emptyBuffer() + + req := c.buildRequest() + jsonReq, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("encoding request to JSON: %v", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.remoteURL, bytes.NewReader(jsonReq)) + if err != nil { + return fmt.Errorf("creating HTTP request: %v", err) + } + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return err + } + defer func() { + // Drain request body to allow connection reuse + // see https://pkg.go.dev/net/http#Response.Body + _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 1024*1024)) + _ = resp.Body.Close() + }() + + // Check response status code to see if it was successful + if code := resp.StatusCode; code < 200 || code >= 300 { + // Request to Loki failed + b, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) + if err != nil { + b = append(b, []byte("//couldn't read response body: ")...) + b = append(b, []byte(err.Error())...) + } + return fmt.Errorf("cannot send logs to Loki (HTTP %d), response body:\n%s", resp.StatusCode, b) + } + + return nil +} + +func (c *lokiClient) buildRequest() lokiRequest { + // Sort keys to guarantee deterministic output + var services []string + for svc, entries := range c.entries { + if len(entries) == 0 { + continue + } + services = append(services, svc) + } + sort.Strings(services) + + var req lokiRequest + for _, service := range services { + entries := c.entries[service] + stream := lokiStream{ + Labels: map[string]string{ + "pebble_service": service, + }, + Entries: entries, + } + req.Streams = append(req.Streams, stream) + } + return req +} + +func (c *lokiClient) emptyBuffer() { + for svc := range c.entries { + c.entries[svc] = c.entries[svc][:0] + } + c.numEntries = 0 +} + +type lokiRequest struct { + Streams []lokiStream `json:"streams"` +} + +type lokiStream struct { + Labels map[string]string `json:"stream"` + Entries []lokiEntry `json:"values"` +} + +type lokiEntry [2]string diff --git a/internals/overlord/logstate/loki_test.go b/internals/overlord/logstate/loki_test.go new file mode 100644 index 000000000..56973c0b2 --- /dev/null +++ b/internals/overlord/logstate/loki_test.go @@ -0,0 +1,175 @@ +package logstate + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "time" + + . "gopkg.in/check.v1" + + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" +) + +type lokiSuite struct{} + +var _ = Suite(&lokiSuite{}) + +func (*lokiSuite) TestLokiRequest(c *C) { + input := []servicelog.Entry{{ + Time: time.Date(2023, 12, 31, 12, 34, 50, 0, time.UTC), + Service: "svc1", + Message: "log line #1\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 51, 0, time.UTC), + Service: "svc2", + Message: "log line #2\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 52, 0, time.UTC), + Service: "svc1", + Message: "log line #3\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 53, 0, time.UTC), + Service: "svc3", + Message: "log line #4\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 54, 0, time.UTC), + Service: "svc1", + Message: "log line #5\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 55, 0, time.UTC), + Service: "svc3", + Message: "log line #6\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 56, 0, time.UTC), + Service: "svc2", + Message: "log line #7\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 57, 0, time.UTC), + Service: "svc1", + Message: "log line #8\n", + }, { + Time: time.Date(2023, 12, 31, 12, 34, 58, 0, time.UTC), + Service: "svc4", + Message: "log line #9\n", + }} + + expected := ` +{ + "streams": [ + { + "stream": { + "pebble_service": "svc1" + }, + "values": [ + [ "1704026090000000000", "log line #1" ], + [ "1704026092000000000", "log line #3" ], + [ "1704026094000000000", "log line #5" ], + [ "1704026097000000000", "log line #8" ] + ] + }, + { + "stream": { + "pebble_service": "svc2" + }, + "values": [ + [ "1704026091000000000", "log line #2" ], + [ "1704026096000000000", "log line #7" ] + ] + }, + { + "stream": { + "pebble_service": "svc3" + }, + "values": [ + [ "1704026093000000000", "log line #4" ], + [ "1704026095000000000", "log line #6" ] + ] + }, + { + "stream": { + "pebble_service": "svc4" + }, + "values": [ + [ "1704026098000000000", "log line #9" ] + ] + } + ] +} +` + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c.Assert(r.Method, Equals, http.MethodPost) + c.Assert(r.Header.Get("Content-Type"), Equals, "application/json") + + reqBody, err := io.ReadAll(r.Body) + c.Assert(err, IsNil) + expFlattened, err := flattenJSON(expected) + c.Assert(err, IsNil) + c.Assert(string(reqBody), Equals, expFlattened) + })) + defer server.Close() + + client := newLokiClient(&plan.LogTarget{Location: server.URL}) + for _, entry := range input { + err := client.Write(context.Background(), entry) + c.Assert(err, IsNil) + } + + err := client.Flush(context.Background()) + c.Assert(err, IsNil) +} + +func (*lokiSuite) TestLokiFlushCancelContext(c *C) { + serverCtx, killServer := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case <-serverCtx.Done(): + // Simulate a slow-responding server + case <-time.After(10 * time.Second): + } + })) + defer server.Close() + defer killServer() + + client := newLokiClient(&plan.LogTarget{Location: server.URL}) + err := client.Write(context.Background(), servicelog.Entry{ + Time: time.Now(), + Service: "svc1", + Message: "this is a log line\n", + }) + c.Assert(err, IsNil) + + flushReturned := make(chan struct{}) + go func() { + // Cancel the Flush context quickly + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond) + defer cancel() + + err := client.Flush(ctx) + c.Assert(err, ErrorMatches, ".*context deadline exceeded") + close(flushReturned) + }() + + // Check Flush returns quickly after context timeout + select { + case <-flushReturned: + case <-time.After(1 * time.Second): + c.Fatal("lokiClient.Flush took too long to return after context timeout") + } +} + +// Strips all extraneous whitespace from JSON +func flattenJSON(s string) (string, error) { + var v any + err := json.Unmarshal([]byte(s), &v) + if err != nil { + return "", err + } + + b, err := json.Marshal(v) + return string(b), err +}