diff --git a/.github/.golangci.yml b/.github/.golangci.yml
index 2e368060..7d784fdf 100644
--- a/.github/.golangci.yml
+++ b/.github/.golangci.yml
@@ -12,7 +12,5 @@ issues:
# these values ensure that all issues will be surfaced
max-issues-per-linter: 0
max-same-issues: 0
-
run:
- timeout: 5m
-
+ timeout: 5m
\ No newline at end of file
diff --git a/internals/overlord/logstate/clienterr/clienterr.go b/internals/overlord/logstate/clienterr/clienterr.go
new file mode 100644
index 00000000..c756ae13
--- /dev/null
+++ b/internals/overlord/logstate/clienterr/clienterr.go
@@ -0,0 +1,81 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+/*
+Package clienterr contains common error types which are recognised by the log
+gatherer. logstate.logClient implementations should return these error types
+to communicate with the gatherer.
+
+Errors in this package should be pattern-matched using errors.As:
+
+ err := client.Flush(ctx)
+ backoff := &clienterr.Backoff{}
+ if errors.As(err, &backoff) {
+ ...
+ }
+*/
+package clienterr
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "net/http"
+ "time"
+)
+
+// Backoff should be returned if the server indicates we are sending too many
+// requests (e.g. an HTTP 429 response).
+type Backoff struct {
+ RetryAfter *time.Time
+}
+
+func (e *Backoff) Error() string {
+ errStr := "too many requests"
+ if e.RetryAfter != nil {
+ errStr += ", retry after " + e.RetryAfter.String()
+ }
+ return errStr
+}
+
+// ErrorResponse represents an HTTP error response from the server
+// (4xx or 5xx).
+type ErrorResponse struct {
+ StatusCode int
+ Body bytes.Buffer
+ ReadErr error
+}
+
+func (e *ErrorResponse) Error() string {
+ errStr := fmt.Sprintf("server returned HTTP %d\n", e.StatusCode)
+ if e.Body.Len() > 0 {
+ errStr += fmt.Sprintf(`response body:
+%s
+`, e.Body.String())
+ }
+ if e.ReadErr != nil {
+ errStr += "cannot read response body: " + e.ReadErr.Error()
+ }
+ return errStr
+}
+
+// ErrorFromResponse generates a *ErrorResponse from a failed *http.Response.
+// NB: this function reads the response body.
+func ErrorFromResponse(resp *http.Response) *ErrorResponse {
+ err := &ErrorResponse{}
+ err.StatusCode = resp.StatusCode
+ _, readErr := io.CopyN(&err.Body, resp.Body, 1024)
+ err.ReadErr = readErr
+ return err
+}
diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go
index 001b9e2f..f764d7bc 100644
--- a/internals/overlord/logstate/gatherer.go
+++ b/internals/overlord/logstate/gatherer.go
@@ -16,12 +16,15 @@ package logstate
import (
"context"
+ "errors"
"fmt"
"time"
"gopkg.in/tomb.v2"
"github.com/canonical/pebble/internals/logger"
+ "github.com/canonical/pebble/internals/overlord/logstate/clienterr"
+ "github.com/canonical/pebble/internals/overlord/logstate/loki"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)
@@ -71,6 +74,8 @@ type logGatherer struct {
pullers *pullerGroup
// All pullers send logs on this channel, received by main loop
entryCh chan servicelog.Entry
+
+ timer timer
}
// logGathererArgs allows overriding the newLogClient method and time values
@@ -103,6 +108,7 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG
client: client,
entryCh: make(chan servicelog.Entry),
pullers: newPullerGroup(target.Name),
+ timer: newTimer(),
}
g.clientCtx, g.clientCancel = context.WithCancel(context.Background())
g.tomb.Go(g.loop)
@@ -169,8 +175,7 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R
// pullers on entryCh, and writes them to the client. It also flushes the
// client periodically, and exits when the gatherer's tomb is killed.
func (g *logGatherer) loop() error {
- timer := newTimer()
- defer timer.Stop()
+ defer g.timer.Stop()
mainLoop:
for {
@@ -178,20 +183,14 @@ mainLoop:
case <-g.tomb.Dying():
break mainLoop
- case <-timer.Expired():
+ case <-g.timer.Expired():
// Mark timer as unset
- timer.Stop()
- err := g.client.Flush(g.clientCtx)
- if err != nil {
- logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
- }
+ g.timer.Stop()
+ g.handleClientErr(g.client.Flush(g.clientCtx))
case entry := <-g.entryCh:
- err := g.client.Write(g.clientCtx, entry)
- if err != nil {
- logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err)
- }
- timer.EnsureSet(g.bufferTimeout)
+ g.handleClientErr(g.client.Write(g.clientCtx, entry))
+ g.timer.EnsureSet(g.bufferTimeout)
}
}
@@ -199,13 +198,25 @@ mainLoop:
// We need to create a new context, as the previous one may have been cancelled.
ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush)
defer cancel()
- err := g.client.Flush(ctx)
- if err != nil {
- logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
- }
+ g.handleClientErr(g.client.Flush(ctx))
return nil
}
+func (g *logGatherer) handleClientErr(err error) {
+ if err == nil {
+ return
+ }
+
+ backoff := &clienterr.Backoff{}
+ if errors.As(err, &backoff) {
+ logger.Noticef("Target %q: %v", g.targetName, err)
+ g.timer.retryAfter = backoff.RetryAfter
+ return
+ }
+
+ logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
+}
+
// Stop tears down the gatherer and associated resources (pullers, client).
// This method will block until gatherer teardown is complete.
//
@@ -250,6 +261,8 @@ func (g *logGatherer) Stop() {
type timer struct {
timer *time.Timer
set bool
+ // If non-nil, the timer won't expire until after this time
+ retryAfter *time.Time
}
func newTimer() timer {
@@ -279,6 +292,15 @@ func (t *timer) EnsureSet(timeout time.Duration) {
return
}
+ if t.retryAfter != nil {
+ // We've been told to wait before retrying
+ retryTime := time.Until(*t.retryAfter)
+ if retryTime > timeout {
+ timeout = retryTime
+ }
+ t.retryAfter = nil
+ }
+
t.timer.Reset(timeout)
t.set = true
}
@@ -311,7 +333,8 @@ type logClient interface {
func newLogClient(target *plan.LogTarget) (logClient, error) {
switch target.Type {
- //case plan.LokiTarget: TODO
+ case plan.LokiTarget:
+ return loki.NewClient(target), nil
//case plan.SyslogTarget: TODO
default:
return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
diff --git a/internals/overlord/logstate/loki/export_test.go b/internals/overlord/logstate/loki/export_test.go
new file mode 100644
index 00000000..f1616b2f
--- /dev/null
+++ b/internals/overlord/logstate/loki/export_test.go
@@ -0,0 +1,25 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+package loki
+
+import "time"
+
+func SetRequestTimeout(new time.Duration) (restore func()) {
+ oldRequestTimeout := requestTimeout
+ requestTimeout = new
+ return func() {
+ requestTimeout = oldRequestTimeout
+ }
+}
diff --git a/internals/overlord/logstate/loki/loki.go b/internals/overlord/logstate/loki/loki.go
new file mode 100644
index 00000000..88516730
--- /dev/null
+++ b/internals/overlord/logstate/loki/loki.go
@@ -0,0 +1,201 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+package loki
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/canonical/pebble/cmd"
+ "github.com/canonical/pebble/internals/overlord/logstate/clienterr"
+ "github.com/canonical/pebble/internals/plan"
+ "github.com/canonical/pebble/internals/servicelog"
+)
+
+const maxRequestEntries = 1000
+
+var requestTimeout = 10 * time.Second
+
+type Client struct {
+ remoteURL string
+ // buffered entries are "sharded" by service name
+ entries map[string][]lokiEntry
+ // Keep track of the number of buffered entries, to avoid having to iterate
+ // the entries map to get the total number.
+ numEntries int
+
+ httpClient *http.Client
+ retryAfter *time.Time
+}
+
+func NewClient(target *plan.LogTarget) *Client {
+ return &Client{
+ remoteURL: target.Location,
+ entries: map[string][]lokiEntry{},
+ httpClient: &http.Client{Timeout: requestTimeout},
+ }
+}
+
+func (c *Client) Write(ctx context.Context, entry servicelog.Entry) error {
+ c.entries[entry.Service] = append(c.entries[entry.Service], encodeEntry(entry))
+ c.numEntries++
+
+ if c.numEntries >= maxRequestEntries {
+ if c.retryAfter != nil {
+ if time.Now().Before(*c.retryAfter) {
+ // Retry-After deadline hasn't passed yet, so we shouldn't flush
+ return nil
+ }
+ c.retryAfter = nil
+ }
+ return c.Flush(ctx)
+ }
+ return nil
+}
+
+func encodeEntry(entry servicelog.Entry) lokiEntry {
+ return lokiEntry{
+ strconv.FormatInt(entry.Time.UnixNano(), 10),
+ strings.TrimSuffix(entry.Message, "\n"),
+ }
+}
+
+func (c *Client) Flush(ctx context.Context) error {
+ if c.numEntries == 0 {
+ return nil // no-op
+ }
+ defer c.emptyBuffer()
+
+ req := c.buildRequest()
+ jsonReq, err := json.Marshal(req)
+ if err != nil {
+ return fmt.Errorf("encoding request to JSON: %v", err)
+ }
+
+ httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.remoteURL, bytes.NewReader(jsonReq))
+ if err != nil {
+ return fmt.Errorf("creating HTTP request: %v", err)
+ }
+ httpReq.Header.Set("Content-Type", "application/json; charset=utf-8")
+ httpReq.Header.Set("User-Agent", fmt.Sprintf("pebble/%s", cmd.Version))
+
+ resp, err := c.httpClient.Do(httpReq)
+ if err != nil {
+ return err
+ }
+ return c.handleServerResponse(resp)
+}
+
+func (c *Client) emptyBuffer() {
+ for svc := range c.entries {
+ c.entries[svc] = c.entries[svc][:0]
+ }
+ c.numEntries = 0
+}
+
+func (c *Client) buildRequest() request {
+ // Sort keys to guarantee deterministic output
+ services := make([]string, 0, len(c.entries))
+ for svc, entries := range c.entries {
+ if len(entries) == 0 {
+ continue
+ }
+ services = append(services, svc)
+ }
+ sort.Strings(services)
+
+ var req request
+ for _, service := range services {
+ entries := c.entries[service]
+ stream := stream{
+ Labels: map[string]string{
+ "pebble_service": service,
+ },
+ Entries: entries,
+ }
+ req.Streams = append(req.Streams, stream)
+ }
+ return req
+}
+
+type request struct {
+ Streams []stream `json:"streams"`
+}
+
+type stream struct {
+ Labels map[string]string `json:"stream"`
+ Entries []lokiEntry `json:"values"`
+}
+
+type lokiEntry [2]string
+
+func (c *Client) handleServerResponse(resp *http.Response) error {
+ defer func() {
+ // Drain request body to allow connection reuse
+ // see https://pkg.go.dev/net/http#Response.Body
+ _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 1024*1024))
+ _ = resp.Body.Close()
+ }()
+
+ code := resp.StatusCode
+ switch {
+ case code == http.StatusTooManyRequests:
+ err := &clienterr.Backoff{}
+ retryAfter, ok := getRetryAfter(resp)
+ if ok {
+ err.RetryAfter = &retryAfter
+ c.retryAfter = &retryAfter
+ }
+ return err
+
+ case code >= 400:
+ // Request to Loki failed
+ return clienterr.ErrorFromResponse(resp)
+ }
+
+ return nil
+}
+
+// Gets the parsed value of Retry-After from HTTP response headers.
+func getRetryAfter(resp *http.Response) (time.Time, bool) {
+ retryAfterRaw := resp.Header.Get("Retry-After")
+ if retryAfterRaw == "" {
+ // Header unset
+ return time.Time{}, false
+ }
+
+ // The Retry-After value can be a date-time
+ t, err := http.ParseTime(retryAfterRaw)
+ if err == nil {
+ return t, true
+ }
+
+ // It can also be an integer number of seconds
+ n, err := strconv.Atoi(retryAfterRaw)
+ if err == nil && n > 0 {
+ t := time.Now().Add(time.Duration(n) * time.Second)
+ return t, true
+ }
+
+ return time.Time{}, false
+}
diff --git a/internals/overlord/logstate/loki/loki_test.go b/internals/overlord/logstate/loki/loki_test.go
new file mode 100644
index 00000000..29e9ca92
--- /dev/null
+++ b/internals/overlord/logstate/loki/loki_test.go
@@ -0,0 +1,241 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+package loki_test
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+ "time"
+
+ . "gopkg.in/check.v1"
+
+ "github.com/canonical/pebble/internals/overlord/logstate/clienterr"
+ "github.com/canonical/pebble/internals/overlord/logstate/loki"
+ "github.com/canonical/pebble/internals/plan"
+ "github.com/canonical/pebble/internals/servicelog"
+)
+
+type suite struct{}
+
+var _ = Suite(&suite{})
+
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+func (*suite) TestRequest(c *C) {
+ input := []servicelog.Entry{{
+ Time: time.Date(2023, 12, 31, 12, 34, 50, 0, time.UTC),
+ Service: "svc1",
+ Message: "log line #1\n",
+ }, {
+ Time: time.Date(2023, 12, 31, 12, 34, 51, 0, time.UTC),
+ Service: "svc2",
+ Message: "log line #2\n",
+ }, {
+ Time: time.Date(2023, 12, 31, 12, 34, 52, 0, time.UTC),
+ Service: "svc1",
+ Message: "log line #3\n",
+ }, {
+ Time: time.Date(2023, 12, 31, 12, 34, 53, 0, time.UTC),
+ Service: "svc3",
+ Message: "log line #4\n",
+ }, {
+ Time: time.Date(2023, 12, 31, 12, 34, 54, 0, time.UTC),
+ Service: "svc1",
+ Message: "log line #5\n",
+ }, {
+ Time: time.Date(2023, 12, 31, 12, 34, 55, 0, time.UTC),
+ Service: "svc3",
+ Message: "log line #6\n",
+ }, {
+ Time: time.Date(2023, 12, 31, 12, 34, 56, 0, time.UTC),
+ Service: "svc2",
+ Message: "log line #7\n",
+ }, {
+ Time: time.Date(2023, 12, 31, 12, 34, 57, 0, time.UTC),
+ Service: "svc1",
+ Message: "log line #8\n",
+ }, {
+ Time: time.Date(2023, 12, 31, 12, 34, 58, 0, time.UTC),
+ Service: "svc4",
+ Message: "log line #9\n",
+ }}
+
+ expected := `
+{
+ "streams": [
+ {
+ "stream": {
+ "pebble_service": "svc1"
+ },
+ "values": [
+ [ "1704026090000000000", "log line #1" ],
+ [ "1704026092000000000", "log line #3" ],
+ [ "1704026094000000000", "log line #5" ],
+ [ "1704026097000000000", "log line #8" ]
+ ]
+ },
+ {
+ "stream": {
+ "pebble_service": "svc2"
+ },
+ "values": [
+ [ "1704026091000000000", "log line #2" ],
+ [ "1704026096000000000", "log line #7" ]
+ ]
+ },
+ {
+ "stream": {
+ "pebble_service": "svc3"
+ },
+ "values": [
+ [ "1704026093000000000", "log line #4" ],
+ [ "1704026095000000000", "log line #6" ]
+ ]
+ },
+ {
+ "stream": {
+ "pebble_service": "svc4"
+ },
+ "values": [
+ [ "1704026098000000000", "log line #9" ]
+ ]
+ }
+ ]
+}
+`
+
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ c.Assert(r.Method, Equals, http.MethodPost)
+ c.Assert(r.Header.Get("Content-Type"), Equals, "application/json; charset=utf-8")
+
+ reqBody, err := io.ReadAll(r.Body)
+ c.Assert(err, IsNil)
+ expFlattened, err := flattenJSON(expected)
+ c.Assert(err, IsNil)
+ c.Assert(string(reqBody), Equals, expFlattened)
+ }))
+ defer server.Close()
+
+ client := loki.NewClient(&plan.LogTarget{Location: server.URL})
+ for _, entry := range input {
+ err := client.Write(context.Background(), entry)
+ c.Assert(err, IsNil)
+ }
+
+ err := client.Flush(context.Background())
+ c.Assert(err, IsNil)
+}
+
+func (*suite) TestFlushCancelContext(c *C) {
+ serverCtx, killServer := context.WithCancel(context.Background())
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ select {
+ case <-serverCtx.Done():
+ // Simulate a slow-responding server
+ case <-time.After(10 * time.Second):
+ }
+ }))
+ defer server.Close()
+ defer killServer()
+
+ client := loki.NewClient(&plan.LogTarget{Location: server.URL})
+ err := client.Write(context.Background(), servicelog.Entry{
+ Time: time.Now(),
+ Service: "svc1",
+ Message: "this is a log line\n",
+ })
+ c.Assert(err, IsNil)
+
+ flushReturned := make(chan struct{})
+ go func() {
+ // Cancel the Flush context quickly
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond)
+ defer cancel()
+
+ err := client.Flush(ctx)
+ c.Assert(err, ErrorMatches, ".*context deadline exceeded.*")
+ close(flushReturned)
+ }()
+
+ // Check Flush returns quickly after context timeout
+ select {
+ case <-flushReturned:
+ case <-time.After(1 * time.Second):
+ c.Fatal("lokiClient.Flush took too long to return after context timeout")
+ }
+}
+
+func (*suite) TestServerTimeout(c *C) {
+ restore := loki.SetRequestTimeout(1 * time.Microsecond)
+ defer restore()
+
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ time.Sleep(2 * time.Microsecond)
+ }))
+ defer server.Close()
+
+ client := loki.NewClient(&plan.LogTarget{Location: server.URL})
+ err := client.Write(context.Background(), servicelog.Entry{
+ Time: time.Now(),
+ Service: "svc1",
+ Message: "this is a log line\n",
+ })
+ c.Assert(err, IsNil)
+
+ err = client.Flush(context.Background())
+ c.Assert(err, ErrorMatches, ".*context deadline exceeded.*")
+}
+
+func (*suite) TestTooManyRequests(c *C) {
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Add("Retry-After", "Tue, 15 Aug 2023 08:49:37 GMT")
+ w.WriteHeader(http.StatusTooManyRequests)
+ }))
+ defer server.Close()
+
+ client := loki.NewClient(&plan.LogTarget{Location: server.URL})
+ err := client.Write(context.Background(), servicelog.Entry{
+ Time: time.Now(),
+ Service: "svc1",
+ Message: "this is a log line\n",
+ })
+ c.Assert(err, IsNil)
+
+ err = client.Flush(context.Background())
+ backoff := &clienterr.Backoff{}
+ c.Assert(errors.As(err, &backoff), Equals, true)
+
+ expectedTime := time.Date(2023, 8, 15, 8, 49, 37, 0, time.UTC)
+ c.Check(backoff.RetryAfter, DeepEquals, &expectedTime)
+}
+
+// Strips all extraneous whitespace from JSON
+func flattenJSON(s string) (string, error) {
+ var v any
+ err := json.Unmarshal([]byte(s), &v)
+ if err != nil {
+ return "", err
+ }
+
+ b, err := json.Marshal(v)
+ return string(b), err
+}