feat(logfwd): log forwarding implementation II #256

Merged 31 commits on Aug 22, 2023

Commits (diff below shows changes from 21 commits)
ff53051
Log forwarding implementation
barrettj12 Jul 3, 2023
760e7a4
remove extra fakeLogManager decl
barrettj12 Aug 2, 2023
6e03c0c
surface client errors
barrettj12 Aug 2, 2023
5ca3752
fix imports in gatherer.go
barrettj12 Aug 2, 2023
e91f756
fix a small typo in comment
barrettj12 Aug 3, 2023
2f1d967
Move plan logic out of pullerGroup
barrettj12 Aug 8, 2023
ab40a6c
Add extra ctx.Done check in logPuller.loop
barrettj12 Aug 8, 2023
7e4d212
Uppercase logGatherer methods (PlanChanged, ServiceStarted, Stop) whi…
barrettj12 Aug 8, 2023
b03882b
fix error/log messages
barrettj12 Aug 8, 2023
783c3ca
Revert changes to state engine
barrettj12 Aug 8, 2023
baabeb2
Stop log manager after service manager
barrettj12 Aug 8, 2023
04a3354
kill gatherer tomb with nil
barrettj12 Aug 8, 2023
b35a8b0
make some channels read/write only
barrettj12 Aug 8, 2023
b31e4be
pullerGroup.Add: log warning if puller already exists
barrettj12 Aug 8, 2023
01bd92a
rename logPuller.kill to cancel
barrettj12 Aug 8, 2023
ab46d31
flesh out logGatherer.PlanChanged doc comment
barrettj12 Aug 8, 2023
6bae051
use timer instead of ticker
barrettj12 Aug 8, 2023
be25405
rewrite pullerGroup using tombs
barrettj12 Aug 11, 2023
04de335
close service ring buffers on Pebble shutdown
barrettj12 Aug 11, 2023
b39c6d0
fix imports
barrettj12 Aug 11, 2023
5899532
small comment fix in overlord
barrettj12 Aug 11, 2023
2b4f8eb
Improve handling of log gatherer tear down.
hpidcock Aug 14, 2023
97e7d94
add persistent goroutine to pullerGroup tomb
barrettj12 Aug 14, 2023
9465a3a
rename tickPeriod to bufferTimeout
barrettj12 Aug 14, 2023
5e77fb3
address Ben's review comments
barrettj12 Aug 15, 2023
28d2d51
lowercase pullerGroup contains and len methods
barrettj12 Aug 15, 2023
0b0d873
remove fake client implementations
barrettj12 Aug 15, 2023
97c4681
add comment on pullerGroup tomb
barrettj12 Aug 15, 2023
8863487
obtain lock before reading ServiceManager.services
barrettj12 Aug 15, 2023
c0584ce
Add some more doc comments to pullerGroup methods
barrettj12 Aug 17, 2023
3c92f43
Remove internal error on tomb.Wait log
benhoyt Aug 22, 2023
70 changes: 70 additions & 0 deletions internals/overlord/logstate/fake.go
@@ -0,0 +1,70 @@
package logstate

import (
    "context"
    "fmt"
    "time"

    "github.com/canonical/pebble/internals/servicelog"
)

// Fake sample implementations of logClient
// TODO: remove this file before merging

type nonBufferingClient struct{}

var _ logClient = &nonBufferingClient{}

func (c *nonBufferingClient) Write(_ context.Context, entry servicelog.Entry) error {
    fmt.Printf("%v [%s] %s", entry.Time, entry.Service, entry.Message)
    return nil
}

func (c *nonBufferingClient) Flush(_ context.Context) error {
    // no-op
    return nil
}

type bufferingClient struct {
    entries   []servicelog.Entry
    threshold int
}

var _ logClient = &bufferingClient{}

func (c *bufferingClient) Write(ctx context.Context, entry servicelog.Entry) error {
    c.entries = append(c.entries, entry)
    if c.threshold > 0 && len(c.entries) >= c.threshold {
        return c.Flush(ctx)
    }
    return nil
}

func (c *bufferingClient) Flush(_ context.Context) error {
    for _, entry := range c.entries {
        fmt.Printf("%v [%s] %s", entry.Time, entry.Service, entry.Message)
    }
    fmt.Println()
    c.entries = c.entries[:0]
    return nil
}

// a slow client where Flush takes a long time
type slowClient struct {
    flushTime time.Duration
}

var _ logClient = &slowClient{}

func (c *slowClient) Write(_ context.Context, _ servicelog.Entry) error {
    return nil
}

func (c *slowClient) Flush(ctx context.Context) error {
    select {
    case <-time.After(c.flushTime):
        return nil
    case <-ctx.Done():
        return fmt.Errorf("timeout flushing logs")
    }
}
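
These fakes also show the intended calling pattern for logClient. As a minimal sketch (not part of this PR; the test name and log messages are made up), a test could drive bufferingClient through the interface and check that hitting the threshold triggers an intermediate flush:

package logstate

import (
    "context"
    "testing"
    "time"

    "github.com/canonical/pebble/internals/servicelog"
)

// TestBufferingClientFlush drives the fake bufferingClient via the logClient
// interface. With threshold set to 2, the second Write flushes on its own.
// (Hypothetical test, for illustration only.)
func TestBufferingClientFlush(t *testing.T) {
    var client logClient = &bufferingClient{threshold: 2}
    ctx := context.Background()

    for i, msg := range []string{"hello\n", "world\n", "again\n"} {
        entry := servicelog.Entry{
            Time:    time.Now(),
            Service: "svc1",
            Message: msg,
        }
        if err := client.Write(ctx, entry); err != nil {
            t.Fatalf("write %d failed: %v", i, err)
        }
    }
    // The first two entries were flushed when the threshold was hit;
    // flush whatever is left in the buffer.
    if err := client.Flush(ctx); err != nil {
        t.Fatalf("final flush failed: %v", err)
    }
}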
316 changes: 316 additions & 0 deletions internals/overlord/logstate/gatherer.go
@@ -0,0 +1,316 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package logstate

import (
    "context"
    "fmt"
    "time"

    "gopkg.in/tomb.v2"

    "github.com/canonical/pebble/internals/logger"
    "github.com/canonical/pebble/internals/plan"
    "github.com/canonical/pebble/internals/servicelog"
)

const (
    parserSize = 4 * 1024
    tickPeriod = 1 * time.Second

    // These constants control the maximum time allowed for each teardown step.
    timeoutCurrentFlush = 1 * time.Second
    timeoutPullers      = 2 * time.Second
    timeoutMainLoop     = 3 * time.Second
    // timeoutFinalFlush is measured from when the gatherer's main loop finishes,
    // NOT from when Stop() is called like the other constants.
    timeoutFinalFlush = 2 * time.Second
)

// logGatherer is responsible for collecting service logs from a bunch of
// services, and sending them to its logClient.
// One logGatherer will run per log target. Its loop() method should be run
// in its own goroutine.
// A logGatherer will spawn a separate logPuller for each service it collects
// logs from. Each logPuller will run in a separate goroutine, and send logs to
// the logGatherer via a shared channel.
// The logGatherer will "flush" the client:
// - on a regular cadence (e.g. every 1 second)
// - when it is told to shut down.
//
// The client may also flush itself when its internal buffer reaches a certain
// size.
// Calling the Stop() method will tear down the logGatherer and all of its
// associated logPullers. Stop() can be called from an outside goroutine.
type logGatherer struct {
    logGathererArgs

    targetName string
    // tomb for the main loop
    tomb tomb.Tomb

    client logClient
    // Context to pass to client methods
    clientCtx context.Context
    // cancel func for clientCtx - can be used during teardown if required, to
    // ensure the client is not blocking subsequent teardown steps.
    clientCancel context.CancelFunc

    pullers *pullerGroup
    // All pullers send logs on this channel, received by main loop
    entryCh chan servicelog.Entry
}

// logGathererArgs allows overriding the newLogClient method and time values
// in testing.
type logGathererArgs struct {
    tickPeriod        time.Duration
    timeoutFinalFlush time.Duration
    // method to get a new client
    newClient func(*plan.LogTarget) (logClient, error)
}

func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) {
    return newLogGathererInternal(target, logGathererArgs{})
}

// newLogGathererInternal contains the actual creation code for a logGatherer.
// This function is used in the real implementation, but also allows overriding
// certain configuration values for testing.
func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logGatherer, error) {
    args = fillDefaultArgs(args)
    client, err := args.newClient(target)
    if err != nil {
        return nil, fmt.Errorf("cannot create log client: %w", err)
    }

    g := &logGatherer{
        logGathererArgs: args,

        targetName: target.Name,
        client:     client,
        entryCh:    make(chan servicelog.Entry),
        pullers:    newPullerGroup(target.Name),
    }
    g.clientCtx, g.clientCancel = context.WithCancel(context.Background())
    g.tomb.Go(g.loop)
    return g, nil
}

func fillDefaultArgs(args logGathererArgs) logGathererArgs {
    if args.tickPeriod == 0 {
        args.tickPeriod = tickPeriod
    }
    if args.timeoutFinalFlush == 0 {
        args.timeoutFinalFlush = timeoutFinalFlush
    }
    if args.newClient == nil {
        args.newClient = newLogClient
    }
    return args
}

// PlanChanged is called by the LogManager when the plan is changed, if this
// gatherer's target exists in the new plan.
func (g *logGatherer) PlanChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) {
    // Remove old pullers
    for _, svcName := range g.pullers.List() {
        svc, svcExists := pl.Services[svcName]
        if !svcExists {
            g.pullers.Remove(svcName)
            continue
        }

        tgt := pl.LogTargets[g.targetName]
        if !svc.LogsTo(tgt) {
            g.pullers.Remove(svcName)
        }
    }

    // Add new pullers
    for _, service := range pl.Services {
        target := pl.LogTargets[g.targetName]
        if !service.LogsTo(target) {
            continue
        }

        buffer, bufferExists := buffers[service.Name]
        if !bufferExists {
            // We don't yet have a reference to the service's ring buffer
            // Need to wait until ServiceStarted
            continue
        }

        g.pullers.Add(service.Name, buffer, g.entryCh)
    }
}

// ServiceStarted is called by the LogManager on the start of a service which
// logs to this gatherer's target.
func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.RingBuffer) {
    g.pullers.Add(service.Name, buffer, g.entryCh)
}

// The main control loop for the logGatherer. loop receives logs from the
// pullers on entryCh, and writes them to the client. It also flushes the
// client periodically, and exits when the gatherer's tomb is killed.
func (g *logGatherer) loop() error {
    timer := newTimer()

mainLoop:
    for {
        select {
        case <-g.tomb.Dying():
            break mainLoop

        case <-timer.Finished():
            // Mark timer as unset
            timer.Stop()
            err := g.client.Flush(g.clientCtx)
            if err != nil {
                logger.Noticef("Error sending logs to target %q: %v", g.targetName, err)
            }

        case entry := <-g.entryCh:
            err := g.client.Write(g.clientCtx, entry)
            if err != nil {
                logger.Noticef("Error writing logs to target %q: %v", g.targetName, err)
            }
            // Set timer if not already set
            timer.EnsureSet(g.tickPeriod)
        }
    }

    // Final flush to send any remaining logs buffered in the client
    // We need to create a new context, as the previous one may have been cancelled.
    ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush)
    defer cancel()
    err := g.client.Flush(ctx)
    if err != nil {
        logger.Noticef("Error sending logs to target %q: %v", g.targetName, err)
    }
    return nil
}

// Stop tears down the gatherer and associated resources (pullers, client).
// This method will block until gatherer teardown is complete.
//
// The teardown process has several steps:
// - If the main loop is in the middle of a flush when we call Stop, this
// will block the pullers from sending logs to the gatherer. Hence, wait
// for the current flush to complete.
// - Wait for the pullers to pull the final logs from the iterator.
// - Kill the main loop.
// - Flush out any final logs buffered in the client.
func (g *logGatherer) Stop() {
    // Wait up to timeoutCurrentFlush for the current flush to complete (if any)
    time.AfterFunc(timeoutCurrentFlush, g.clientCancel)

    // Wait up to timeoutPullers for the pullers to pull the final logs from the
    // iterator and send to the main loop.
    time.AfterFunc(timeoutPullers, func() {
        logger.Debugf("gatherer %q: force killing log pullers", g.targetName)
        g.pullers.KillAll()
    })

    // Kill the main loop once either:
    // - all the pullers are done
    // - timeoutMainLoop has passed
    select {
    case <-g.pullers.Done():
        logger.Debugf("gatherer %q: pullers have finished", g.targetName)
    case <-time.After(timeoutMainLoop):
        logger.Debugf("gatherer %q: force killing main loop", g.targetName)
    }

    g.tomb.Kill(nil)
    // Wait for final flush in the main loop
    err := g.tomb.Wait()
    if err != nil {
        logger.Noticef("Error shutting down gatherer: %v", err)
    }
}

// timer wraps time.Timer and provides a better API.
type timer struct {
    timer *time.Timer
    set   bool
}

func newTimer() timer {
    t := timer{
        timer: time.NewTimer(1 * time.Hour),
    }
    t.Stop()
    return t
}

func (t *timer) Finished() <-chan time.Time {
    return t.timer.C
}

func (t *timer) Stop() {
    t.timer.Stop()
    t.set = false
    // Drain timer channel
    select {
    case <-t.timer.C:
    default:
    }
}

func (t *timer) EnsureSet(timeout time.Duration) {
    if t.set {
        return
    }

    t.timer.Reset(timeout)
    t.set = true
}

// logClient handles requests to a specific type of log target. It encodes
// log messages in the required format, and sends the messages using the
// protocol required by that log target.
// For example, a logClient for Loki would encode the log messages in the
// JSON format expected by Loki, and send them over HTTP(S).
//
// logClient implementations have some freedom about the semantics of these
// methods. For a buffering client (e.g. HTTP):
// - Write could add the log to the client's internal buffer, calling Flush
// when this buffer reaches capacity.
// - Flush would prepare and send a request with the buffered logs.
//
// For a non-buffering client (e.g. TCP), Write could serialise the log
// directly to the open connection, while Flush would be a no-op.
type logClient interface {
    // Write adds the given log entry to the client. Depending on the
    // implementation of the client, this may send the log to the remote target,
    // or simply add the log to an internal buffer, flushing that buffer when
    // required.
    Write(context.Context, servicelog.Entry) error

    // Flush sends buffered logs (if any) to the remote target. For clients which
    // don't buffer logs, Flush should be a no-op.
    Flush(context.Context) error
}

func newLogClient(target *plan.LogTarget) (logClient, error) {
    switch target.Type {
    //case plan.LokiTarget: TODO
    //case plan.SyslogTarget: TODO
    default:
        return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
    }
}
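
Since newLogGathererInternal takes a logGathererArgs, tests can inject a fake client and shrink the timing constants instead of dialing a real log target. A rough sketch of such a helper (hypothetical: newTestGatherer, the target name, and the chosen durations are illustrative; bufferingClient comes from fake.go above):

package logstate

import (
    "testing"
    "time"

    "github.com/canonical/pebble/internals/plan"
)

// newTestGatherer constructs a gatherer with short timeouts and the fake
// bufferingClient injected. (Hypothetical helper, for illustration only.)
func newTestGatherer(t *testing.T) (*logGatherer, *bufferingClient) {
    client := &bufferingClient{}
    g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, logGathererArgs{
        // Flush every 10ms instead of every second, to keep tests fast.
        tickPeriod:        10 * time.Millisecond,
        timeoutFinalFlush: 100 * time.Millisecond,
        // Return the fake client instead of creating a real one.
        newClient: func(_ *plan.LogTarget) (logClient, error) {
            return client, nil
        },
    })
    if err != nil {
        t.Fatalf("cannot create gatherer: %v", err)
    }
    return g, client
}

From there, a test could hand the gatherer a servicelog.RingBuffer via ServiceStarted, wait for a tick, call Stop, and assert on what the fake client received.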