Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logfwd): support forwarding logs to Loki #267

Merged
merged 39 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
f56d4ad
Implement Loki client
barrettj12 Jul 10, 2023
8ff49d8
update README
barrettj12 Aug 28, 2023
72942ad
don't need to allocate for errors.As
barrettj12 Aug 28, 2023
e91b3d9
remove clienterr package
barrettj12 Aug 29, 2023
4d8e66f
don't drop logs on 429 / 5xx
barrettj12 Aug 29, 2023
a702d37
remove retryAfter from loki.Client
barrettj12 Aug 29, 2023
a8b0c3e
revert changes to .golangci.yml
barrettj12 Aug 29, 2023
2c427e4
rename gatherer timer -> flushTimer
barrettj12 Aug 29, 2023
2d62565
rename SetRequestTimeout -> FakeRequestTimeout
barrettj12 Aug 29, 2023
8a77df6
[README] better names for example targets
barrettj12 Aug 29, 2023
0af606a
empty buffer inside handleServerResponse
barrettj12 Aug 30, 2023
7e00ccd
fix errFromResponse
barrettj12 Aug 30, 2023
5d21565
elaborate on retry comments
barrettj12 Aug 30, 2023
2b78abe
address Gustavo's review comments
barrettj12 Sep 19, 2023
0142e7e
rejig loki error handling
barrettj12 Sep 19, 2023
79187af
if-else for debug logging error
barrettj12 Sep 19, 2023
1c694b3
success is only 200 or 204
barrettj12 Sep 20, 2023
0ea5f34
pull log-counting / flushing logic into gatherer
barrettj12 Sep 20, 2023
f7ef7ff
store entries in slice
barrettj12 Sep 20, 2023
f9fea04
fix up the tests
barrettj12 Sep 20, 2023
892c037
rename AddLog to Add
barrettj12 Sep 20, 2023
bce2a86
continue stmt after flush
barrettj12 Sep 20, 2023
2dbbfb6
count # of written logs in gatherer
barrettj12 Sep 20, 2023
9b72978
reallocate buffer periodically to avoid memory leaks
barrettj12 Sep 20, 2023
fabf8a5
add test for retry/truncate logic
barrettj12 Sep 20, 2023
1c0ed80
fix gathererSuite.TestRetryLoki
barrettj12 Sep 21, 2023
8ea3930
fix imports
barrettj12 Sep 21, 2023
6a733c0
increase timeouts to improve test reliability
barrettj12 Sep 21, 2023
3c6e172
remove unnecessary comment
barrettj12 Sep 21, 2023
4dab1dd
bump test timeouts to 1 sec
barrettj12 Sep 21, 2023
084544b
use json.Compact in tests
barrettj12 Sep 21, 2023
a5ab88c
loki test: handler doesn't need to be pointer
barrettj12 Sep 21, 2023
c4e153c
Address Gustavo's review comments
barrettj12 Sep 22, 2023
164f9ad
Rename logGathererArgs -> *logGathererOptions
barrettj12 Sep 22, 2023
c6b27b0
tweak 4xx comment
barrettj12 Sep 22, 2023
e80cfcd
when truncating, zero the element to allow GC
barrettj12 Sep 22, 2023
159bbe9
reuse same buffer instead of reallocating
barrettj12 Sep 25, 2023
a94d8ff
add test for buffer recycling
barrettj12 Sep 25, 2023
20c76b2
Address Ben's comments on testing
barrettj12 Sep 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,5 @@ issues:
# these values ensure that all issues will be surfaced
max-issues-per-linter: 0
max-same-issues: 0

run:
timeout: 5m

