feat(logfwd): add log labels #312

Merged
merged 68 commits into from
Nov 10, 2023
Commits
51f4602
fix(loki): on resetBuffer, zero entries to allow GC
barrettj12 Sep 26, 2023
9ef2380
feat(logfwd): add log labels
barrettj12 Sep 27, 2023
0b5a1fa
get env from service cmd rather than plan
barrettj12 Sep 28, 2023
51a437f
update tests
barrettj12 Sep 28, 2023
8e1fa60
add RemoveEnv
barrettj12 Sep 28, 2023
098f86b
override for labels
barrettj12 Oct 3, 2023
f99b20d
add test for label override
barrettj12 Oct 3, 2023
5a13d2e
replace several fields with plan.LogTarget
barrettj12 Oct 3, 2023
5a1872e
add labels test
barrettj12 Oct 3, 2023
3d32da9
add docs about labels
barrettj12 Oct 3, 2023
6ed0474
make pebble_* labels reserved
barrettj12 Oct 3, 2023
67ff654
Replace logClient.AddEnv with .SetLabels
barrettj12 Oct 3, 2023
56f5cdc
fix tests
barrettj12 Oct 3, 2023
e5a2340
add test for env parsing / label substitution
barrettj12 Oct 3, 2023
a43d565
debug log if var is not defined
barrettj12 Oct 3, 2023
f863399
simplify loki.Client.SetLabels
barrettj12 Oct 3, 2023
838c632
evaluate labels in SetLabels
barrettj12 Oct 4, 2023
2da06e0
use strings.Cut instead of SplitN
barrettj12 Oct 4, 2023
feb81ad
create envMap at top of loop
barrettj12 Oct 4, 2023
6c1b3c5
more descriptive errmsg for reserved labels
barrettj12 Oct 4, 2023
4a8bcf8
add test to catch race conditions
barrettj12 Oct 4, 2023
67f9bc2
add client locking to avoid race conditions
barrettj12 Oct 5, 2023
4b77d5a
flush client only from main loop
barrettj12 Oct 5, 2023
d746987
test label change
barrettj12 Oct 5, 2023
3efc33c
set labels inside gatherer main loop
barrettj12 Oct 5, 2023
f838434
fix tests
barrettj12 Oct 5, 2023
4d05daf
TestLabels: synchronise on channel to avoid data race
barrettj12 Oct 5, 2023
8ff9cce
fix linting
barrettj12 Oct 5, 2023
b440d71
fix fakeLogManager signature
barrettj12 Oct 5, 2023
3421732
remove client mutex
barrettj12 Oct 5, 2023
399cddd
fix imports in servstate/manager_test
barrettj12 Oct 5, 2023
f79f87e
clarify env var substitution
barrettj12 Oct 5, 2023
dfd7f68
move log forwarding one level up in README
barrettj12 Oct 6, 2023
93b99c4
check if tomb.Dying in gatherer.PlanChanged
barrettj12 Oct 6, 2023
9ccd99a
loki.Client.SetLabels: make a copy of labels map
barrettj12 Oct 6, 2023
c00f9a8
fix serviceData.Equal -> .unchanged method
barrettj12 Oct 6, 2023
b7fb690
add test for ServiceData.unchanged method
barrettj12 Oct 6, 2023
6a5d3a4
add test for servstate.parseEnv
barrettj12 Oct 6, 2023
13cc07c
change labelsSet -> notifySetLabels
barrettj12 Oct 6, 2023
8360ffe
address Harry's review comments
barrettj12 Oct 6, 2023
26145cf
remove unnecessary setClientLabels method
barrettj12 Oct 16, 2023
5c34141
address some of Gustavo's review comments
barrettj12 Oct 19, 2023
ca0afa1
flush client before setting labels
barrettj12 Oct 20, 2023
c97fab6
store Loki labels as json.RawMessage
barrettj12 Oct 20, 2023
b32d1d9
rearrange tomb.Dying select stmts
barrettj12 Oct 20, 2023
db25bc8
drop debug log: undef'd var
barrettj12 Oct 20, 2023
b78837b
move label evaluation into the gatherer
barrettj12 Oct 20, 2023
4a18e9d
replace gatherer.ServiceStarted with EnvChanged and BufferChanged
barrettj12 Oct 20, 2023
d2c07ba
set labels before starting puller
barrettj12 Oct 20, 2023
b908d26
remove TestServiceDataUnchanged
barrettj12 Oct 20, 2023
2f7d0b0
update gatherer tests
barrettj12 Oct 20, 2023
3f48ffd
fix panic in ServiceStarted
barrettj12 Oct 20, 2023
be982ee
add mutex to slowFlushingClient to avoid race conditions
barrettj12 Oct 20, 2023
f16ffea
Revert "flush client before setting labels"
barrettj12 Oct 20, 2023
61e5e98
flush before setting labels
barrettj12 Oct 23, 2023
3b76568
improve comments around setLabels
barrettj12 Oct 23, 2023
76054c5
use plan environment to interpret labels
barrettj12 Oct 26, 2023
161b8d5
update gatherer's target on PlanChanged
barrettj12 Nov 7, 2023
dc53f85
improve the README
barrettj12 Nov 7, 2023
523d789
address some of Ben's review comments
barrettj12 Nov 8, 2023
ea6564f
use different label values in plan test
barrettj12 Nov 8, 2023
3f5049f
test improvements following Ben's comments
barrettj12 Nov 8, 2023
28c6c0e
don't need to set labels in gatherer.ServiceStarted
barrettj12 Nov 8, 2023
b7f81cb
rewrite TestRace to do concurrent ops in blocks
barrettj12 Nov 8, 2023
4c5e676
remove target field on log gatherer
barrettj12 Nov 9, 2023
b928bb7
gatherer.target.Name -> .targetName
barrettj12 Nov 9, 2023
c859159
Merge branch 'master' into labels
benhoyt Nov 9, 2023
611da60
fix flaky loki test
barrettj12 Nov 10, 2023
48 changes: 45 additions & 3 deletions README.md
@@ -405,7 +405,7 @@ $ pebble run --verbose
...
```

-#### Log forwarding
+### Log forwarding

Pebble supports forwarding its services' logs to a remote Loki server. In the `log-targets` section of the plan, you can specify destinations for log forwarding, for example:
```yaml
@@ -422,6 +422,8 @@ log-targets:
services: [svc1, svc2]
```

#### Specifying services

For each log target, use the `services` key to specify a list of services to collect logs from. In the above example, the `production-logs` target will collect logs from `svc1` and `svc2`.

Use the special keyword `all` to match all services, including services that might be added in future layers. In the above example, `staging-logs` will collect logs from all services.
@@ -453,6 +455,38 @@ my-target:
```
would remove all services and then add `svc1`, so `my-target` would receive logs from only `svc1`.
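
The matching rules above can be sketched in Go. This is a minimal illustration, assuming the entries of a merged `services` list are applied in order so that a later entry overrides an earlier one. The `logsTo` helper and the example list are hypothetical and are not Pebble's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// logsTo reports whether the service named svc is selected by a target's
// `services` list. Entries are applied in order, so a later entry overrides
// an earlier one. Hypothetical helper, for illustration only.
func logsTo(services []string, svc string) bool {
	selected := false
	for _, entry := range services {
		remove := strings.HasPrefix(entry, "-")
		name := strings.TrimPrefix(entry, "-")
		if name == "all" || name == svc {
			selected = !remove
		}
	}
	return selected
}

func main() {
	// A made-up merged list: svc2 was added first, then everything was
	// removed with "-all", then svc1 was added.
	merged := []string{"svc2", "-all", "svc1"}
	for _, svc := range []string{"svc1", "svc2"} {
		fmt.Printf("%s: %v\n", svc, logsTo(merged, svc))
	}
}
```

With this list only `svc1` is selected, which matches the "remove all services and then add `svc1`" behaviour described above.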

#### Labels

In the `labels` section, you can specify custom labels to be added to any outgoing logs. Label values may contain `$ENVIRONMENT_VARIABLES`, which are substituted using the environment of the corresponding service. Pebble may also add its own default labels, depending on the protocol. For example, given the following plan:
```yaml
services:
    svc1:
        environment:
            OWNER: 'alice'
    svc2:
        environment:
            OWNER: 'bob'

log-targets:
    tgt1:
        type: loki
        labels:
            product: 'juju'
            owner: 'user-$OWNER'
```
the logs from `svc1` will be sent with the following labels:
```yaml
product: juju
owner: user-alice # env var $OWNER substituted
pebble_service: svc1 # default label for Loki
```
and for `svc2`, the labels will be:
```yaml
product: juju
owner: user-bob # env var $OWNER substituted
pebble_service: svc2 # default label for Loki
```
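
The two label sets above can be reproduced with a short Go sketch. It is only an illustration of the documented result: `labelsFor` is a hypothetical helper, and it assumes that substitution happens first and that the Loki default label `pebble_service` is added afterwards.

```go
package main

import (
	"fmt"
	"os"
)

// labelsFor is a hypothetical helper: it expands $VARS in each label value
// using the service's environment, then adds the Loki default label.
func labelsFor(service string, raw, env map[string]string) map[string]string {
	labels := make(map[string]string, len(raw)+1)
	for key, value := range raw {
		labels[key] = os.Expand(value, func(name string) string { return env[name] })
	}
	labels["pebble_service"] = service
	return labels
}

func main() {
	// Labels and environments taken from the example plan above.
	raw := map[string]string{"product": "juju", "owner": "user-$OWNER"}
	envs := map[string]map[string]string{
		"svc1": {"OWNER": "alice"},
		"svc2": {"OWNER": "bob"},
	}
	for _, svc := range []string{"svc1", "svc2"} {
		fmt.Println(svc, labelsFor(svc, raw, envs[svc]))
	}
}
```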


## Container usage

@@ -733,8 +767,10 @@ log-targets:
override: merge | replace

# (Required) The type of log target, which determines the format in
-# which logs will be sent. Currently, the only supported type is 'loki',
-# but more protocols may be added in the future.
+# which logs will be sent. The supported types are:
+#
+# - loki: Use the Grafana Loki protocol. A "pebble_service" label is
+# added automatically, with the name of the Pebble service as its value.
type: loki

# (Required) The URL of the remote log target.
@@ -749,6 +785,12 @@ log-targets:
# service name with a minus (e.g. '-svc1') to remove a previously added
# service. '-all' will remove all services.
services: [<service names>]

# (Optional) A list of key/value pairs defining labels which should be set
# on the outgoing logs. The label values may contain $ENV_VARS, which will
# be substituted using the environment for the corresponding service.
labels:
<label name>: <label value>
```

## API and clients
82 changes: 66 additions & 16 deletions internals/overlord/logstate/gatherer.go
@@ -17,6 +17,7 @@ package logstate
import (
"context"
"fmt"
"os"
"time"

"gopkg.in/tomb.v2"
@@ -70,6 +71,9 @@ type logGatherer struct {
// ensure the client is not blocking subsequent teardown steps.
clientCancel context.CancelFunc

// Channel used to notify the main loop to set the client's labels
setLabels chan svcWithLabels

pullers *pullerGroup
// All pullers send logs on this channel, received by main loop
entryCh chan servicelog.Entry
@@ -78,9 +82,10 @@ type logGatherer struct {
// logGathererOptions allows overriding the newLogClient method and time values
// in testing.
 type logGathererOptions struct {
-	bufferTimeout      time.Duration
-	maxBufferedEntries int
-	timeoutFinalFlush  time.Duration
+	bufferTimeout       time.Duration
+	maxBufferedEntries  int
+	timeoutCurrentFlush time.Duration
+	timeoutFinalFlush   time.Duration
 	// method to get a new client
 	newClient func(*plan.LogTarget) (logClient, error)
 }
@@ -104,6 +109,7 @@ func newLogGathererInternal(target *plan.LogTarget, options *logGathererOptions)

targetName: target.Name,
client: client,
setLabels: make(chan svcWithLabels),
entryCh: make(chan servicelog.Entry),
pullers: newPullerGroup(target.Name),
}
@@ -121,6 +127,9 @@ func fillDefaultOptions(options *logGathererOptions) *logGathererOptions {
if options.maxBufferedEntries == 0 {
options.maxBufferedEntries = maxBufferedEntries
}
if options.timeoutCurrentFlush == 0 {
options.timeoutCurrentFlush = timeoutCurrentFlush
}
if options.timeoutFinalFlush == 0 {
options.timeoutFinalFlush = timeoutFinalFlush
}
@@ -133,35 +142,46 @@ func fillDefaultOptions(options *logGathererOptions) *logGathererOptions {
// PlanChanged is called by the LogManager when the plan is changed, if this
// gatherer's target exists in the new plan.
 func (g *logGatherer) PlanChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) {
+	target := pl.LogTargets[g.targetName]
+
 	// Remove old pullers
 	for _, svcName := range g.pullers.Services() {
 		svc, svcExists := pl.Services[svcName]
-		if !svcExists {
-			g.pullers.Remove(svcName)
+		if svcExists && svc.LogsTo(target) {
+			// We're still collecting logs from this service, so don't remove it.
 			continue
 		}

-		tgt := pl.LogTargets[g.targetName]
-		if !svc.LogsTo(tgt) {
-			g.pullers.Remove(svcName)
+		// Service no longer forwarding to this log target (or it was removed from
+		// the plan). Remove it from the gatherer.
+		g.pullers.Remove(svcName)
+		select {
+		case g.setLabels <- svcWithLabels{svcName, nil}:
+		case <-g.tomb.Dying():
+			return
 		}
 	}

 	// Add new pullers
 	for _, service := range pl.Services {
-		target := pl.LogTargets[g.targetName]
 		if !service.LogsTo(target) {
 			continue
 		}

-		buffer, bufferExists := buffers[service.Name]
-		if !bufferExists {
-			// We don't yet have a reference to the service's ring buffer
-			// Need to wait until ServiceStarted
-			continue
+		labels := evaluateLabels(target.Labels, service.Environment)
+		select {
+		case g.setLabels <- svcWithLabels{service.Name, labels}:
+		case <-g.tomb.Dying():
+			return
 		}

-		g.pullers.Add(service.Name, buffer, g.entryCh)
+		// If the service was just added, it may not be started yet. In this case,
+		// we need to wait until the buffer is created, and then we can update the
+		// pullers inside ServiceStarted.
+		buffer, svcStarted := buffers[service.Name]
+		if svcStarted {
+			g.pullers.Add(service.Name, buffer, g.entryCh)
+		}
 	}
 }
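
`PlanChanged` sends on the unbuffered `setLabels` channel inside a `select` that also watches `g.tomb.Dying()`, so the caller cannot block forever once the main loop has exited. The sketch below shows the same pattern using only the standard library. A plain `done` channel stands in for the tomb, and `trySend` is a hypothetical name that does not appear in this PR.

```go
package main

import "fmt"

type svcWithLabels struct {
	service string
	labels  map[string]string
}

// trySend delivers msg to the receiver, or gives up once done is closed.
// It reports whether the message was delivered.
func trySend(ch chan<- svcWithLabels, done <-chan struct{}, msg svcWithLabels) bool {
	select {
	case ch <- msg:
		return true
	case <-done:
		return false
	}
}

func main() {
	ch := make(chan svcWithLabels) // unbuffered, like setLabels
	done := make(chan struct{})    // stand-in for tomb.Dying()

	// A receiver is running, so the send is delivered.
	received := make(chan svcWithLabels, 1)
	go func() { received <- <-ch }()
	fmt.Println(trySend(ch, done, svcWithLabels{service: "svc1"}))
	fmt.Println((<-received).service)

	// The receiver has shut down: the send returns instead of blocking.
	close(done)
	fmt.Println(trySend(ch, done, svcWithLabels{service: "svc2"}))
}
```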

@@ -171,6 +191,21 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R
g.pullers.Add(service.Name, buffer, g.entryCh)
}

// evaluateLabels interprets the labels defined in the plan, substituting any
// $env_vars with the corresponding value in the service's environment.
func evaluateLabels(rawLabels, env map[string]string) map[string]string {
substitute := func(k string) string {
// Undefined variables default to "", just like Bash
return env[k]
}

labels := make(map[string]string, len(rawLabels))
for key, rawLabel := range rawLabels {
labels[key] = os.Expand(rawLabel, substitute)
}
return labels
}
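
Because `evaluateLabels` delegates to `os.Expand`, label values follow that function's rules: both the `$VAR` and `${VAR}` forms are replaced, and a name missing from the environment expands to the empty string. The short program below reuses the function as written in this diff; the label names and values in `main` are made up for illustration.

```go
package main

import (
	"fmt"
	"os"
)

// evaluateLabels is copied from the diff above.
func evaluateLabels(rawLabels, env map[string]string) map[string]string {
	substitute := func(k string) string {
		// Undefined variables default to "", just like Bash
		return env[k]
	}

	labels := make(map[string]string, len(rawLabels))
	for key, rawLabel := range rawLabels {
		labels[key] = os.Expand(rawLabel, substitute)
	}
	return labels
}

func main() {
	env := map[string]string{"OWNER": "alice"}
	labels := evaluateLabels(map[string]string{
		"plain":   "user-$OWNER",    // $VAR form
		"braced":  "${OWNER}s-logs", // ${VAR} form, needed before a letter
		"missing": "team-$TEAM",     // TEAM is not defined in env
	}, env)

	fmt.Println(labels["plain"])   // user-alice
	fmt.Println(labels["braced"])  // alices-logs
	fmt.Println(labels["missing"]) // team-
}
```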

// The main control loop for the logGatherer. loop receives logs from the
// pullers on entryCh, and writes them to the client. It also flushes the
// client periodically, and exits when the gatherer's tomb is killed.
@@ -199,6 +234,12 @@ mainLoop:
case <-flushTimer.Expired():
flushClient(g.clientCtx)

case args := <-g.setLabels:
// Before we change the labels, flush any logs currently in the buffer,
// so that these logs are sent with the correct (old) labels.
flushClient(g.clientCtx)
g.client.SetLabels(args.service, args.labels)

case entry := <-g.entryCh:
err := g.client.Add(entry)
if err != nil {
@@ -236,7 +277,7 @@ mainLoop:
// - Flush out any final logs buffered in the client.
func (g *logGatherer) Stop() {
// Wait up to timeoutCurrentFlush for the current flush to complete (if any)
-	time.AfterFunc(timeoutCurrentFlush, g.clientCancel)
+	time.AfterFunc(g.timeoutCurrentFlush, g.clientCancel)

// Wait up to timeoutPullers for the pullers to pull the final logs from the
// iterator and send to the main loop.
@@ -264,6 +305,11 @@ func (g *logGatherer) Stop() {
}
}

type svcWithLabels struct {
service string
labels map[string]string
}

// timer wraps time.Timer and provides a better API.
type timer struct {
timer *time.Timer
@@ -312,6 +358,10 @@ type logClient interface {

// Flush sends buffered logs (if any) to the remote target.
Flush(context.Context) error

// SetLabels sets the log labels for the given service, or releases
// previously allocated label resources if the labels parameter is nil.
SetLabels(serviceName string, labels map[string]string)
}

func newLogClient(target *plan.LogTarget) (logClient, error) {