Skip to content

Commit

Permalink
feat(logfwd): support forwarding logs to Loki (#267)
Browse files Browse the repository at this point in the history
Add a loki.Client type that encodes sends Pebble logs to Loki. This implements the logstate.logClient interface introduced in #256.
  • Loading branch information
barrettj12 authored Sep 25, 2023
1 parent 32b1b6e commit b4666dd
Show file tree
Hide file tree
Showing 7 changed files with 725 additions and 84 deletions.
53 changes: 40 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -405,30 +405,26 @@ $ pebble run --verbose
...
```
<!--
TODO: uncomment this section once log forwarding is fully implemented
TODO: add log targets to the Pebble layer spec below
#### Log forwarding
Pebble supports forwarding its services' logs to a remote Loki server or syslog receiver (via UDP/TCP). In the `log-targets` section of the plan, you can specify destinations for log forwarding, for example:
Pebble supports forwarding its services' logs to a remote Loki server. In the `log-targets` section of the plan, you can specify destinations for log forwarding, for example:
```yaml
log-targets:
loki-example:
staging-logs:
override: merge
type: loki
location: http://10.1.77.205:3100/loki/api/v1/push
services: [all]
syslog-example:
production-logs:
override: merge
type: syslog
location: tcp://192.168.10.241:1514
type: loki
location: http://my.loki.server.com/loki/api/v1/push
services: [svc1, svc2]
```

For each log target, use the `services` key to specify a list of services to collect logs from. In the above example, the `syslog-example` target will collect logs from `svc1` and `svc2`.
For each log target, use the `services` key to specify a list of services to collect logs from. In the above example, the `production-logs` target will collect logs from `svc1` and `svc2`.

Use the special keyword `all` to match all services, including services that might be added in future layers. In the above example, `loki-example` will collect logs from all services.
Use the special keyword `all` to match all services, including services that might be added in future layers. In the above example, `staging-logs` will collect logs from all services.

To remove a service from a log target when merging, prefix the service name with a minus `-`. For example, if we have a base layer with
```yaml
Expand Down Expand Up @@ -457,7 +453,6 @@ my-target:
```
would remove all services and then add `svc1`, so `my-target` would receive logs from only `svc1`.

-->

## Container usage

Expand Down Expand Up @@ -723,6 +718,37 @@ checks:
# (Optional) Working directory to run command in. By default, the
# command is run in the service manager's current directory.
working-dir: <directory>
# (Optional) A list of remote log receivers, to which service logs can be sent.
log-targets:
<log target name>:
# (Required) Control how this log target definition is combined with
# other pre-existing definitions with the same name in the Pebble plan.
#
# The value 'merge' will ensure that values in this layer specification
# are merged over existing definitions, whereas 'replace' will entirely
# override the existing target spec in the plan with the same name.
override: merge | replace
# (Required) The type of log target, which determines the format in
# which logs will be sent. Currently, the only supported type is 'loki',
# but more protocols may be added in the future.
type: loki
# (Required) The URL of the remote log target.
# For Loki, this needs to be the fully-qualified URL of the push API,
# including the API endpoint, e.g.
# http://<ip-address>:3100/loki/api/v1/push
location: <url>
# (Optional) A list of services whose logs will be sent to this target.
# Use the special keyword 'all' to match all services in the plan.
# When merging log targets, the 'services' lists are appended. Prefix a
# service name with a minus (e.g. '-svc1') to remove a previously added
# service. '-all' will remove all services.
services: [<service names>]
```

## API and clients
Expand Down Expand Up @@ -755,7 +781,8 @@ Here are some of the things coming soon:
- [x] Automatically restart services that fail
- [x] Support for custom health checks (HTTP, TCP, command)
- [x] Terminate all services before exiting run command
- [ ] Log forwarding (syslog and Loki)
- [x] Log forwarding to Loki
- [ ] Log forwarding to syslog
- [ ] [Other in-progress PRs](https://github.com/canonical/pebble/pulls)
- [ ] [Other requested features](https://github.com/canonical/pebble/issues)

Expand Down
112 changes: 59 additions & 53 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"gopkg.in/tomb.v2"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/overlord/logstate/loki"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)

const (
parserSize = 4 * 1024
bufferTimeout = 1 * time.Second
parserSize = 4 * 1024
bufferTimeout = 1 * time.Second
maxBufferedEntries = 100

// These constants control the maximum time allowed for each teardown step.
timeoutCurrentFlush = 1 * time.Second
Expand All @@ -55,7 +57,7 @@ const (
// Calling the Stop() method will tear down the logGatherer and all of its
// associated logPullers. Stop() can be called from an outside goroutine.
type logGatherer struct {
logGathererArgs
*logGathererOptions

targetName string
// tomb for the main loop
Expand All @@ -73,31 +75,32 @@ type logGatherer struct {
entryCh chan servicelog.Entry
}

// logGathererArgs allows overriding the newLogClient method and time values
// logGathererOptions allows overriding the newLogClient method and time values
// in testing.
type logGathererArgs struct {
bufferTimeout time.Duration
timeoutFinalFlush time.Duration
type logGathererOptions struct {
bufferTimeout time.Duration
maxBufferedEntries int
timeoutFinalFlush time.Duration
// method to get a new client
newClient func(*plan.LogTarget) (logClient, error)
}

func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) {
return newLogGathererInternal(target, logGathererArgs{})
return newLogGathererInternal(target, &logGathererOptions{})
}

// newLogGathererInternal contains the actual creation code for a logGatherer.
// This function is used in the real implementation, but also allows overriding
// certain configuration values for testing.
func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logGatherer, error) {
args = fillDefaultArgs(args)
client, err := args.newClient(target)
func newLogGathererInternal(target *plan.LogTarget, options *logGathererOptions) (*logGatherer, error) {
options = fillDefaultOptions(options)
client, err := options.newClient(target)
if err != nil {
return nil, fmt.Errorf("cannot create log client: %w", err)
}

g := &logGatherer{
logGathererArgs: args,
logGathererOptions: options,

targetName: target.Name,
client: client,
Expand All @@ -111,17 +114,20 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG
return g, nil
}

func fillDefaultArgs(args logGathererArgs) logGathererArgs {
if args.bufferTimeout == 0 {
args.bufferTimeout = bufferTimeout
func fillDefaultOptions(options *logGathererOptions) *logGathererOptions {
if options.bufferTimeout == 0 {
options.bufferTimeout = bufferTimeout
}
if args.timeoutFinalFlush == 0 {
args.timeoutFinalFlush = timeoutFinalFlush
if options.maxBufferedEntries == 0 {
options.maxBufferedEntries = maxBufferedEntries
}
if args.newClient == nil {
args.newClient = newLogClient
if options.timeoutFinalFlush == 0 {
options.timeoutFinalFlush = timeoutFinalFlush
}
return args
if options.newClient == nil {
options.newClient = newLogClient
}
return options
}

// PlanChanged is called by the LogManager when the plan is changed, if this
Expand Down Expand Up @@ -169,40 +175,52 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R
// pullers on entryCh, and writes them to the client. It also flushes the
// client periodically, and exits when the gatherer's tomb is killed.
func (g *logGatherer) loop() error {
timer := newTimer()
defer timer.Stop()
flushTimer := newTimer()
defer flushTimer.Stop()
// Keep track of number of logs written since last flush
numWritten := 0

flushClient := func(ctx context.Context) {
// Mark timer as unset
flushTimer.Stop()
err := g.client.Flush(ctx)
if err != nil {
logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
}
numWritten = 0
}

mainLoop:
for {
select {
case <-g.tomb.Dying():
break mainLoop

case <-timer.Expired():
// Mark timer as unset
timer.Stop()
err := g.client.Flush(g.clientCtx)
if err != nil {
logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
}
case <-flushTimer.Expired():
flushClient(g.clientCtx)

case entry := <-g.entryCh:
err := g.client.Write(g.clientCtx, entry)
err := g.client.Add(entry)
if err != nil {
logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err)
continue
}
numWritten++
// Check if buffer is full
if numWritten >= g.maxBufferedEntries {
flushClient(g.clientCtx)
continue
}
timer.EnsureSet(g.bufferTimeout)
// Otherwise, set the timeout
flushTimer.EnsureSet(g.bufferTimeout)
}
}

// Final flush to send any remaining logs buffered in the client
// We need to create a new context, as the previous one may have been cancelled.
ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush)
defer cancel()
err := g.client.Flush(ctx)
if err != nil {
logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
}
flushClient(ctx)
return nil
}

Expand Down Expand Up @@ -288,30 +306,18 @@ func (t *timer) EnsureSet(timeout time.Duration) {
// protocol required by that log target.
// For example, a logClient for Loki would encode the log messages in the
// JSON format expected by Loki, and send them over HTTP(S).
//
// logClient implementations have some freedom about the semantics of these
// methods. For a buffering client (e.g. HTTP):
// - Write could add the log to the client's internal buffer, calling Flush
// when this buffer reaches capacity.
// - Flush would prepare and send a request with the buffered logs.
//
// For a non-buffering client (e.g. TCP), Write could serialise the log
// directly to the open connection, while Flush would be a no-op.
type logClient interface {
// Write adds the given log entry to the client. Depending on the
// implementation of the client, this may send the log to the remote target,
// or simply add the log to an internal buffer, flushing that buffer when
// required.
Write(context.Context, servicelog.Entry) error

// Flush sends buffered logs (if any) to the remote target. For clients which
// don't buffer logs, Flush should be a no-op.
// Add adds the given log entry to the client's buffer.
Add(servicelog.Entry) error

// Flush sends buffered logs (if any) to the remote target.
Flush(context.Context) error
}

func newLogClient(target *plan.LogTarget) (logClient, error) {
switch target.Type {
//case plan.LokiTarget: TODO
case plan.LokiTarget:
return loki.NewClient(target), nil
//case plan.SyslogTarget: TODO
default:
return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
Expand Down
Loading

0 comments on commit b4666dd

Please sign in to comment.