timeout: 5m
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
53 changes: 40 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -403,30 +403,26 @@ $ pebble run --verbose
...
```

<!--
TODO: uncomment this section once log forwarding is fully implemented
TODO: add log targets to the Pebble layer spec below

#### Log forwarding

Pebble supports forwarding its services' logs to a remote Loki server or syslog receiver (via UDP/TCP). In the `log-targets` section of the plan, you can specify destinations for log forwarding, for example:
Pebble supports forwarding its services' logs to a remote Loki server. In the `log-targets` section of the plan, you can specify destinations for log forwarding, for example:
```yaml
log-targets:
loki-example:
example1:
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
override: merge
type: loki
location: http://10.1.77.205:3100/loki/api/v1/push
services: [all]
syslog-example:
example2:
override: merge
type: syslog
location: tcp://192.168.10.241:1514
type: loki
location: http://my.loki.server.com/loki/api/v1/push
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
services: [svc1, svc2]
```

For each log target, use the `services` key to specify a list of services to collect logs from. In the above example, the `syslog-example` target will collect logs from `svc1` and `svc2`.
For each log target, use the `services` key to specify a list of services to collect logs from. In the above example, the `example2` target will collect logs from `svc1` and `svc2`.

Use the special keyword `all` to match all services, including services that might be added in future layers. In the above example, `loki-example` will collect logs from all services.
Use the special keyword `all` to match all services, including services that might be added in future layers. In the above example, `example1` will collect logs from all services.

To remove a service from a log target when merging, prefix the service name with a minus `-`. For example, if we have a base layer with
```yaml
Expand Down Expand Up @@ -455,7 +451,6 @@ my-target:
```
would remove all services and then add `svc1`, so `my-target` would receive logs from only `svc1`.

-->

## Container usage

Expand Down Expand Up @@ -721,6 +716,37 @@ checks:
# (Optional) Working directory to run command in. By default, the
# command is run in the service manager's current directory.
working-dir: <directory>

# (Optional) A list of remote log receivers, to which service logs can be sent.
log-targets:

<log target name>:

# (Required) Control how this log target definition is combined with
# other pre-existing definitions with the same name in the Pebble plan.
#
# The value 'merge' will ensure that values in this layer specification
# are merged over existing definitions, whereas 'replace' will entirely
# override the existing target spec in the plan with the same name.
override: merge | replace

# (Required) The type of log target, which determines the format in
# which logs will be sent. Currently, the only supported type is 'loki',
# but more protocols may be added in the future.
type: loki

# (Required) The URL of the remote log target.
# For Loki, this needs to be the fully-qualified URL of the push API,
# including the API endpoint, e.g.
# http://<ip-address>:3100/loki/api/v1/push
location: <url>

# (Optional) A list of services whose logs will be sent to this target.
# Use the special keyword 'all' to match all services in the plan.
# When merging log targets, the 'services' lists are appended. Prefix a
# service name with a minus (e.g. '-svc1') to remove a previously added
# service. '-all' will remove all services.
services: [<service names>]
```

