diff --git a/internals/overlord/logstate/fake.go b/internals/overlord/logstate/fake.go
index 85bbf6787..e8836bede 100644
--- a/internals/overlord/logstate/fake.go
+++ b/internals/overlord/logstate/fake.go
@@ -64,6 +64,6 @@ func (c *slowClient) Flush(ctx context.Context) error {
case <-time.After(c.flushTime):
return nil
case <-ctx.Done():
- return errTimeout
+ return fmt.Errorf("timeout flushing logs")
}
}
diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go
index a46b3b2ad..e6dae4c36 100644
--- a/internals/overlord/logstate/gatherer.go
+++ b/internals/overlord/logstate/gatherer.go
@@ -20,10 +20,11 @@ import (
"sync"
"time"
+ "gopkg.in/tomb.v2"
+
"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
- "gopkg.in/tomb.v2"
)
const (
@@ -40,39 +41,46 @@ const (
// logGatherer is responsible for collecting service logs from a bunch of
// services, and sending them to its logClient.
// One logGatherer will run per log target. Its loop() method should be run
-// in its own goroutine. It can be stopped in a separate goroutine by calling
-// the stop() method.
+// in its own goroutine.
+// A logGatherer will spawn a separate logPuller for each service it collects
+// logs from. Each logPuller will run in a separate goroutine, and send logs to
+// the logGatherer via a shared channel.
// The logGatherer will "flush" the client:
// - on a regular cadence (e.g. every 1 second)
// - when it is told to shut down.
//
-// Its client may also flush itself when its internal buffer reaches a certain
+// The client may also flush itself when its internal buffer reaches a certain
// size.
+// Calling the stop() method will tear down the logGatherer and all of its
+// associated logPullers. stop() can be called from an outside goroutine.
type logGatherer struct {
logGathererArgs
targetName string
- client logClient
-
- entryCh chan servicelog.Entry
- pullers map[string]*logPuller
- pullersLock sync.Mutex
-
- // Context to pass to client
- // When we are asked to shutdown, we can cancel this context to ensure the
- // client is not blocking us.
- clientCtx context.Context
- clientCancel context.CancelFunc
-
// tomb for the main loop
tomb tomb.Tomb
- // this context and WaitGroup are used to manage pullers
- // Each puller will use a child context of this one
- // We wait on pullersGrp during teardown so we know when all pullers are done
- pullersCtx context.Context
+ client logClient
+ // Context to pass to client methods
+ clientCtx context.Context
+ // cancel func for clientCtx - can be used during teardown if required, to
+ // ensure the client is not blocking subsequent teardown steps.
+ clientCancel context.CancelFunc
+
+ // Currently active logPullers, indexed by service name
+ pullers map[string]*logPuller
+ // Mutex for pullers
+ pullersLock sync.Mutex
+ // All pullers send logs on this channel, received by main loop
+ entryCh chan servicelog.Entry
+ // Common context for all pullers. Each puller uses a derived context so we
+ // can easily kill all pullers (if required) during teardown.
+ pullersCtx context.Context
+ // Cancel func for pullersCtx
killPullers context.CancelFunc
- pullersGrp *sync.WaitGroup
+ // WaitGroup for pullers - we use this during teardown to know when all the
+ // pullers are finished.
+ pullersGrp *sync.WaitGroup
}
// logGathererArgs allows overriding the newLogClient method and time values
@@ -94,6 +102,9 @@ func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) {
return newLogGathererInternal(target, logGathererArgs{})
}
+// newLogGathererInternal contains the actual creation code for a logGatherer.
+// This function is used in the real implementation, but also allows overriding
+// certain configuration values for testing.
func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logGatherer, error) {
args = fillDefaultArgs(args)
client, err := args.newClient(target)
@@ -138,6 +149,7 @@ func fillDefaultArgs(args logGathererArgs) logGathererArgs {
return args
}
+// planChanged is called by the LogManager when the plan is changed.
func (g *logGatherer) planChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) {
g.pullersLock.Lock()
defer g.pullersLock.Unlock()
@@ -175,6 +187,8 @@ func (g *logGatherer) planChanged(pl *plan.Plan, buffers map[string]*servicelog.
g.pullers = newPullers
}
+// serviceStarted is called by the LogManager on the start of a service which
+// logs to this gatherer's target.
func (g *logGatherer) serviceStarted(service *plan.Service, buffer *servicelog.RingBuffer) {
g.pullersLock.Lock()
defer g.pullersLock.Unlock()
@@ -186,6 +200,9 @@ func (g *logGatherer) serviceStarted(service *plan.Service, buffer *servicelog.R
g.pullers[service.Name] = g.newLogPuller(buffer)
}
+// loop is the main control loop for the logGatherer. It receives logs from
+// the pullers on entryCh, and writes them to the client. It also flushes the
+// client periodically, and exits when the gatherer's tomb is killed.
func (g *logGatherer) loop() error {
ticker := time.NewTicker(g.tickPeriod)
defer ticker.Stop()
@@ -312,16 +329,17 @@ func (p *logPuller) loop() {
// JSON format expected by Loki, and send them over HTTP(S).
//
// logClient implementations have some freedom about the semantics of these
-// methods.
-// - For a buffering client (e.g. HTTP), Write could add the log to the
-// client's internal buffer, while Flush would prepare and send a request
-// with the buffered logs.
-// - For a non-buffering client (e.g. TCP), Write could serialise the log
-// directly to the open connection, while Flush would be a no-op.
+// methods. For a buffering client (e.g. HTTP):
+// - Write could add the log to the client's internal buffer, calling Flush
+// when this buffer reaches capacity.
+// - Flush would prepare and send a request with the buffered logs.
+//
+// For a non-buffering client (e.g. TCP), Write could serialise the log
+// directly to the open connection, while Flush would be a no-op.
type logClient interface {
// Write adds the given log entry to the client. Depending on the
// implementation of the client, this may send the log to the remote target,
- // or simply add the log to an internal buffer, flushing that buffer as
+ // or simply add the log to an internal buffer, flushing that buffer when
// required.
Write(context.Context, servicelog.Entry) error
@@ -330,14 +348,11 @@ type logClient interface {
Flush(context.Context) error
}
-var errTimeout = fmt.Errorf("timeout")
-
func newLogClient(target *plan.LogTarget) (logClient, error) {
switch target.Type {
//case plan.LokiTarget: TODO
//case plan.SyslogTarget: TODO
default:
- return &slowClient{flushTime: 10 * time.Second}, nil
- //return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
+ return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
}
}
diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go
index 2816afe1f..0638d2763 100644
--- a/internals/overlord/logstate/gatherer_test.go
+++ b/internals/overlord/logstate/gatherer_test.go
@@ -1,3 +1,17 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
package logstate
import (
@@ -6,9 +20,10 @@ import (
"io"
"time"
+ . "gopkg.in/check.v1"
+
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
- . "gopkg.in/check.v1"
)
type gathererSuite struct{}
diff --git a/internals/overlord/logstate/manager.go b/internals/overlord/logstate/manager.go
index a48e6d30c..82a8a17bc 100644
--- a/internals/overlord/logstate/manager.go
+++ b/internals/overlord/logstate/manager.go
@@ -39,7 +39,8 @@ func NewLogManager() *LogManager {
}
}
-// PlanChanged is called by the service manager when the plan changes. We update the list of gatherers for each forwarder based on the new plan.
+// PlanChanged is called by the service manager when the plan changes.
+// Based on the new plan, we will stop old gatherers and start new ones.
func (m *LogManager) PlanChanged(pl *plan.Plan) {
m.mu.Lock()
defer m.mu.Unlock()
diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go
index cf2b85164..8832c6f70 100644
--- a/internals/overlord/logstate/manager_test.go
+++ b/internals/overlord/logstate/manager_test.go
@@ -1,3 +1,17 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
package logstate
import (
@@ -94,7 +108,6 @@ func checkBuffers(c *C, buffers map[string]*servicelog.RingBuffer, expected []st
}
}
-// TODO: why is this test intermittently failing?
func (s *managerSuite) TestTimelyShutdown(c *C) {
gathererArgs := logGathererArgs{
timeoutCurrentFlush: 1 * time.Millisecond,