diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go
new file mode 100644
index 00000000..001b9e2f
--- /dev/null
+++ b/internals/overlord/logstate/gatherer.go
@@ -0,0 +1,319 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package logstate
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "gopkg.in/tomb.v2"
+
+ "github.com/canonical/pebble/internals/logger"
+ "github.com/canonical/pebble/internals/plan"
+ "github.com/canonical/pebble/internals/servicelog"
+)
+
+const (
+ parserSize = 4 * 1024
+ bufferTimeout = 1 * time.Second
+
+ // These constants control the maximum time allowed for each teardown step.
+ timeoutCurrentFlush = 1 * time.Second
+ timeoutPullers = 2 * time.Second
+ timeoutMainLoop = 3 * time.Second
+ // timeoutFinalFlush is measured from when the gatherer's main loop finishes,
+ // NOT from when Stop() is called like the other constants.
+ timeoutFinalFlush = 2 * time.Second
+)
+
+// logGatherer is responsible for collecting service logs from a set of
+// services, and sending them to its logClient.
+// One logGatherer will run per log target. Its loop() method should be run
+// in its own goroutine.
+// A logGatherer will spawn a separate logPuller for each service it collects
+// logs from. Each logPuller will run in a separate goroutine, and send logs to
+// the logGatherer via a shared channel.
+// The logGatherer will "flush" the client:
+// - after a timeout (1s) has passed since the first log was written;
+// - when it is told to shut down.
+//
+// The client may also flush itself when its internal buffer reaches a certain
+// size.
+// Calling the Stop() method will tear down the logGatherer and all of its
+// associated logPullers. Stop() can be called from an outside goroutine.
+type logGatherer struct {
+ logGathererArgs
+
+ targetName string
+ // tomb for the main loop
+ tomb tomb.Tomb
+
+ client logClient
+ // Context to pass to client methods
+ clientCtx context.Context
+ // cancel func for clientCtx - can be used during teardown if required, to
+ // ensure the client is not blocking subsequent teardown steps.
+ clientCancel context.CancelFunc
+
+ pullers *pullerGroup
+ // All pullers send logs on this channel, received by main loop
+ entryCh chan servicelog.Entry
+}
+
+// logGathererArgs allows overriding the newLogClient function and timeout
+// values for testing.
+type logGathererArgs struct {
+ bufferTimeout time.Duration
+ timeoutFinalFlush time.Duration
+ // method to get a new client
+ newClient func(*plan.LogTarget) (logClient, error)
+}
+
+func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) {
+ return newLogGathererInternal(target, logGathererArgs{})
+}
+
+// newLogGathererInternal contains the actual creation code for a logGatherer.
+// This function is used in the real implementation, but also allows overriding
+// certain configuration values for testing.
+func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logGatherer, error) {
+ args = fillDefaultArgs(args)
+ client, err := args.newClient(target)
+ if err != nil {
+ return nil, fmt.Errorf("cannot create log client: %w", err)
+ }
+
+ g := &logGatherer{
+ logGathererArgs: args,
+
+ targetName: target.Name,
+ client: client,
+ entryCh: make(chan servicelog.Entry),
+ pullers: newPullerGroup(target.Name),
+ }
+ g.clientCtx, g.clientCancel = context.WithCancel(context.Background())
+ g.tomb.Go(g.loop)
+ g.tomb.Go(g.pullers.tomb.Wait)
+
+ return g, nil
+}
+
+func fillDefaultArgs(args logGathererArgs) logGathererArgs {
+ if args.bufferTimeout == 0 {
+ args.bufferTimeout = bufferTimeout
+ }
+ if args.timeoutFinalFlush == 0 {
+ args.timeoutFinalFlush = timeoutFinalFlush
+ }
+ if args.newClient == nil {
+ args.newClient = newLogClient
+ }
+ return args
+}
+
+// PlanChanged is called by the LogManager when the plan is changed, if this
+// gatherer's target exists in the new plan.
+func (g *logGatherer) PlanChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) {
+ // Remove old pullers
+ for _, svcName := range g.pullers.Services() {
+ svc, svcExists := pl.Services[svcName]
+ if !svcExists {
+ g.pullers.Remove(svcName)
+ continue
+ }
+
+ tgt := pl.LogTargets[g.targetName]
+ if !svc.LogsTo(tgt) {
+ g.pullers.Remove(svcName)
+ }
+ }
+
+ // Add new pullers
+ for _, service := range pl.Services {
+ target := pl.LogTargets[g.targetName]
+ if !service.LogsTo(target) {
+ continue
+ }
+
+ buffer, bufferExists := buffers[service.Name]
+ if !bufferExists {
+ // We don't yet have a reference to the service's ring buffer
+ // Need to wait until ServiceStarted
+ continue
+ }
+
+ g.pullers.Add(service.Name, buffer, g.entryCh)
+ }
+}
+
+// ServiceStarted is called by the LogManager when a service that logs to this
+// gatherer's target has started.
+func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.RingBuffer) {
+ g.pullers.Add(service.Name, buffer, g.entryCh)
+}
+
+// loop is the main control loop for the logGatherer. It receives logs from
+// the pullers on entryCh and writes them to the client. It also flushes the
+// client periodically, and exits when the gatherer's tomb is killed.
+func (g *logGatherer) loop() error {
+ timer := newTimer()
+ defer timer.Stop()
+
+mainLoop:
+ for {
+ select {
+ case <-g.tomb.Dying():
+ break mainLoop
+
+ case <-timer.Expired():
+ // Mark timer as unset
+ timer.Stop()
+ err := g.client.Flush(g.clientCtx)
+ if err != nil {
+ logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
+ }
+
+ case entry := <-g.entryCh:
+ err := g.client.Write(g.clientCtx, entry)
+ if err != nil {
+ logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err)
+ }
+ timer.EnsureSet(g.bufferTimeout)
+ }
+ }
+
+ // Final flush to send any remaining logs buffered in the client.
+ // We need a fresh context, as clientCtx may already have been cancelled.
+ ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush)
+ defer cancel()
+ err := g.client.Flush(ctx)
+ if err != nil {
+ logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
+ }
+ return nil
+}
+
+// Stop tears down the gatherer and associated resources (pullers, client).
+// This method will block until gatherer teardown is complete.
+//
+// The teardown process has several steps:
+// - If the main loop is in the middle of a flush when we call Stop, this
+// will block the pullers from sending logs to the gatherer. Hence, wait
+// for the current flush to complete.
+// - Wait for the pullers to pull the final logs from the iterator.
+// - Kill the main loop.
+// - Flush out any final logs buffered in the client.
+func (g *logGatherer) Stop() {
+ // Cancel the client context after timeoutCurrentFlush, so that a flush
+ // already in progress cannot block the rest of the teardown.
+ time.AfterFunc(timeoutCurrentFlush, g.clientCancel)
+
+ // After timeoutPullers, force kill the pullers if they still haven't
+ // finished pulling the final logs from the iterator and sending them to
+ // the main loop.
+ time.AfterFunc(timeoutPullers, func() {
+ logger.Debugf("gatherer %q: force killing log pullers", g.targetName)
+ g.pullers.KillAll()
+ })
+
+ // Kill the main loop once either:
+ // - all the pullers are done
+ // - timeoutMainLoop has passed
+ g.pullers.tomb.Kill(nil)
+ select {
+ case <-g.pullers.Done():
+ logger.Debugf("gatherer %q: pullers have finished", g.targetName)
+ case <-time.After(timeoutMainLoop):
+ logger.Debugf("gatherer %q: force killing main loop", g.targetName)
+ }
+
+ g.tomb.Kill(nil)
+ // Wait for final flush in the main loop
+ err := g.tomb.Wait()
+ if err != nil {
+ logger.Noticef("Cannot shut down gatherer: %v", err)
+ }
+}
+
+// timer wraps time.Timer, tracking whether it is currently set so that it can
+// be safely stopped, drained and reset.
+type timer struct {
+ timer *time.Timer
+ set bool
+}
+
+func newTimer() timer {
+ t := timer{
+ // The duration here is arbitrary: the timer is stopped (and drained)
+ // immediately below, so it starts out "unset".
+ timer: time.NewTimer(1 * time.Hour),
+ }
+ t.Stop()
+ return t
+}
+
+func (t *timer) Expired() <-chan time.Time {
+ return t.timer.C
+}
+
+func (t *timer) Stop() {
+ t.timer.Stop()
+ t.set = false
+ // Drain timer channel
+ select {
+ case <-t.timer.C:
+ default:
+ }
+}
+
+func (t *timer) EnsureSet(timeout time.Duration) {
+ if t.set {
+ return
+ }
+
+ t.timer.Reset(timeout)
+ t.set = true
+}
+
+// logClient handles requests to a specific type of log target. It encodes
+// log messages in the required format, and sends the messages using the
+// protocol required by that log target.
+// For example, a logClient for Loki would encode the log messages in the
+// JSON format expected by Loki, and send them over HTTP(S).
+//
+// logClient implementations have some freedom about the semantics of these
+// methods. For a buffering client (e.g. HTTP):
+// - Write could add the log to the client's internal buffer, calling Flush
+// when this buffer reaches capacity.
+// - Flush would prepare and send a request with the buffered logs.
+//
+// For a non-buffering client (e.g. TCP), Write could serialise the log
+// directly to the open connection, while Flush would be a no-op.
+type logClient interface {
+ // Write adds the given log entry to the client. Depending on the
+ // implementation of the client, this may send the log to the remote target,
+ // or simply add the log to an internal buffer, flushing that buffer when
+ // required.
+ Write(context.Context, servicelog.Entry) error
+
+ // Flush sends buffered logs (if any) to the remote target. For clients which
+ // don't buffer logs, Flush should be a no-op.
+ Flush(context.Context) error
+}
+
+func newLogClient(target *plan.LogTarget) (logClient, error) {
+ switch target.Type {
+ //case plan.LokiTarget: TODO
+ //case plan.SyslogTarget: TODO
+ default:
+ return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
+ }
+}
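The logClient doc comment above distinguishes buffering clients (Write accumulates entries, Flush sends them) from non-buffering ones (Write sends immediately, Flush is a no-op). As a rough illustration of the non-buffering case, here is a minimal sketch, not part of this change, of a client that serialises each entry straight to an io.Writer; the writerClient name is invented, and it assumes servicelog.Entry exposes Service and Message fields (Message is used that way in the tests below).

```go
package logstate

import (
	"context"
	"fmt"
	"io"

	"github.com/canonical/pebble/internals/servicelog"
)

// writerClient is a non-buffering logClient: each entry is written straight
// to the underlying io.Writer, so there is nothing for Flush to do.
type writerClient struct {
	w io.Writer
}

// Write serialises the entry immediately; there is no internal buffer.
func (c *writerClient) Write(_ context.Context, entry servicelog.Entry) error {
	// entry.Message already ends in a newline (see the FormatWriter usage in tests).
	_, err := fmt.Fprintf(c.w, "[%s] %s", entry.Service, entry.Message)
	return err
}

// Flush is a no-op, satisfying the logClient interface.
func (c *writerClient) Flush(_ context.Context) error { return nil }
```

A Loki or syslog client, by contrast, would likely follow the buffering pattern, much like testClient in the tests below.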
diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go
new file mode 100644
index 00000000..b7220cbe
--- /dev/null
+++ b/internals/overlord/logstate/gatherer_test.go
@@ -0,0 +1,203 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package logstate
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "time"
+
+ . "gopkg.in/check.v1"
+
+ "github.com/canonical/pebble/internals/plan"
+ "github.com/canonical/pebble/internals/servicelog"
+)
+
+type gathererSuite struct{}
+
+var _ = Suite(&gathererSuite{})
+
+func (s *gathererSuite) TestGatherer(c *C) {
+ received := make(chan []servicelog.Entry, 1)
+ gathererArgs := logGathererArgs{
+ newClient: func(target *plan.LogTarget) (logClient, error) {
+ return &testClient{
+ bufferSize: 5,
+ sendCh: received,
+ }, nil
+ },
+ }
+
+ g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, gathererArgs)
+ c.Assert(err, IsNil)
+
+ testSvc := newTestService("svc1")
+ g.ServiceStarted(testSvc.config, testSvc.ringBuffer)
+
+ testSvc.writeLog("log line #1")
+ testSvc.writeLog("log line #2")
+ testSvc.writeLog("log line #3")
+ testSvc.writeLog("log line #4")
+ select {
+ case logs := <-received:
+ c.Fatalf("wasn't expecting logs, received %#v", logs)
+ default:
+ }
+
+ testSvc.writeLog("log line #5")
+ select {
+ case <-time.After(5 * time.Millisecond):
+ c.Fatalf("timeout waiting for logs")
+ case logs := <-received:
+ checkLogs(c, logs, []string{"log line #1", "log line #2", "log line #3", "log line #4", "log line #5"})
+ }
+}
+
+func (s *gathererSuite) TestGathererTimeout(c *C) {
+ received := make(chan []servicelog.Entry, 1)
+ gathererArgs := logGathererArgs{
+ bufferTimeout: 1 * time.Millisecond,
+ newClient: func(target *plan.LogTarget) (logClient, error) {
+ return &testClient{
+ bufferSize: 5,
+ sendCh: received,
+ }, nil
+ },
+ }
+
+ g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, gathererArgs)
+ c.Assert(err, IsNil)
+
+ testSvc := newTestService("svc1")
+ g.ServiceStarted(testSvc.config, testSvc.ringBuffer)
+
+ testSvc.writeLog("log line #1")
+ select {
+ case <-time.After(1 * time.Second):
+ c.Fatalf("timeout waiting for logs")
+ case logs := <-received:
+ checkLogs(c, logs, []string{"log line #1"})
+ }
+}
+
+func (s *gathererSuite) TestGathererShutdown(c *C) {
+ received := make(chan []servicelog.Entry, 1)
+ gathererArgs := logGathererArgs{
+ bufferTimeout: 1 * time.Microsecond,
+ newClient: func(target *plan.LogTarget) (logClient, error) {
+ return &testClient{
+ bufferSize: 5,
+ sendCh: received,
+ }, nil
+ },
+ }
+
+ g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, gathererArgs)
+ c.Assert(err, IsNil)
+
+ testSvc := newTestService("svc1")
+ g.ServiceStarted(testSvc.config, testSvc.ringBuffer)
+
+ testSvc.writeLog("log line #1")
+ err = testSvc.stop()
+ c.Assert(err, IsNil)
+
+ hasShutdown := make(chan struct{})
+ go func() {
+ g.Stop()
+ close(hasShutdown)
+ }()
+
+ select {
+ case <-time.After(100 * time.Millisecond):
+ c.Fatalf("timeout waiting for gatherer to tear down")
+ case <-hasShutdown:
+ }
+
+ // check logs received
+ select {
+ case logs := <-received:
+ checkLogs(c, logs, []string{"log line #1"})
+ default:
+ c.Fatalf(`no logs were received
+logs in client buffer: %v`, len(g.client.(*testClient).buffered))
+ }
+}
+
+func checkLogs(c *C, received []servicelog.Entry, expected []string) {
+ c.Assert(received, HasLen, len(expected))
+ for i, entry := range received {
+ c.Check(entry.Message, Equals, expected[i]+"\n")
+ }
+}
+
+// testClient is a fake logClient which buffers entries, sending them on
+// sendCh once the buffer reaches bufferSize, or when Flush is called.
+type testClient struct {
+ bufferSize int
+ buffered []servicelog.Entry
+ sendCh chan []servicelog.Entry
+}
+
+func (c *testClient) Write(ctx context.Context, entry servicelog.Entry) error {
+ c.buffered = append(c.buffered, entry)
+ if len(c.buffered) >= c.bufferSize {
+ return c.Flush(ctx)
+ }
+ return nil
+}
+
+func (c *testClient) Flush(ctx context.Context) (err error) {
+ if len(c.buffered) == 0 {
+ return
+ }
+
+ select {
+ case <-ctx.Done():
+ err = fmt.Errorf("timeout flushing, dropping logs")
+ case c.sendCh <- c.buffered:
+ }
+
+ // Hand off the current buffer and start a fresh one, so that later writes
+ // cannot overwrite entries the receiver is still holding.
+ c.buffered = nil
+ return err
+}
+
+// fake "service" - useful for testing
+type testService struct {
+ name string
+ config *plan.Service
+ ringBuffer *servicelog.RingBuffer
+ writer io.Writer
+}
+
+func newTestService(name string) *testService {
+ rb := servicelog.NewRingBuffer(1024)
+ return &testService{
+ name: name,
+ config: &plan.Service{
+ Name: name,
+ },
+ ringBuffer: rb,
+ writer: servicelog.NewFormatWriter(rb, name),
+ }
+}
+
+func (s *testService) writeLog(log string) {
+ _, _ = s.writer.Write([]byte(log + "\n"))
+}
+
+func (s *testService) stop() error {
+ return s.ringBuffer.Close()
+}
diff --git a/internals/overlord/logstate/manager.go b/internals/overlord/logstate/manager.go
index 4ee2b235..46582899 100644
--- a/internals/overlord/logstate/manager.go
+++ b/internals/overlord/logstate/manager.go
@@ -15,26 +15,98 @@
package logstate
import (
+ "sync"
+
+ "github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)
-type LogManager struct{}
+type LogManager struct {
+ mu sync.Mutex
+ gatherers map[string]*logGatherer
+ buffers map[string]*servicelog.RingBuffer
+ plan *plan.Plan
+
+ newGatherer func(*plan.LogTarget) (*logGatherer, error)
+}
func NewLogManager() *LogManager {
- return &LogManager{}
+ return &LogManager{
+ gatherers: map[string]*logGatherer{},
+ buffers: map[string]*servicelog.RingBuffer{},
+ newGatherer: newLogGatherer,
+ }
}
-// PlanChanged is called by the service manager when the plan changes. We stop
-// all running forwarders, and start new forwarders based on the new plan.
+// PlanChanged is called by the service manager when the plan changes.
+// Based on the new plan, we stop old gatherers and start new ones as needed.
func (m *LogManager) PlanChanged(pl *plan.Plan) {
- // TODO: implement
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ // Create a map to hold gatherers for the new plan.
+ // Old gatherers are either carried over, or stopped if no longer needed.
+ newGatherers := make(map[string]*logGatherer, len(pl.LogTargets))
+
+ for _, target := range pl.LogTargets {
+ gatherer := m.gatherers[target.Name]
+ if gatherer == nil {
+ // Create new gatherer
+ var err error
+ gatherer, err = m.newGatherer(target)
+ if err != nil {
+ logger.Noticef("Internal error: cannot create gatherer for target %q: %v",
+ target.Name, err)
+ continue
+ }
+ newGatherers[target.Name] = gatherer
+ } else {
+ // Copy over existing gatherer
+ newGatherers[target.Name] = gatherer
+ delete(m.gatherers, target.Name)
+ }
+
+ // Update iterators for gatherer
+ gatherer.PlanChanged(pl, m.buffers)
+ }
+
+ // Old gatherers for now-removed targets need to be shut down.
+ for _, gatherer := range m.gatherers {
+ go gatherer.Stop()
+ }
+ m.gatherers = newGatherers
+
+ // Remove old buffers
+ for svc := range m.buffers {
+ if _, ok := pl.Services[svc]; !ok {
+ // Service has been removed
+ delete(m.buffers, svc)
+ }
+ }
+
+ m.plan = pl
}
// ServiceStarted notifies the log manager that the named service has started,
// and provides a reference to the service's log buffer.
-func (m *LogManager) ServiceStarted(serviceName string, buffer *servicelog.RingBuffer) {
- // TODO: implement
+func (m *LogManager) ServiceStarted(service *plan.Service, buffer *servicelog.RingBuffer) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ if m.buffers[service.Name] == buffer {
+ // Service restarted with same buffer. Don't need to update anything
+ return
+ }
+
+ m.buffers[service.Name] = buffer
+ for _, gatherer := range m.gatherers {
+ target := m.plan.LogTargets[gatherer.targetName]
+ if !service.LogsTo(target) {
+ continue
+ }
+ gatherer.ServiceStarted(service, buffer)
+ }
}
// Ensure implements overlord.StateManager.
@@ -44,5 +116,16 @@ func (m *LogManager) Ensure() error {
// Stop implements overlord.StateStopper and stops all log forwarding.
func (m *LogManager) Stop() {
- // TODO: implement
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ wg := sync.WaitGroup{}
+ for _, gatherer := range m.gatherers {
+ wg.Add(1)
+ go func(gatherer *logGatherer) {
+ gatherer.Stop()
+ wg.Done()
+ }(gatherer)
+ }
+ wg.Wait()
}
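Taken together, PlanChanged, ServiceStarted and Stop form the manager's whole lifecycle. Below is a minimal sketch, not part of this change, of the call order a caller such as the overlord follows; wireLogManager and its arguments are invented for illustration.

```go
package overlord

import (
	"github.com/canonical/pebble/internals/overlord/logstate"
	"github.com/canonical/pebble/internals/plan"
	"github.com/canonical/pebble/internals/servicelog"
)

// wireLogManager shows the expected call order; pl, svc and rb stand in for a
// real plan, service config and service ring buffer.
func wireLogManager(pl *plan.Plan, svc *plan.Service, rb *servicelog.RingBuffer) {
	m := logstate.NewLogManager()

	// Reconcile gatherers with the targets in the new plan.
	m.PlanChanged(pl)

	// Hand a started service's ring buffer to the manager so the matching
	// gatherers can begin pulling its logs.
	m.ServiceStarted(svc, rb)

	// On shutdown (after the service manager has closed the ring buffers),
	// stop all gatherers; Stop blocks until teardown completes.
	m.Stop()
}
```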
diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go
new file mode 100644
index 00000000..366250be
--- /dev/null
+++ b/internals/overlord/logstate/manager_test.go
@@ -0,0 +1,186 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package logstate
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ . "gopkg.in/check.v1"
+
+ "github.com/canonical/pebble/internals/plan"
+ "github.com/canonical/pebble/internals/servicelog"
+)
+
+type managerSuite struct{}
+
+var _ = Suite(&managerSuite{})
+
+func (s *managerSuite) TestPlanChange(c *C) {
+ gathererArgs := logGathererArgs{
+ newClient: func(target *plan.LogTarget) (logClient, error) {
+ return &testClient{}, nil
+ },
+ }
+ m := NewLogManager()
+ m.newGatherer = func(t *plan.LogTarget) (*logGatherer, error) {
+ return newLogGathererInternal(t, gathererArgs)
+ }
+
+ svc1 := newTestService("svc1")
+ svc2 := newTestService("svc2")
+ svc3 := newTestService("svc3")
+
+ m.PlanChanged(&plan.Plan{
+ Services: map[string]*plan.Service{
+ svc1.name: svc1.config,
+ svc2.name: svc2.config,
+ svc3.name: svc3.config,
+ },
+ LogTargets: map[string]*plan.LogTarget{
+ "tgt1": {Name: "tgt1", Services: []string{"all", "-svc3"}},
+ "tgt2": {Name: "tgt2", Services: []string{}},
+ "tgt3": {Name: "tgt3", Services: []string{"all"}},
+ },
+ })
+ m.ServiceStarted(svc1.config, svc1.ringBuffer)
+ m.ServiceStarted(svc2.config, svc2.ringBuffer)
+ m.ServiceStarted(svc3.config, svc3.ringBuffer)
+
+ checkGatherers(c, m.gatherers, map[string][]string{
+ "tgt1": {"svc1", "svc2"},
+ "tgt2": {},
+ "tgt3": {"svc1", "svc2", "svc3"},
+ })
+ checkBuffers(c, m.buffers, []string{"svc1", "svc2", "svc3"})
+
+ svc4 := newTestService("svc4")
+
+ m.PlanChanged(&plan.Plan{
+ Services: map[string]*plan.Service{
+ svc1.name: svc1.config,
+ svc2.name: svc2.config,
+ svc4.name: svc4.config,
+ },
+ LogTargets: map[string]*plan.LogTarget{
+ "tgt1": {Name: "tgt1", Services: []string{"svc1"}},
+ "tgt2": {Name: "tgt2", Services: []string{"svc1", "svc4"}},
+ "tgt4": {Name: "tgt4", Services: []string{"all", "-svc2"}},
+ },
+ })
+ m.ServiceStarted(svc4.config, svc4.ringBuffer)
+ // simulate service restart for svc2
+ m.ServiceStarted(svc2.config, svc2.ringBuffer)
+
+ checkGatherers(c, m.gatherers, map[string][]string{
+ "tgt1": {"svc1"},
+ "tgt2": {"svc1", "svc4"},
+ "tgt4": {"svc1", "svc4"},
+ })
+ // svc3 no longer exists so we should have dropped the reference to its buffer
+ checkBuffers(c, m.buffers, []string{"svc1", "svc2", "svc4"})
+}
+
+func checkGatherers(c *C, gatherers map[string]*logGatherer, expected map[string][]string) {
+ c.Assert(gatherers, HasLen, len(expected))
+ for tgtName, svcs := range expected {
+ g, ok := gatherers[tgtName]
+ c.Assert(ok, Equals, true)
+
+ c.Assert(g.pullers.len(), Equals, len(svcs))
+ for _, svc := range svcs {
+ c.Check(g.pullers.contains(svc), Equals, true)
+ }
+ }
+}
+
+func checkBuffers(c *C, buffers map[string]*servicelog.RingBuffer, expected []string) {
+ c.Assert(buffers, HasLen, len(expected))
+ for _, svcName := range expected {
+ _, ok := buffers[svcName]
+ c.Check(ok, Equals, true)
+ }
+}
+
+func (s *managerSuite) TestTimelyShutdown(c *C) {
+ gathererArgs := logGathererArgs{
+ timeoutFinalFlush: 5 * time.Millisecond,
+ newClient: func(target *plan.LogTarget) (logClient, error) {
+ return &slowFlushingClient{
+ flushTime: 10 * time.Second,
+ }, nil
+ },
+ }
+
+ m := NewLogManager()
+ m.newGatherer = func(t *plan.LogTarget) (*logGatherer, error) {
+ return newLogGathererInternal(t, gathererArgs)
+ }
+
+ svc1 := newTestService("svc1")
+
+ // Start 10 log gatherers
+ logTargets := make(map[string]*plan.LogTarget, 10)
+ for i := 0; i < 10; i++ {
+ targetName := fmt.Sprintf("tgt%d", i)
+ logTargets[targetName] = &plan.LogTarget{
+ Name: targetName,
+ Services: []string{"all"},
+ }
+ }
+ m.PlanChanged(&plan.Plan{
+ Services: map[string]*plan.Service{
+ "svc1": svc1.config,
+ },
+ LogTargets: logTargets,
+ })
+ m.ServiceStarted(svc1.config, svc1.ringBuffer)
+
+ c.Assert(m.gatherers, HasLen, 10)
+
+ err := svc1.stop()
+ c.Assert(err, IsNil)
+
+ // Stop all gatherers and check this happens quickly
+ done := make(chan struct{})
+ go func() {
+ m.Stop()
+ close(done)
+ }()
+ select {
+ case <-done:
+ case <-time.After(50 * time.Millisecond):
+ c.Fatal("LogManager.Stop() took too long")
+ }
+}
+
+type slowFlushingClient struct {
+ flushTime time.Duration
+}
+
+func (c *slowFlushingClient) Write(_ context.Context, _ servicelog.Entry) error {
+ // no-op
+ return nil
+}
+
+func (c *slowFlushingClient) Flush(ctx context.Context) error {
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("flush timed out")
+ case <-time.After(c.flushTime):
+ return nil
+ }
+}
diff --git a/internals/overlord/logstate/package_test.go b/internals/overlord/logstate/package_test.go
new file mode 100644
index 00000000..e7fe8c86
--- /dev/null
+++ b/internals/overlord/logstate/package_test.go
@@ -0,0 +1,23 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package logstate
+
+import (
+ "testing"
+
+ . "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) { TestingT(t) }
diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go
new file mode 100644
index 00000000..cd12c5d8
--- /dev/null
+++ b/internals/overlord/logstate/puller.go
@@ -0,0 +1,177 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package logstate
+
+import (
+ "sync"
+
+ "gopkg.in/tomb.v2"
+
+ "github.com/canonical/pebble/internals/logger"
+ "github.com/canonical/pebble/internals/servicelog"
+)
+
+// logPuller handles pulling logs from a single iterator and sending them to
+// the gatherer's main control loop.
+type logPuller struct {
+ iterator servicelog.Iterator
+ entryCh chan<- servicelog.Entry
+
+ tomb tomb.Tomb
+}
+
+// loop pulls logs off the iterator and sends them on the entryCh.
+// The loop will terminate:
+// - when the puller's tomb is killed
+// - once the ring buffer is closed and the iterator finishes reading all
+// remaining logs.
+func (p *logPuller) loop() error {
+ defer func() { _ = p.iterator.Close() }()
+
+ parser := servicelog.NewParser(p.iterator, parserSize)
+ for p.iterator.Next(p.tomb.Dying()) {
+ for parser.Next() {
+ if err := parser.Err(); err != nil {
+ return err
+ }
+
+ select {
+ case p.entryCh <- parser.Entry():
+ case <-p.tomb.Dying():
+ return nil
+ }
+ }
+ }
+ return nil
+}
+
+// pullerGroup represents a group of logPullers, and provides methods for a
+// gatherer to manage logPullers (dynamically add/remove, kill all, wait for
+// all to finish).
+type pullerGroup struct {
+ targetName string
+
+ // Currently active logPullers, indexed by service name
+ pullers map[string]*logPuller
+ // Mutex for pullers map
+ mu sync.RWMutex
+
+ tomb tomb.Tomb
+}
+
+func newPullerGroup(targetName string) *pullerGroup {
+ pg := &pullerGroup{
+ targetName: targetName,
+ pullers: map[string]*logPuller{},
+ }
+ // This goroutine lives for the lifetime of the pullerGroup. This is so that,
+ // if needed, we can safely remove all pullers and then add more, without
+ // causing a panic (tombs can't be reused once all the tracked goroutines
+ // have finished).
+ pg.tomb.Go(func() error {
+ <-pg.tomb.Dying()
+ return nil
+ })
+
+ return pg
+}
+
+// Add adds a new puller to the group. This puller will read from the given
+// buffer, and send parsed logs on the provided channel.
+func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, entryCh chan<- servicelog.Entry) {
+ pg.mu.Lock()
+ defer pg.mu.Unlock()
+
+ // There shouldn't already be a puller for this service, but if there is,
+ // shut it down first and wait for it to die.
+ pg.remove(serviceName)
+
+ lp := &logPuller{
+ iterator: buffer.TailIterator(),
+ entryCh: entryCh,
+ }
+ lp.tomb.Go(lp.loop)
+ pg.tomb.Go(lp.tomb.Wait)
+
+ pg.pullers[serviceName] = lp
+}
+
+// Services returns a list containing the name of each service for which we
+// have a puller in this group.
+func (pg *pullerGroup) Services() []string {
+ pg.mu.RLock()
+ defer pg.mu.RUnlock()
+
+ svcs := make([]string, 0, len(pg.pullers))
+ for svc := range pg.pullers {
+ svcs = append(svcs, svc)
+ }
+ return svcs
+}
+
+// Remove removes the puller for the named service.
+func (pg *pullerGroup) Remove(serviceName string) {
+ pg.mu.Lock()
+ defer pg.mu.Unlock()
+ pg.remove(serviceName)
+}
+
+// remove removes the puller for the named service.
+// This method is not concurrency-safe; the caller must hold pg.mu.
+func (pg *pullerGroup) remove(serviceName string) {
+ puller, pullerExists := pg.pullers[serviceName]
+ if !pullerExists {
+ return
+ }
+
+ puller.tomb.Kill(nil)
+ delete(pg.pullers, serviceName)
+
+ err := puller.tomb.Wait()
+ if err != nil {
+ logger.Noticef("Error from log puller: %v", err)
+ }
+}
+
+// KillAll kills all pullers in this group.
+func (pg *pullerGroup) KillAll() {
+ pg.mu.RLock()
+ defer pg.mu.RUnlock()
+
+ for _, puller := range pg.pullers {
+ puller.tomb.Kill(nil)
+ }
+ pg.tomb.Kill(nil)
+}
+
+// Done returns a channel which can be waited on until all pullers have finished.
+func (pg *pullerGroup) Done() <-chan struct{} {
+ return pg.tomb.Dead()
+}
+
+// contains is used for testing.
+func (pg *pullerGroup) contains(serviceName string) bool {
+ pg.mu.RLock()
+ defer pg.mu.RUnlock()
+ _, ok := pg.pullers[serviceName]
+ return ok
+}
+
+// len is used for testing.
+func (pg *pullerGroup) len() int {
+ pg.mu.RLock()
+ defer pg.mu.RUnlock()
+ return len(pg.pullers)
+}
diff --git a/internals/overlord/overlord.go b/internals/overlord/overlord.go
index 771a670d..b731c34e 100644
--- a/internals/overlord/overlord.go
+++ b/internals/overlord/overlord.go
@@ -108,13 +108,16 @@ func New(pebbleDir string, restartHandler restart.Handler, serviceOutput io.Writ
o.runner.AddOptionalHandler(matchAnyUnknownTask, handleUnknownTask, nil)
o.logMgr = logstate.NewLogManager()
- o.addManager(o.logMgr)
o.serviceMgr, err = servstate.NewManager(s, o.runner, o.pebbleDir, serviceOutput, restartHandler, o.logMgr)
if err != nil {
return nil, err
}
o.addManager(o.serviceMgr)
+ // The log manager should be stopped after the service manager, because
+ // ServiceManager.Stop closes the service ring buffers, which signals to the
+ // log manager that it's okay to stop log forwarding.
+ o.addManager(o.logMgr)
o.commandMgr = cmdstate.NewManager(o.runner)
o.addManager(o.commandMgr)
diff --git a/internals/overlord/servstate/handlers.go b/internals/overlord/servstate/handlers.go
index 32348a25..2db5f6cc 100644
--- a/internals/overlord/servstate/handlers.go
+++ b/internals/overlord/servstate/handlers.go
@@ -280,6 +280,21 @@ func (m *ServiceManager) serviceForStop(task *state.Task, name string) *serviceD
func (m *ServiceManager) removeService(name string) {
m.servicesLock.Lock()
defer m.servicesLock.Unlock()
+ m.removeServiceInternal(name)
+}
+
+// removeServiceInternal is not concurrency-safe; callers must hold m.servicesLock.
+func (m *ServiceManager) removeServiceInternal(name string) {
+ svc, svcExists := m.services[name]
+ if !svcExists {
+ return
+ }
+ if svc.logs != nil {
+ err := svc.logs.Close()
+ if err != nil {
+ logger.Noticef("Error closing service %q ring buffer: %v", name, err)
+ }
+ }
delete(m.services, name)
}
@@ -444,7 +459,7 @@ func (s *serviceData) startInternal() error {
}
// Pass buffer reference to logMgr to start log forwarding
- s.manager.logMgr.ServiceStarted(serviceName, s.logs)
+ s.manager.logMgr.ServiceStarted(s.config, s.logs)
return nil
}
diff --git a/internals/overlord/servstate/manager.go b/internals/overlord/servstate/manager.go
index f8edb2e0..f85bfeb5 100644
--- a/internals/overlord/servstate/manager.go
+++ b/internals/overlord/servstate/manager.go
@@ -39,7 +39,7 @@ type ServiceManager struct {
}
type LogManager interface {
- ServiceStarted(serviceName string, logs *servicelog.RingBuffer)
+ ServiceStarted(service *plan.Service, logs *servicelog.RingBuffer)
}
// PlanFunc is the type of function used by NotifyPlanChanged.
@@ -88,6 +88,13 @@ func (m *ServiceManager) Stop() {
if err != nil {
logger.Noticef("Cannot stop child process reaper: %v", err)
}
+
+ // Close all the service ringbuffers
+ m.servicesLock.Lock()
+ defer m.servicesLock.Unlock()
+ for name := range m.services {
+ m.removeServiceInternal(name)
+ }
}
// NotifyPlanChanged adds f to the list of functions that are called whenever
diff --git a/internals/overlord/servstate/manager_test.go b/internals/overlord/servstate/manager_test.go
index 48a360c0..af9befd6 100644
--- a/internals/overlord/servstate/manager_test.go
+++ b/internals/overlord/servstate/manager_test.go
@@ -1867,7 +1867,7 @@ func createZombie() error {
type fakeLogManager struct{}
-func (f fakeLogManager) ServiceStarted(serviceName string, logs *servicelog.RingBuffer) {
+func (f fakeLogManager) ServiceStarted(service *plan.Service, logs *servicelog.RingBuffer) {
// no-op
}
diff --git a/internals/overlord/stateengine.go b/internals/overlord/stateengine.go
index 25710333..18908ffc 100644
--- a/internals/overlord/stateengine.go
+++ b/internals/overlord/stateengine.go
@@ -138,6 +138,7 @@ func (se *StateEngine) Stop() {
}
for _, m := range se.managers {
if stopper, ok := m.(StateStopper); ok {
+ logger.Debugf("state engine: stopping %T", m)
stopper.Stop()
}
}
diff --git a/internals/servicelog/iterator.go b/internals/servicelog/iterator.go
index eb2926b2..29ecd1c3 100644
--- a/internals/servicelog/iterator.go
+++ b/internals/servicelog/iterator.go
@@ -20,27 +20,29 @@ import (
)
type Iterator interface {
- // Close closes the iterator so that buffers can be used for future writes.
- // If Close is not called, the iterator will block buffer recycling causing
- // write failures.
+ // Close removes this iterator from the ring buffer. After calling Close,
+ // any future calls to Next will return false.
Close() error
- // Next moves the ring buffer read mark forward, making its tail available for reuse
- // without truncation. If the ring buffer writer produces data faster than the iterator
- // can read it, the iterator will eventually be truncated and restarted. The truncation
- // will be identified in the iterator output with the text specified when the iterator was
- // created.
- // Next returns true if there is more data to read from the RingBuffer.
- // If a non-nil cancel channel is passed in, Next will wait for more data to
- // become available. Sending on this channel, or closing it, will cause Next to
- // return immediately.
+
+ // Next returns true if there is more data to read. If a non-nil cancel
+ // channel is passed in, Next will wait for more data to become available.
+ // Sending on this channel, or closing it, will cause Next to return
+ // immediately.
+ // If the ring buffer writer produces data faster than the iterator can read
+ // it, the iterator will eventually be truncated and restarted. The
+ // truncation will be identified in the iterator output with the text
+ // specified when the iterator was created.
Next(cancel <-chan struct{}) bool
- // Notify sets the notification channel. When more data is available, the channel
- // passed in to Notify will have true sent on it. If the channel is not receiving (unbuffered)
- // or full (buffered), the notification will be dropped.
+ // Notify sets the notification channel. When more data is available, the
+ // channel passed in to Notify will have true sent on it. If the channel is
+ // not receiving (unbuffered) or full (buffered), the notification will be
+ // dropped.
Notify(ch chan bool)
+ // Buffered returns the approximate number of bytes available to read.
Buffered() int
+
io.Reader
io.WriterTo
}
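The logPuller above drives an Iterator by blocking in Next with the tomb's dying channel as the cancel argument. As a contrast, here is a minimal sketch, not part of this change, of a consumer that waits on a Notify channel in a select loop instead; the consumeNotify name and the stop channel are illustrative, and it assumes servicelog.Entry exposes Service and Message fields.

```go
package servicelog

import "fmt"

// consumeNotify prints entries from rb whenever the iterator signals that new
// data has been written, until stop is closed.
func consumeNotify(rb *RingBuffer, stop <-chan struct{}) {
	it := rb.TailIterator()
	defer it.Close()

	// Ask the iterator to send on this channel when more data is available.
	notify := make(chan bool, 1)
	it.Notify(notify)

	parser := NewParser(it, 4*1024)
	for {
		select {
		case <-stop:
			return
		case <-notify:
			// With a nil cancel channel, Next reports whether more data is
			// currently available without waiting for it.
			for it.Next(nil) {
				for parser.Next() {
					entry := parser.Entry()
					fmt.Printf("%s: %s", entry.Service, entry.Message)
				}
				if err := parser.Err(); err != nil {
					fmt.Println("parse error:", err)
					return
				}
			}
		}
	}
}
```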