## API and clients
Expand Down Expand Up @@ -753,7 +779,8 @@ Here are some of the things coming soon:
- [x] Automatically restart services that fail
- [x] Support for custom health checks (HTTP, TCP, command)
- [x] Terminate all services before exiting run command
- [ ] Log forwarding (syslog and Loki)
- [x] Log forwarding to Loki
- [ ] Log forwarding to syslog
- [ ] [Other in-progress PRs](https://github.com/canonical/pebble/pulls)
- [ ] [Other requested features](https://github.com/canonical/pebble/issues)

Expand Down
83 changes: 83 additions & 0 deletions internals/overlord/logstate/clienterr/clienterr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

/*
Package clienterr contains common error types which are recognised by the log
gatherer. logstate.logClient implementations should return these error types
to communicate with the gatherer.

Errors in this package should be pattern-matched using errors.As:

err := client.Flush(ctx)
var backoff *clienterr.Backoff
if errors.As(err, &backoff) {
...
}
*/
package clienterr
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved

import (
"bytes"
"fmt"
"io"
"net/http"
"time"
)

// Backoff should be returned if the server indicates we are sending too many
// requests (e.g. an HTTP 429 response).
type Backoff struct {
RetryAfter *time.Time
}

func (e *Backoff) Error() string {
errStr := "too many requests"
if e.RetryAfter != nil {
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
errStr += ", retry after " + e.RetryAfter.String()
}
return errStr
}

// ErrorResponse represents an HTTP error response from the server
// (4xx or 5xx).
// This is a generic, catch-all error type - clients should use a more refined
// type when appropriate (e.g. Backoff for a 429 response).
type ErrorResponse struct {
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
StatusCode int
Body bytes.Buffer
ReadErr error
}

func (e *ErrorResponse) Error() string {
errStr := fmt.Sprintf("server returned HTTP %d\n", e.StatusCode)
if e.Body.Len() > 0 {
errStr += fmt.Sprintf(`response body:
%s
`, e.Body.String())
}
if e.ReadErr != nil {
errStr += "cannot read response body: " + e.ReadErr.Error()
}
return errStr
}

// ErrorFromResponse generates a *ErrorResponse from a failed *http.Response.
// NB: this function reads the response body.
func ErrorFromResponse(resp *http.Response) *ErrorResponse {
err := &ErrorResponse{}
err.StatusCode = resp.StatusCode
_, readErr := io.CopyN(&err.Body, resp.Body, 1024)
err.ReadErr = readErr
return err
}
59 changes: 41 additions & 18 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ package logstate

import (
"context"
"errors"
"fmt"
"time"

"gopkg.in/tomb.v2"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/overlord/logstate/clienterr"
"github.com/canonical/pebble/internals/overlord/logstate/loki"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)
Expand Down Expand Up @@ -71,6 +74,8 @@ type logGatherer struct {
pullers *pullerGroup
// All pullers send logs on this channel, received by main loop
entryCh chan servicelog.Entry

timer timer
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
}

// logGathererArgs allows overriding the newLogClient method and time values
Expand Down Expand Up @@ -103,6 +108,7 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG
client: client,
entryCh: make(chan servicelog.Entry),
pullers: newPullerGroup(target.Name),
timer: newTimer(),
}
g.clientCtx, g.clientCancel = context.WithCancel(context.Background())
g.tomb.Go(g.loop)
Expand Down Expand Up @@ -169,43 +175,48 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R
// pullers on entryCh, and writes them to the client. It also flushes the
// client periodically, and exits when the gatherer's tomb is killed.
func (g *logGatherer) loop() error {
timer := newTimer()
defer timer.Stop()
defer g.timer.Stop()

mainLoop:
for {
select {
case <-g.tomb.Dying():
break mainLoop

case <-timer.Expired():
case <-g.timer.Expired():
// Mark timer as unset
timer.Stop()
err := g.client.Flush(g.clientCtx)
if err != nil {
logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
}
g.timer.Stop()
g.handleClientErr(g.client.Flush(g.clientCtx))

case entry := <-g.entryCh:
err := g.client.Write(g.clientCtx, entry)
if err != nil {
logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err)
}
timer.EnsureSet(g.bufferTimeout)
g.handleClientErr(g.client.Write(g.clientCtx, entry))
g.timer.EnsureSet(g.bufferTimeout)
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Final flush to send any remaining logs buffered in the client
// We need to create a new context, as the previous one may have been cancelled.
ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush)
defer cancel()
err := g.client.Flush(ctx)
if err != nil {
logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
}
g.handleClientErr(g.client.Flush(ctx))
return nil
}

func (g *logGatherer) handleClientErr(err error) {
if err == nil {
return
}

var backoff *clienterr.Backoff
if errors.As(err, &backoff) {
logger.Noticef("Target %q: %v", g.targetName, err)
g.timer.retryAfter = backoff.RetryAfter
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
return
}

logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
}

// Stop tears down the gatherer and associated resources (pullers, client).
// This method will block until gatherer teardown is complete.
//
Expand Down Expand Up @@ -250,6 +261,8 @@ func (g *logGatherer) Stop() {
type timer struct {
timer *time.Timer
set bool
// If non-nil, the timer won't expire until after this time
retryAfter *time.Time
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
}

func newTimer() timer {
Expand Down Expand Up @@ -279,6 +292,15 @@ func (t *timer) EnsureSet(timeout time.Duration) {
return
}

if t.retryAfter != nil {
// We've been told to wait before retrying
retryTime := time.Until(*t.retryAfter)
if retryTime > timeout {
timeout = retryTime
}
t.retryAfter = nil
}

t.timer.Reset(timeout)
t.set = true
}
Expand Down Expand Up @@ -311,7 +333,8 @@ type logClient interface {

func newLogClient(target *plan.LogTarget) (logClient, error) {
switch target.Type {
//case plan.LokiTarget: TODO
case plan.LokiTarget:
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
return loki.NewClient(target), nil
//case plan.SyslogTarget: TODO
default:
return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
Expand Down
25 changes: 25 additions & 0 deletions internals/overlord/logstate/loki/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package loki

import "time"

func SetRequestTimeout(new time.Duration) (restore func()) {
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
oldRequestTimeout := requestTimeout
requestTimeout = new
return func() {
requestTimeout = oldRequestTimeout
}
}
Loading
Loading