
feat(logfwd): support forwarding logs to Loki #267

Merged
39 commits merged on Sep 25, 2023
Changes from 32 commits
Commits (39)
f56d4ad
Implement Loki client
barrettj12 Jul 10, 2023
8ff49d8
update README
barrettj12 Aug 28, 2023
72942ad
don't need to allocate for errors.As
barrettj12 Aug 28, 2023
e91b3d9
remove clienterr package
barrettj12 Aug 29, 2023
4d8e66f
don't drop logs on 429 / 5xx
barrettj12 Aug 29, 2023
a702d37
remove retryAfter from loki.Client
barrettj12 Aug 29, 2023
a8b0c3e
revert changes to .golangci.yml
barrettj12 Aug 29, 2023
2c427e4
rename gatherer timer -> flushTimer
barrettj12 Aug 29, 2023
2d62565
rename SetRequestTimeout -> FakeRequestTimeout
barrettj12 Aug 29, 2023
8a77df6
[README] better names for example targets
barrettj12 Aug 29, 2023
0af606a
empty buffer inside handleServerResponse
barrettj12 Aug 30, 2023
7e00ccd
fix errFromResponse
barrettj12 Aug 30, 2023
5d21565
elaborate on retry comments
barrettj12 Aug 30, 2023
2b78abe
address Gustavo's review comments
barrettj12 Sep 19, 2023
0142e7e
rejig loki error handling
barrettj12 Sep 19, 2023
79187af
if-else for debug logging error
barrettj12 Sep 19, 2023
1c694b3
success is only 200 or 204
barrettj12 Sep 20, 2023
0ea5f34
pull log-counting / flushing logic into gatherer
barrettj12 Sep 20, 2023
f7ef7ff
store entries in slice
barrettj12 Sep 20, 2023
f9fea04
fix up the tests
barrettj12 Sep 20, 2023
892c037
rename AddLog to Add
barrettj12 Sep 20, 2023
bce2a86
continue stmt after flush
barrettj12 Sep 20, 2023
2dbbfb6
count # of written logs in gatherer
barrettj12 Sep 20, 2023
9b72978
reallocate buffer periodically to avoid memory leaks
barrettj12 Sep 20, 2023
fabf8a5
add test for retry/truncate logic
barrettj12 Sep 20, 2023
1c0ed80
fix gathererSuite.TestRetryLoki
barrettj12 Sep 21, 2023
8ea3930
fix imports
barrettj12 Sep 21, 2023
6a733c0
increase timeouts to improve test reliability
barrettj12 Sep 21, 2023
3c6e172
remove unnecessary comment
barrettj12 Sep 21, 2023
4dab1dd
bump test timeouts to 1 sec
barrettj12 Sep 21, 2023
084544b
use json.Compact in tests
barrettj12 Sep 21, 2023
a5ab88c
loki test: handler doesn't need to be pointer
barrettj12 Sep 21, 2023
c4e153c
Address Gustavo's review comments
barrettj12 Sep 22, 2023
164f9ad
Rename logGathererArgs -> *logGathererOptions
barrettj12 Sep 22, 2023
c6b27b0
tweak 4xx comment
barrettj12 Sep 22, 2023
e80cfcd
when truncating, zero the element to allow GC
barrettj12 Sep 22, 2023
159bbe9
reuse same buffer instead of reallocating
barrettj12 Sep 25, 2023
a94d8ff
add test for buffer recycling
barrettj12 Sep 25, 2023
20c76b2
Address Ben's comments on testing
barrettj12 Sep 25, 2023
README.md (53 changes: 40 additions & 13 deletions)
@@ -403,30 +403,26 @@ $ pebble run --verbose
...
```

-<!--
-TODO: uncomment this section once log forwarding is fully implemented
-TODO: add log targets to the Pebble layer spec below
-
#### Log forwarding

-Pebble supports forwarding its services' logs to a remote Loki server or syslog receiver (via UDP/TCP). In the `log-targets` section of the plan, you can specify destinations for log forwarding, for example:
+Pebble supports forwarding its services' logs to a remote Loki server. In the `log-targets` section of the plan, you can specify destinations for log forwarding, for example:
```yaml
log-targets:
-loki-example:
+staging-logs:
override: merge
type: loki
location: http://10.1.77.205:3100/loki/api/v1/push
services: [all]
-syslog-example:
+production-logs:
override: merge
-type: syslog
-location: tcp://192.168.10.241:1514
+type: loki
+location: http://my.loki.server.com/loki/api/v1/push
services: [svc1, svc2]
```

-For each log target, use the `services` key to specify a list of services to collect logs from. In the above example, the `syslog-example` target will collect logs from `svc1` and `svc2`.
+For each log target, use the `services` key to specify a list of services to collect logs from. In the above example, the `production-logs` target will collect logs from `svc1` and `svc2`.

-Use the special keyword `all` to match all services, including services that might be added in future layers. In the above example, `loki-example` will collect logs from all services.
+Use the special keyword `all` to match all services, including services that might be added in future layers. In the above example, `staging-logs` will collect logs from all services.

To remove a service from a log target when merging, prefix the service name with a minus `-`. For example, if we have a base layer with
```yaml
@@ -455,7 +451,6 @@ my-target:
```
would remove all services and then add `svc1`, so `my-target` would receive logs from only `svc1`.

--->

## Container usage

@@ -721,6 +716,37 @@ checks:
# (Optional) Working directory to run command in. By default, the
# command is run in the service manager's current directory.
working-dir: <directory>

# (Optional) A list of remote log receivers, to which service logs can be sent.
log-targets:

<log target name>:

# (Required) Control how this log target definition is combined with
# other pre-existing definitions with the same name in the Pebble plan.
#
# The value 'merge' will ensure that values in this layer specification
# are merged over existing definitions, whereas 'replace' will entirely
# override the existing target spec in the plan with the same name.
override: merge | replace

# (Required) The type of log target, which determines the format in
# which logs will be sent. Currently, the only supported type is 'loki',
# but more protocols may be added in the future.
type: loki

# (Required) The URL of the remote log target.
# For Loki, this needs to be the fully-qualified URL of the push API,
# including the API endpoint, e.g.
# http://<ip-address>:3100/loki/api/v1/push
location: <url>

# (Optional) A list of services whose logs will be sent to this target.
# Use the special keyword 'all' to match all services in the plan.
# When merging log targets, the 'services' lists are appended. Prefix a
# service name with a minus (e.g. '-svc1') to remove a previously added
# service. '-all' will remove all services.
services: [<service names>]
```

## API and clients
@@ -753,7 +779,8 @@ Here are some of the things coming soon:
- [x] Automatically restart services that fail
- [x] Support for custom health checks (HTTP, TCP, command)
- [x] Terminate all services before exiting run command
-- [ ] Log forwarding (syslog and Loki)
+- [x] Log forwarding to Loki
+- [ ] Log forwarding to syslog
- [ ] [Other in-progress PRs](https://github.com/canonical/pebble/pulls)
- [ ] [Other requested features](https://github.com/canonical/pebble/issues)

internals/overlord/logstate/gatherer.go (80 changes: 43 additions & 37 deletions)
@@ -22,13 +22,15 @@ import (
"gopkg.in/tomb.v2"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/overlord/logstate/loki"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)

const (
-parserSize    = 4 * 1024
-bufferTimeout = 1 * time.Second
+parserSize         = 4 * 1024
+bufferTimeout      = 1 * time.Second
+maxBufferedEntries = 100

// These constants control the maximum time allowed for each teardown step.
timeoutCurrentFlush = 1 * time.Second
@@ -76,8 +78,9 @@ type logGatherer struct {
// logGathererArgs allows overriding the newLogClient method and time values
// in testing.
type logGathererArgs struct {
-bufferTimeout     time.Duration
-timeoutFinalFlush time.Duration
+bufferTimeout      time.Duration
+maxBufferedEntries int
+timeoutFinalFlush  time.Duration
// method to get a new client
newClient func(*plan.LogTarget) (logClient, error)
}
@@ -115,6 +118,9 @@ func fillDefaultArgs(args logGathererArgs) logGathererArgs {
if args.bufferTimeout == 0 {
args.bufferTimeout = bufferTimeout
}
if args.maxBufferedEntries == 0 {
args.maxBufferedEntries = maxBufferedEntries
}
if args.timeoutFinalFlush == 0 {
args.timeoutFinalFlush = timeoutFinalFlush
}
@@ -169,40 +175,52 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R
// pullers on entryCh, and writes them to the client. It also flushes the
// client periodically, and exits when the gatherer's tomb is killed.
func (g *logGatherer) loop() error {
-timer := newTimer()
-defer timer.Stop()
+flushTimer := newTimer()
+defer flushTimer.Stop()
+// Keep track of number of logs written since last flush
+numWritten := 0

+flushClient := func(ctx context.Context) {
+// Mark timer as unset
+flushTimer.Stop()
+err := g.client.Flush(ctx)
+if err != nil {
+logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
+}
+numWritten = 0
+}

mainLoop:
for {
select {
case <-g.tomb.Dying():
break mainLoop

-case <-timer.Expired():
-// Mark timer as unset
-timer.Stop()
-err := g.client.Flush(g.clientCtx)
-if err != nil {
-logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
-}
+case <-flushTimer.Expired():
+flushClient(g.clientCtx)

case entry := <-g.entryCh:
-err := g.client.Write(g.clientCtx, entry)
+err := g.client.Add(entry)
if err != nil {
logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err)
+continue
}
+numWritten++
+// Check if buffer is full
+if numWritten >= g.maxBufferedEntries {
+flushClient(g.clientCtx)
+continue
+}
-timer.EnsureSet(g.bufferTimeout)
+// Otherwise, set the timeout
+flushTimer.EnsureSet(g.bufferTimeout)
}
}

// Final flush to send any remaining logs buffered in the client
// We need to create a new context, as the previous one may have been cancelled.
ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush)
defer cancel()
-err := g.client.Flush(ctx)
-if err != nil {
-logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
-}
+flushClient(ctx)
return nil
}
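The flush timer used above exposes a small `EnsureSet`/`Expired`/`Stop` API whose implementation is collapsed in this diff. As a hypothetical sketch (an assumption for illustration, not the actual Pebble `timer` type), a wrapper around the standard `time.Timer` with those semantics could look like this:

```go
// timerSketch is a hypothetical resettable timer. EnsureSet only arms the
// timer if it is not already armed, so the first entry buffered after a flush
// starts the countdown; Stop marks it as unset and drains any stale expiry.
type timerSketch struct {
	t   *time.Timer
	set bool
}

func newTimerSketch() *timerSketch {
	t := time.NewTimer(0)
	if !t.Stop() {
		<-t.C // drain the initial expiry so the timer starts unset
	}
	return &timerSketch{t: t}
}

// Expired returns the channel that fires when the timeout elapses.
func (s *timerSketch) Expired() <-chan time.Time { return s.t.C }

// Stop marks the timer as unset and prevents a stale expiry from firing later.
func (s *timerSketch) Stop() {
	s.set = false
	if !s.t.Stop() {
		select {
		case <-s.t.C:
		default:
		}
	}
}

// EnsureSet arms the timer only if it is not already running.
func (s *timerSketch) EnsureSet(timeout time.Duration) {
	if !s.set {
		s.t.Reset(timeout)
		s.set = true
	}
}
```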

@@ -288,30 +306,18 @@ func (t *timer) EnsureSet(timeout time.Duration) {
// protocol required by that log target.
// For example, a logClient for Loki would encode the log messages in the
// JSON format expected by Loki, and send them over HTTP(S).
-//
-// logClient implementations have some freedom about the semantics of these
-// methods. For a buffering client (e.g. HTTP):
-// - Write could add the log to the client's internal buffer, calling Flush
-// when this buffer reaches capacity.
-// - Flush would prepare and send a request with the buffered logs.
-//
-// For a non-buffering client (e.g. TCP), Write could serialise the log
-// directly to the open connection, while Flush would be a no-op.
type logClient interface {
-// Write adds the given log entry to the client. Depending on the
-// implementation of the client, this may send the log to the remote target,
-// or simply add the log to an internal buffer, flushing that buffer when
-// required.
-Write(context.Context, servicelog.Entry) error

-// Flush sends buffered logs (if any) to the remote target. For clients which
-// don't buffer logs, Flush should be a no-op.
+// Add adds the given log entry to the client's buffer.
+Add(servicelog.Entry) error

+// Flush sends buffered logs (if any) to the remote target.
Flush(context.Context) error
}
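For illustration, here is a rough sketch of what a buffering logClient for Loki could look like. This is not the actual `loki.Client` used below (that code lives in `internals/overlord/logstate/loki` and is not shown in this diff), but the payload shape matches what `TestRetryLoki` asserts, and the retry-on-429/5xx handling follows the "don't drop logs on 429 / 5xx" commit. The `servicelog.Entry` field names (`Time`, `Service`, `Message`), the single-stream simplification, and the behaviour for other 4xx responses are assumptions.

```go
// lokiClientSketch is a hypothetical, simplified logClient for Loki.
// Add buffers entries in memory; Flush encodes them in Loki's push format
// ({"streams":[{"stream":{...},"values":[["<ns timestamp>","<line>"],...]}]})
// and POSTs them. On 200/204 the buffer is reused; on 429/5xx the entries are
// kept so they can be retried on the next flush.
// (Needs imports: bytes, context, encoding/json, fmt, net/http, strconv,
// and the servicelog package.)
type lokiClientSketch struct {
	url     string
	httpc   *http.Client
	entries []servicelog.Entry
}

func (c *lokiClientSketch) Add(entry servicelog.Entry) error {
	c.entries = append(c.entries, entry)
	return nil
}

func (c *lokiClientSketch) Flush(ctx context.Context) error {
	if len(c.entries) == 0 {
		return nil
	}
	values := make([][2]string, 0, len(c.entries))
	for _, e := range c.entries {
		// Loki expects nanosecond timestamps encoded as strings.
		values = append(values, [2]string{strconv.FormatInt(e.Time.UnixNano(), 10), e.Message})
	}
	payload := map[string]any{
		"streams": []any{map[string]any{
			// Assumes, for simplicity, that all buffered entries belong to one service.
			"stream": map[string]string{"pebble_service": c.entries[0].Service},
			"values": values,
		}},
	}
	body, err := json.Marshal(payload)
	if err != nil {
		return err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := c.httpc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	switch {
	case resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNoContent:
		c.entries = c.entries[:0] // success: reuse the same buffer
		return nil
	case resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500:
		// Retryable: keep the buffered entries for the next flush.
		return fmt.Errorf("loki returned %d; keeping %d buffered entries", resp.StatusCode, len(c.entries))
	default:
		// Other 4xx: drop the batch (an assumption in this sketch).
		c.entries = c.entries[:0]
		return fmt.Errorf("loki returned %d", resp.StatusCode)
	}
}
```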

func newLogClient(target *plan.LogTarget) (logClient, error) {
switch target.Type {
-//case plan.LokiTarget: TODO
+case plan.LokiTarget:
+return loki.NewClient(target), nil
//case plan.SyslogTarget: TODO
default:
return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
internals/overlord/logstate/gatherer_test.go (90 changes: 84 additions & 6 deletions)
@@ -18,10 +18,13 @@ import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"time"

. "gopkg.in/check.v1"

"github.com/canonical/pebble/internals/overlord/logstate/loki"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)
@@ -33,6 +36,7 @@ var _ = Suite(&gathererSuite{})
func (s *gathererSuite) TestGatherer(c *C) {
received := make(chan []servicelog.Entry, 1)
gathererArgs := logGathererArgs{
maxBufferedEntries: 5,
newClient: func(target *plan.LogTarget) (logClient, error) {
return &testClient{
bufferSize: 5,
@@ -59,7 +63,7 @@

testSvc.writeLog("log line #5")
select {
-case <-time.After(5 * time.Millisecond):
+case <-time.After(1 * time.Second):
c.Fatalf("timeout waiting for logs")
case logs := <-received:
checkLogs(c, logs, []string{"log line #1", "log line #2", "log line #3", "log line #4", "log line #5"})
@@ -122,7 +126,7 @@ func (s *gathererSuite) TestGathererShutdown(c *C) {
}()

select {
-case <-time.After(100 * time.Millisecond):
+case <-time.After(1 * time.Second):
c.Fatalf("timeout waiting for gatherer to tear down")
case <-hasShutdown:
}
@@ -137,6 +141,83 @@ logs in client buffer: %v`, len(g.client.(*testClient).buffered))
}
}

func (s *gathererSuite) TestRetryLoki(c *C) {
var handler func(http.ResponseWriter, *http.Request)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler(w, r)
}))
defer server.Close()

g, err := newLogGathererInternal(
&plan.LogTarget{
Name: "tgt1",
Location: server.URL,
},
logGathererArgs{
bufferTimeout: 1 * time.Millisecond,
maxBufferedEntries: 5,
newClient: func(target *plan.LogTarget) (logClient, error) {
return loki.NewClientWithArgs(target, loki.ClientArgs{
MaxRequestEntries: 5,
}), nil
},
},
)
c.Assert(err, IsNil)

testSvc := newTestService("svc1")
g.ServiceStarted(testSvc.config, testSvc.ringBuffer)

reqReceived := make(chan struct{})
// First attempt: server should return a retryable error
handler = func(w http.ResponseWriter, _ *http.Request) {
close(reqReceived)
w.WriteHeader(http.StatusTooManyRequests)
}

testSvc.writeLog("log line #1")
testSvc.writeLog("log line #2")
testSvc.writeLog("log line #3")
testSvc.writeLog("log line #4")
testSvc.writeLog("log line #5")

// Check that request was received
select {
case <-reqReceived:
case <-time.After(1 * time.Second):
c.Fatalf("timed out waiting for request")
}

reqReceived = make(chan struct{})
// Second attempt: check that logs were held over from last time
handler = func(w http.ResponseWriter, r *http.Request) {
close(reqReceived)
reqBody, err := io.ReadAll(r.Body)
c.Assert(err, IsNil)

expected := `{"streams":\[{"stream":{"pebble_service":"svc1"},"values":\[` +
// First two log lines should have been truncated
`\["\d+","log line #3"\],` +
`\["\d+","log line #4"\],` +
`\["\d+","log line #5"\],` +
`\["\d+","log line #6"\],` +
`\["\d+","log line #7"\]` +
`\]}\]}`
c.Assert(string(reqBody), Matches, expected)
}

testSvc.writeLog("log line #6")
testSvc.writeLog("log line #7")
// Wait for flush timeout to elapse

// Check that request was received
select {
case <-reqReceived:
case <-time.After(1 * time.Second):
c.Fatalf("timed out waiting for request")
}
}
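The truncation asserted above (the two oldest lines are dropped once more than `MaxRequestEntries` logs are buffered) happens inside `loki.Client`, whose code is not part of this diff. Based on the "when truncating, zero the element to allow GC" and "reuse same buffer instead of reallocating" commits, a plausible sketch of that helper (hypothetical name and signature) is:

```go
// truncateOldestSketch keeps only the newest max entries, reusing the same
// backing array and zeroing the freed tail slots so the dropped entries can
// be garbage-collected. Hypothetical helper, not the actual loki package code.
func truncateOldestSketch(entries []servicelog.Entry, max int) []servicelog.Entry {
	if len(entries) <= max {
		return entries
	}
	n := copy(entries, entries[len(entries)-max:]) // shift the newest entries to the front
	for i := n; i < len(entries); i++ {
		entries[i] = servicelog.Entry{} // zero freed slots to allow GC
	}
	return entries[:n]
}
```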

func checkLogs(c *C, received []servicelog.Entry, expected []string) {
c.Assert(received, HasLen, len(expected))
for i, entry := range received {
@@ -151,11 +232,8 @@ type testClient struct {
sendCh chan []servicelog.Entry
}

-func (c *testClient) Write(ctx context.Context, entry servicelog.Entry) error {
+func (c *testClient) Add(entry servicelog.Entry) error {
c.buffered = append(c.buffered, entry)
-if len(c.buffered) >= c.bufferSize {
-return c.Flush(ctx)
-}
return nil
}
