From ff53051c5f7601e5a1569cd6f261ac26908fb243 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Mon, 3 Jul 2023 15:04:45 +1200 Subject: [PATCH 01/31] Log forwarding implementation --- internals/overlord/logstate/fake.go | 70 +++++ internals/overlord/logstate/gatherer.go | 255 +++++++++++++++++++ internals/overlord/logstate/gatherer_test.go | 203 +++++++++++++++ internals/overlord/logstate/manager.go | 99 ++++++- internals/overlord/logstate/manager_test.go | 186 ++++++++++++++ internals/overlord/logstate/package_test.go | 23 ++ internals/overlord/logstate/puller.go | 153 +++++++++++ internals/overlord/servstate/handlers.go | 2 +- internals/overlord/servstate/manager.go | 2 +- internals/overlord/servstate/manager_test.go | 6 + internals/overlord/stateengine.go | 9 +- internals/overlord/stateengine_test.go | 63 +++-- internals/servicelog/iterator.go | 32 +-- 13 files changed, 1059 insertions(+), 44 deletions(-) create mode 100644 internals/overlord/logstate/fake.go create mode 100644 internals/overlord/logstate/gatherer.go create mode 100644 internals/overlord/logstate/gatherer_test.go create mode 100644 internals/overlord/logstate/manager_test.go create mode 100644 internals/overlord/logstate/package_test.go create mode 100644 internals/overlord/logstate/puller.go diff --git a/internals/overlord/logstate/fake.go b/internals/overlord/logstate/fake.go new file mode 100644 index 00000000..8356e236 --- /dev/null +++ b/internals/overlord/logstate/fake.go @@ -0,0 +1,70 @@ +package logstate + +import ( + "context" + "fmt" + "time" + + "github.com/canonical/pebble/internals/servicelog" +) + +// Fake sample implementations of logClient +// TODO: remove this file before merging + +type nonBufferingClient struct{} + +var _ logClient = &nonBufferingClient{} + +func (c *nonBufferingClient) Write(_ context.Context, entry servicelog.Entry) error { + fmt.Printf("%v [%s] %s", entry.Time, entry.Service, entry.Message) + return nil +} + +func (c *nonBufferingClient) Flush(_ context.Context) error { + // no-op + return nil +} + +type bufferingClient struct { + entries []servicelog.Entry + threshold int +} + +var _ logClient = &bufferingClient{} + +func (c *bufferingClient) Write(ctx context.Context, entry servicelog.Entry) error { + c.entries = append(c.entries, entry) + if c.threshold > 0 && len(c.entries) >= c.threshold { + return c.Flush(ctx) + } + return nil +} + +func (c *bufferingClient) Flush(_ context.Context) error { + for _, entry := range c.entries { + fmt.Printf("%v [%s] %s", entry.Time, entry.Service, entry.Message) + } + fmt.Println() + c.entries = c.entries[:0] + return nil +} + +// a slow client where Flush takes a long time +type slowClient struct { + flushTime time.Duration +} + +var _ logClient = &slowClient{} + +func (c *slowClient) Write(_ context.Context, _ servicelog.Entry) error { + return nil +} + +func (c *slowClient) Flush(ctx context.Context) error { + select { + case <-time.After(c.flushTime): + return nil + case <-ctx.Done(): + return fmt.Errorf("timeout flushing logs") + } +} diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go new file mode 100644 index 00000000..e727bceb --- /dev/null +++ b/internals/overlord/logstate/gatherer.go @@ -0,0 +1,255 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logstate + +import ( + "context" + "fmt" + "time" + + "gopkg.in/tomb.v2" + + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" +) + +const ( + parserSize = 4 * 1024 + tickPeriod = 1 * time.Second + + // These constants control the maximum time allowed for each teardown step. + timeoutCurrentFlush = 1 * time.Second + timeoutPullers = 2 * time.Second + timeoutMainLoop = 3 * time.Second + // timeoutFinalFlush is measured from when the gatherer's main loop finishes, + // NOT from when stop() is called like the other constants. + timeoutFinalFlush = 2 * time.Second +) + +// logGatherer is responsible for collecting service logs from a bunch of +// services, and sending them to its logClient. +// One logGatherer will run per log target. Its loop() method should be run +// in its own goroutine. +// A logGatherer will spawn a separate logPuller for each service it collects +// logs from. Each logPuller will run in a separate goroutine, and send logs to +// the logGatherer via a shared channel. +// The logGatherer will "flush" the client: +// - on a regular cadence (e.g. every 1 second) +// - when it is told to shut down. +// +// The client may also flush itself when its internal buffer reaches a certain +// size. +// Calling the stop() method will tear down the logGatherer and all of its +// associated logPullers. stop() can be called from an outside goroutine. +type logGatherer struct { + logGathererArgs + + targetName string + // tomb for the main loop + tomb tomb.Tomb + + client logClient + // Context to pass to client methods + clientCtx context.Context + // cancel func for clientCtx - can be used during teardown if required, to + // ensure the client is not blocking subsequent teardown steps. + clientCancel context.CancelFunc + + pullers *pullerGroup + // All pullers send logs on this channel, received by main loop + entryCh chan servicelog.Entry +} + +// logGathererArgs allows overriding the newLogClient method and time values +// in testing. +type logGathererArgs struct { + tickPeriod time.Duration + timeoutFinalFlush time.Duration + // method to get a new client + newClient func(*plan.LogTarget) (logClient, error) +} + +func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) { + return newLogGathererInternal(target, logGathererArgs{}) +} + +// newLogGathererInternal contains the actual creation code for a logGatherer. +// This function is used in the real implementation, but also allows overriding +// certain configuration values for testing. 
+func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logGatherer, error) { + args = fillDefaultArgs(args) + client, err := args.newClient(target) + if err != nil { + return nil, fmt.Errorf("could not create log client: %w", err) + } + + g := &logGatherer{ + logGathererArgs: args, + + targetName: target.Name, + client: client, + entryCh: make(chan servicelog.Entry), + pullers: newPullerGroup(target.Name), + } + g.clientCtx, g.clientCancel = context.WithCancel(context.Background()) + g.tomb.Go(g.loop) + return g, nil +} + +func fillDefaultArgs(args logGathererArgs) logGathererArgs { + if args.tickPeriod == 0 { + args.tickPeriod = tickPeriod + } + if args.timeoutFinalFlush == 0 { + args.timeoutFinalFlush = timeoutFinalFlush + } + if args.newClient == nil { + args.newClient = newLogClient + } + return args +} + +// planChanged is called by the LogManager when the plan is changed. +func (g *logGatherer) planChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) { + // Remove old pullers + g.pullers.RemoveOld(pl) + + // Add new pullers + for _, service := range pl.Services { + target := pl.LogTargets[g.targetName] + if !service.LogsTo(target) { + continue + } + + buffer, bufferExists := buffers[service.Name] + if !bufferExists { + // We don't yet have a reference to the service's ring buffer + // Need to wait until serviceStarted + continue + } + + g.pullers.Add(service.Name, buffer, g.entryCh) + } +} + +// serviceStarted is called by the LogManager on the start of a service which +// logs to this gatherer's target. +func (g *logGatherer) serviceStarted(service *plan.Service, buffer *servicelog.RingBuffer) { + g.pullers.Add(service.Name, buffer, g.entryCh) +} + +// The main control loop for the logGatherer. loop receives logs from the +// pullers on entryCh, and writes them to the client. It also flushes the +// client periodically, and exits when the gatherer's tomb is killed. +func (g *logGatherer) loop() error { + ticker := time.NewTicker(g.tickPeriod) + defer ticker.Stop() + +mainLoop: + for { + select { + case <-g.tomb.Dying(): + break mainLoop + + case <-ticker.C: + // Timeout - flush + err := g.client.Flush(g.clientCtx) + if err != nil { + return fmt.Errorf("sending logs to target %q: %v", g.targetName, err) + } + + case entry := <-g.entryCh: + err := g.client.Write(g.clientCtx, entry) + if err != nil { + return fmt.Errorf("writing logs to target %q: %v", g.targetName, err) + } + } + } + + // Final flush to send any remaining logs buffered in the client + ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush) + defer cancel() + err := g.client.Flush(ctx) + if err != nil { + return fmt.Errorf("sending logs to target %q: %v", g.targetName, err) + } + return nil +} + +// stop tears down the gatherer and associated resources (pullers, client). +// This method will block until gatherer teardown is complete. +// +// The teardown process has several steps: +// - If the main loop is in the middle of a flush when we call stop, this +// will block the pullers from sending logs to the gatherer. Hence, wait +// for the current flush to complete. +// - Wait for the pullers to pull the final logs from the iterator. +// - Kill the main loop. +// - Flush out any final logs buffered in the client. 
+func (g *logGatherer) stop() { + // Wait up to timeoutCurrentFlush for the current flush to complete (if any) + time.AfterFunc(timeoutCurrentFlush, g.clientCancel) + + // Wait up to timeoutPullers for the pullers to pull the final logs from the + // iterator and send to the main loop. + time.AfterFunc(timeoutPullers, g.pullers.KillAll) + + // Kill the main loop once either: + // - all the pullers are done + // - timeoutMainLoop has passed + select { + case <-g.pullers.Done(): + case <-time.After(timeoutMainLoop): + } + + _ = g.tomb.Killf("gatherer stopped") + // Wait for final flush in the main loop + _ = g.tomb.Wait() +} + +// logClient handles requests to a specific type of log target. It encodes +// log messages in the required format, and sends the messages using the +// protocol required by that log target. +// For example, a logClient for Loki would encode the log messages in the +// JSON format expected by Loki, and send them over HTTP(S). +// +// logClient implementations have some freedom about the semantics of these +// methods. For a buffering client (e.g. HTTP): +// - Write could add the log to the client's internal buffer, calling Flush +// when this buffer reaches capacity. +// - Flush would prepare and send a request with the buffered logs. +// +// For a non-buffering client (e.g. TCP), Write could serialise the log +// directly to the open connection, while Flush would be a no-op. +type logClient interface { + // Write adds the given log entry to the client. Depending on the + // implementation of the client, this may send the log to the remote target, + // or simply add the log to an internal buffer, flushing that buffer when + // required. + Write(context.Context, servicelog.Entry) error + + // Flush sends buffered logs (if any) to the remote target. For clients which + // don't buffer logs, Flush should be a no-op. + Flush(context.Context) error +} + +func newLogClient(target *plan.LogTarget) (logClient, error) { + switch target.Type { + //case plan.LokiTarget: TODO + //case plan.SyslogTarget: TODO + default: + return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name) + } +} diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go new file mode 100644 index 00000000..3ec4ca20 --- /dev/null +++ b/internals/overlord/logstate/gatherer_test.go @@ -0,0 +1,203 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logstate + +import ( + "context" + "fmt" + "io" + "time" + + . 
"gopkg.in/check.v1" + + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" +) + +type gathererSuite struct{} + +var _ = Suite(&gathererSuite{}) + +func (s *gathererSuite) TestGatherer(c *C) { + received := make(chan []servicelog.Entry, 1) + gathererArgs := logGathererArgs{ + newClient: func(target *plan.LogTarget) (logClient, error) { + return &testClient{ + bufferSize: 5, + sendCh: received, + }, nil + }, + } + + g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, gathererArgs) + c.Assert(err, IsNil) + + testSvc := newTestService("svc1") + g.serviceStarted(testSvc.config, testSvc.ringBuffer) + + testSvc.writeLog("log line #1") + testSvc.writeLog("log line #2") + testSvc.writeLog("log line #3") + testSvc.writeLog("log line #4") + select { + case logs := <-received: + c.Fatalf("wasn't expecting logs, received %#v", logs) + default: + } + + testSvc.writeLog("log line #5") + select { + case <-time.After(5 * time.Millisecond): + c.Fatalf("timeout waiting for logs") + case logs := <-received: + checkLogs(c, logs, []string{"log line #1", "log line #2", "log line #3", "log line #4", "log line #5"}) + } +} + +func (s *gathererSuite) TestGathererTimeout(c *C) { + received := make(chan []servicelog.Entry, 1) + gathererArgs := logGathererArgs{ + tickPeriod: 1 * time.Microsecond, + newClient: func(target *plan.LogTarget) (logClient, error) { + return &testClient{ + bufferSize: 5, + sendCh: received, + }, nil + }, + } + + g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, gathererArgs) + c.Assert(err, IsNil) + + testSvc := newTestService("svc1") + g.serviceStarted(testSvc.config, testSvc.ringBuffer) + + testSvc.writeLog("log line #1") + select { + case <-time.After(20 * time.Millisecond): + c.Fatalf("timeout waiting for logs") + case logs := <-received: + checkLogs(c, logs, []string{"log line #1"}) + } +} + +func (s *gathererSuite) TestGathererShutdown(c *C) { + received := make(chan []servicelog.Entry, 1) + gathererArgs := logGathererArgs{ + tickPeriod: 1 * time.Microsecond, + newClient: func(target *plan.LogTarget) (logClient, error) { + return &testClient{ + bufferSize: 5, + sendCh: received, + }, nil + }, + } + + g, err := newLogGathererInternal(&plan.LogTarget{Name: "tgt1"}, gathererArgs) + c.Assert(err, IsNil) + + testSvc := newTestService("svc1") + g.serviceStarted(testSvc.config, testSvc.ringBuffer) + + testSvc.writeLog("log line #1") + err = testSvc.stop() + c.Assert(err, IsNil) + + hasShutdown := make(chan struct{}) + go func() { + g.stop() + close(hasShutdown) + }() + + select { + case <-time.After(20 * time.Millisecond): + c.Fatalf("timeout waiting for gatherer to tear down") + case <-hasShutdown: + } + + // check logs received + select { + case logs := <-received: + checkLogs(c, logs, []string{"log line #1"}) + default: + c.Fatalf(`no logs were received +logs in client buffer: %v`, len(g.client.(*testClient).buffered)) + } +} + +func checkLogs(c *C, received []servicelog.Entry, expected []string) { + c.Assert(received, HasLen, len(expected)) + for i, entry := range received { + c.Check(entry.Message, Equals, expected[i]+"\n") + } +} + +// test implementation of a client with buffer +type testClient struct { + bufferSize int + buffered []servicelog.Entry + sendCh chan []servicelog.Entry +} + +func (c *testClient) Write(ctx context.Context, entry servicelog.Entry) error { + c.buffered = append(c.buffered, entry) + if len(c.buffered) >= c.bufferSize { + return c.Flush(ctx) + } + return nil +} + +func (c *testClient) 
Flush(ctx context.Context) (err error) { + if len(c.buffered) == 0 { + return + } + + select { + case <-ctx.Done(): + err = fmt.Errorf("timeout flushing, dropping logs") + case c.sendCh <- c.buffered: + } + + c.buffered = c.buffered[:0] + return err +} + +// fake "service" - useful for testing +type testService struct { + name string + config *plan.Service + ringBuffer *servicelog.RingBuffer + writer io.Writer +} + +func newTestService(name string) *testService { + rb := servicelog.NewRingBuffer(1024) + return &testService{ + name: name, + config: &plan.Service{ + Name: name, + }, + ringBuffer: rb, + writer: servicelog.NewFormatWriter(rb, "svc1"), + } +} + +func (s *testService) writeLog(log string) { + _, _ = s.writer.Write([]byte(log + "\n")) +} + +func (s *testService) stop() error { + return s.ringBuffer.Close() +} diff --git a/internals/overlord/logstate/manager.go b/internals/overlord/logstate/manager.go index 4ee2b235..82a8a17b 100644 --- a/internals/overlord/logstate/manager.go +++ b/internals/overlord/logstate/manager.go @@ -15,26 +15,98 @@ package logstate import ( + "sync" + + "github.com/canonical/pebble/internals/logger" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" ) -type LogManager struct{} +type LogManager struct { + mu sync.Mutex + gatherers map[string]*logGatherer + buffers map[string]*servicelog.RingBuffer + plan *plan.Plan + + newGatherer func(*plan.LogTarget) (*logGatherer, error) +} func NewLogManager() *LogManager { - return &LogManager{} + return &LogManager{ + gatherers: map[string]*logGatherer{}, + buffers: map[string]*servicelog.RingBuffer{}, + newGatherer: newLogGatherer, + } } -// PlanChanged is called by the service manager when the plan changes. We stop -// all running forwarders, and start new forwarders based on the new plan. +// PlanChanged is called by the service manager when the plan changes. +// Based on the new plan, we will stop old gatherers and start new ones. func (m *LogManager) PlanChanged(pl *plan.Plan) { - // TODO: implement + m.mu.Lock() + defer m.mu.Unlock() + + // Create a map to hold gatherers for the new plan. + // Old gatherers will be moved over or deleted. + newGatherers := make(map[string]*logGatherer, len(pl.LogTargets)) + + for _, target := range pl.LogTargets { + gatherer := m.gatherers[target.Name] + if gatherer == nil { + // Create new gatherer + var err error + gatherer, err = m.newGatherer(target) + if err != nil { + logger.Noticef("Internal error: cannot create gatherer for target %q: %v", + target.Name, err) + continue + } + newGatherers[target.Name] = gatherer + } else { + // Copy over existing gatherer + newGatherers[target.Name] = gatherer + delete(m.gatherers, target.Name) + } + + // Update iterators for gatherer + gatherer.planChanged(pl, m.buffers) + } + + // Old gatherers for now-removed targets need to be shut down. + for _, gatherer := range m.gatherers { + go gatherer.stop() + } + m.gatherers = newGatherers + + // Remove old buffers + for svc := range m.buffers { + if _, ok := pl.Services[svc]; !ok { + // Service has been removed + delete(m.buffers, svc) + } + } + + m.plan = pl } // ServiceStarted notifies the log manager that the named service has started, // and provides a reference to the service's log buffer. 
-func (m *LogManager) ServiceStarted(serviceName string, buffer *servicelog.RingBuffer) { - // TODO: implement +func (m *LogManager) ServiceStarted(service *plan.Service, buffer *servicelog.RingBuffer) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.buffers[service.Name] == buffer { + // Service restarted with same buffer. Don't need to update anything + return + } + + m.buffers[service.Name] = buffer + for _, gatherer := range m.gatherers { + target := m.plan.LogTargets[gatherer.targetName] + if !service.LogsTo(target) { + continue + } + gatherer.serviceStarted(service, buffer) + } } // Ensure implements overlord.StateManager. @@ -44,5 +116,16 @@ func (m *LogManager) Ensure() error { // Stop implements overlord.StateStopper and stops all log forwarding. func (m *LogManager) Stop() { - // TODO: implement + m.mu.Lock() + defer m.mu.Unlock() + + wg := sync.WaitGroup{} + for _, gatherer := range m.gatherers { + wg.Add(1) + go func(gatherer *logGatherer) { + gatherer.stop() + wg.Done() + }(gatherer) + } + wg.Wait() } diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go new file mode 100644 index 00000000..af861b66 --- /dev/null +++ b/internals/overlord/logstate/manager_test.go @@ -0,0 +1,186 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logstate + +import ( + "context" + "fmt" + "time" + + . 
"gopkg.in/check.v1" + + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" +) + +type managerSuite struct{} + +var _ = Suite(&managerSuite{}) + +func (s *managerSuite) TestPlanChange(c *C) { + gathererArgs := logGathererArgs{ + newClient: func(target *plan.LogTarget) (logClient, error) { + return &testClient{}, nil + }, + } + m := NewLogManager() + m.newGatherer = func(t *plan.LogTarget) (*logGatherer, error) { + return newLogGathererInternal(t, gathererArgs) + } + + svc1 := newTestService("svc1") + svc2 := newTestService("svc2") + svc3 := newTestService("svc3") + + m.PlanChanged(&plan.Plan{ + Services: map[string]*plan.Service{ + svc1.name: svc1.config, + svc2.name: svc2.config, + svc3.name: svc3.config, + }, + LogTargets: map[string]*plan.LogTarget{ + "tgt1": {Name: "tgt1", Services: []string{"all", "-svc3"}}, + "tgt2": {Name: "tgt2", Services: []string{}}, + "tgt3": {Name: "tgt3", Services: []string{"all"}}, + }, + }) + m.ServiceStarted(svc1.config, svc1.ringBuffer) + m.ServiceStarted(svc2.config, svc2.ringBuffer) + m.ServiceStarted(svc3.config, svc3.ringBuffer) + + checkGatherers(c, m.gatherers, map[string][]string{ + "tgt1": {"svc1", "svc2"}, + "tgt2": {}, + "tgt3": {"svc1", "svc2", "svc3"}, + }) + checkBuffers(c, m.buffers, []string{"svc1", "svc2", "svc3"}) + + svc4 := newTestService("svc4") + + m.PlanChanged(&plan.Plan{ + Services: map[string]*plan.Service{ + svc1.name: svc1.config, + svc2.name: svc2.config, + svc4.name: svc4.config, + }, + LogTargets: map[string]*plan.LogTarget{ + "tgt1": {Name: "tgt1", Services: []string{"svc1"}}, + "tgt2": {Name: "tgt2", Services: []string{"svc1", "svc4"}}, + "tgt4": {Name: "tgt4", Services: []string{"all", "-svc2"}}, + }, + }) + m.ServiceStarted(svc4.config, svc4.ringBuffer) + // simulate service restart for svc2 + m.ServiceStarted(svc2.config, svc2.ringBuffer) + + checkGatherers(c, m.gatherers, map[string][]string{ + "tgt1": {"svc1"}, + "tgt2": {"svc1", "svc4"}, + "tgt4": {"svc1", "svc4"}, + }) + // svc3 no longer exists so we should have dropped the reference to its buffer + checkBuffers(c, m.buffers, []string{"svc1", "svc2", "svc4"}) +} + +func checkGatherers(c *C, gatherers map[string]*logGatherer, expected map[string][]string) { + c.Assert(gatherers, HasLen, len(expected)) + for tgtName, svcs := range expected { + g, ok := gatherers[tgtName] + c.Assert(ok, Equals, true) + + c.Assert(g.pullers.Len(), Equals, len(svcs)) + for _, svc := range svcs { + c.Check(g.pullers.Contains(svc), Equals, true) + } + } +} + +func checkBuffers(c *C, buffers map[string]*servicelog.RingBuffer, expected []string) { + c.Assert(buffers, HasLen, len(expected)) + for _, svcName := range expected { + _, ok := buffers[svcName] + c.Check(ok, Equals, true) + } +} + +func (s *managerSuite) TestTimelyShutdown(c *C) { + gathererArgs := logGathererArgs{ + timeoutFinalFlush: 5 * time.Millisecond, + newClient: func(target *plan.LogTarget) (logClient, error) { + return &slowFlushingClient{ + flushTime: 10 * time.Second, + }, nil + }, + } + + m := NewLogManager() + m.newGatherer = func(t *plan.LogTarget) (*logGatherer, error) { + return newLogGathererInternal(t, gathererArgs) + } + + svc1 := newTestService("svc1") + + // Start 10 log gatherers + logTargets := make(map[string]*plan.LogTarget, 10) + for i := 0; i < 10; i++ { + targetName := fmt.Sprintf("tgt%d", i) + logTargets[targetName] = &plan.LogTarget{ + Name: targetName, + Services: []string{"all"}, + } + } + m.PlanChanged(&plan.Plan{ + Services: map[string]*plan.Service{ + 
"svc1": svc1.config, + }, + LogTargets: logTargets, + }) + m.ServiceStarted(svc1.config, svc1.ringBuffer) + + c.Assert(m.gatherers, HasLen, 10) + + err := svc1.stop() + c.Assert(err, IsNil) + + // Stop all gatherers and check this happens quickly + done := make(chan struct{}) + go func() { + m.Stop() + close(done) + }() + select { + case <-done: + case <-time.After(50 * time.Millisecond): + c.Fatal("LogManager.Stop() took too long") + } +} + +type slowFlushingClient struct { + flushTime time.Duration +} + +func (c *slowFlushingClient) Write(_ context.Context, _ servicelog.Entry) error { + // no-op + return nil +} + +func (c *slowFlushingClient) Flush(ctx context.Context) error { + select { + case <-ctx.Done(): + return fmt.Errorf("flush timed out") + case <-time.After(c.flushTime): + return nil + } +} diff --git a/internals/overlord/logstate/package_test.go b/internals/overlord/logstate/package_test.go new file mode 100644 index 00000000..e7fe8c86 --- /dev/null +++ b/internals/overlord/logstate/package_test.go @@ -0,0 +1,23 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logstate + +import ( + "testing" + + . "gopkg.in/check.v1" +) + +func Test(t *testing.T) { TestingT(t) } diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go new file mode 100644 index 00000000..6169a696 --- /dev/null +++ b/internals/overlord/logstate/puller.go @@ -0,0 +1,153 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logstate + +import ( + "context" + "sync" + + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" +) + +// logPuller handles pulling logs from a single iterator and sending to the +// main control loop. +type logPuller struct { + iterator servicelog.Iterator + entryCh chan servicelog.Entry + + ctx context.Context + kill context.CancelFunc +} + +func (p *logPuller) loop() { + defer func() { _ = p.iterator.Close() }() + + parser := servicelog.NewParser(p.iterator, parserSize) + for p.iterator.Next(p.ctx.Done()) { + for parser.Next() { + if err := parser.Err(); err != nil { + return + } + select { + case p.entryCh <- parser.Entry(): + case <-p.ctx.Done(): + return + } + } + } +} + +// pullerGroup represents a group of logPullers, and provides methods for a +// gatherer to manage logPullers (dynamically add/remove, kill all, wait for +// all to finish). 
+type pullerGroup struct { + targetName string + + // Currently active logPullers, indexed by service name + pullers map[string]*logPuller + // Mutex for pullers map + mu sync.RWMutex + // Common context for all pullers. Each puller uses a derived context so we + // can easily kill all pullers (if required) during teardown. + ctx context.Context + // Cancel func for pullersCtx + kill context.CancelFunc + // WaitGroup for pullers - we use this during teardown to know when all the + // pullers are finished. + wg sync.WaitGroup +} + +func newPullerGroup(targetName string) *pullerGroup { + pg := &pullerGroup{ + targetName: targetName, + pullers: map[string]*logPuller{}, + } + pg.ctx, pg.kill = context.WithCancel(context.Background()) + return pg +} + +func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, entryCh chan servicelog.Entry) { + lp := &logPuller{ + iterator: buffer.TailIterator(), + entryCh: entryCh, + } + lp.ctx, lp.kill = context.WithCancel(pg.ctx) + + pg.wg.Add(1) // this will be marked as done once loop finishes + go func() { + lp.loop() + pg.wg.Done() + }() + + pg.mu.Lock() + defer pg.mu.Unlock() + if puller, ok := pg.pullers[serviceName]; ok { + // This should never happen, but just in case, shut down the old puller. + puller.kill() + } + pg.pullers[serviceName] = lp +} + +func (pg *pullerGroup) RemoveOld(pl *plan.Plan) { + pg.mu.Lock() + defer pg.mu.Unlock() + + for svcName := range pg.pullers { + svc, svcExists := pl.Services[svcName] + if !svcExists { + pg.remove(svcName) + continue + } + + tgt := pl.LogTargets[pg.targetName] + if !svc.LogsTo(tgt) { + pg.remove(svcName) + } + } +} + +// not thread safe, lock mu before calling +func (pg *pullerGroup) remove(serviceName string) { + pg.pullers[serviceName].kill() + delete(pg.pullers, serviceName) +} + +func (pg *pullerGroup) KillAll() { + pg.kill() +} + +// Done returns a channel which can be waited on until all pullers have finished. +func (pg *pullerGroup) Done() chan struct{} { + done := make(chan struct{}) + go func() { + pg.wg.Wait() + close(done) + }() + return done +} + +func (pg *pullerGroup) Contains(serviceName string) bool { + pg.mu.RLock() + defer pg.mu.RUnlock() + _, ok := pg.pullers[serviceName] + return ok +} + +func (pg *pullerGroup) Len() int { + pg.mu.RLock() + defer pg.mu.RUnlock() + return len(pg.pullers) +} diff --git a/internals/overlord/servstate/handlers.go b/internals/overlord/servstate/handlers.go index 32348a25..3d3a18b2 100644 --- a/internals/overlord/servstate/handlers.go +++ b/internals/overlord/servstate/handlers.go @@ -444,7 +444,7 @@ func (s *serviceData) startInternal() error { } // Pass buffer reference to logMgr to start log forwarding - s.manager.logMgr.ServiceStarted(serviceName, s.logs) + s.manager.logMgr.ServiceStarted(s.config, s.logs) return nil } diff --git a/internals/overlord/servstate/manager.go b/internals/overlord/servstate/manager.go index f8edb2e0..bbd87173 100644 --- a/internals/overlord/servstate/manager.go +++ b/internals/overlord/servstate/manager.go @@ -39,7 +39,7 @@ type ServiceManager struct { } type LogManager interface { - ServiceStarted(serviceName string, logs *servicelog.RingBuffer) + ServiceStarted(service *plan.Service, logs *servicelog.RingBuffer) } // PlanFunc is the type of function used by NotifyPlanChanged. 
diff --git a/internals/overlord/servstate/manager_test.go b/internals/overlord/servstate/manager_test.go index 48a360c0..e951556b 100644 --- a/internals/overlord/servstate/manager_test.go +++ b/internals/overlord/servstate/manager_test.go @@ -1551,6 +1551,12 @@ func (s *S) TestStopRunningNoServices(c *C) { c.Assert(taskSet, IsNil) } +type fakeLogManager struct{} + +func (f fakeLogManager) ServiceStarted(service *plan.Service, logs *servicelog.RingBuffer) { + // no-op +} + func (s *S) TestNoWorkingDir(c *C) { dir := c.MkDir() err := os.Mkdir(filepath.Join(dir, "layers"), 0755) diff --git a/internals/overlord/stateengine.go b/internals/overlord/stateengine.go index 25710333..745e1593 100644 --- a/internals/overlord/stateengine.go +++ b/internals/overlord/stateengine.go @@ -136,10 +136,17 @@ func (se *StateEngine) Stop() { if se.stopped { return } + + var wg sync.WaitGroup for _, m := range se.managers { if stopper, ok := m.(StateStopper); ok { - stopper.Stop() + wg.Add(1) + go func() { + stopper.Stop() + wg.Done() + }() } } + wg.Wait() se.stopped = true } diff --git a/internals/overlord/stateengine_test.go b/internals/overlord/stateengine_test.go index f12ed98b..cc8a1749 100644 --- a/internals/overlord/stateengine_test.go +++ b/internals/overlord/stateengine_test.go @@ -36,21 +36,21 @@ func (ses *stateEngineSuite) TestNewAndState(c *C) { type fakeManager struct { name string - calls *[]string + calls chan<- string ensureError, stopError error } func (fm *fakeManager) Ensure() error { - *fm.calls = append(*fm.calls, "ensure:"+fm.name) + fm.calls <- "ensure:" + fm.name return fm.ensureError } func (fm *fakeManager) Stop() { - *fm.calls = append(*fm.calls, "stop:"+fm.name) + fm.calls <- "stop:" + fm.name } func (fm *fakeManager) Wait() { - *fm.calls = append(*fm.calls, "wait:"+fm.name) + fm.calls <- "wait:" + fm.name } var _ overlord.StateManager = (*fakeManager)(nil) @@ -59,60 +59,87 @@ func (ses *stateEngineSuite) TestEnsure(c *C) { s := state.New(nil) se := overlord.NewStateEngine(s) - calls := []string{} + calls := make(chan string, 4) - mgr1 := &fakeManager{name: "mgr1", calls: &calls} - mgr2 := &fakeManager{name: "mgr2", calls: &calls} + mgr1 := &fakeManager{name: "mgr1", calls: calls} + mgr2 := &fakeManager{name: "mgr2", calls: calls} se.AddManager(mgr1) se.AddManager(mgr2) err := se.Ensure() c.Assert(err, IsNil) - c.Check(calls, DeepEquals, []string{"ensure:mgr1", "ensure:mgr2"}) + checkCalls(c, calls, "ensure:mgr1", "ensure:mgr2") err = se.Ensure() c.Assert(err, IsNil) - c.Check(calls, DeepEquals, []string{"ensure:mgr1", "ensure:mgr2", "ensure:mgr1", "ensure:mgr2"}) + checkCalls(c, calls, "ensure:mgr1", "ensure:mgr2") } func (ses *stateEngineSuite) TestEnsureError(c *C) { s := state.New(nil) se := overlord.NewStateEngine(s) - calls := []string{} + calls := make(chan string, 2) err1 := errors.New("boom1") err2 := errors.New("boom2") - mgr1 := &fakeManager{name: "mgr1", calls: &calls, ensureError: err1} - mgr2 := &fakeManager{name: "mgr2", calls: &calls, ensureError: err2} + mgr1 := &fakeManager{name: "mgr1", calls: calls, ensureError: err1} + mgr2 := &fakeManager{name: "mgr2", calls: calls, ensureError: err2} se.AddManager(mgr1) se.AddManager(mgr2) err := se.Ensure() c.Check(err.Error(), DeepEquals, "state ensure errors: [boom1 boom2]") - c.Check(calls, DeepEquals, []string{"ensure:mgr1", "ensure:mgr2"}) + checkCalls(c, calls, "ensure:mgr1", "ensure:mgr2") } func (ses *stateEngineSuite) TestStop(c *C) { s := state.New(nil) se := overlord.NewStateEngine(s) - calls := []string{} + calls 
:= make(chan string, 2) - mgr1 := &fakeManager{name: "mgr1", calls: &calls} - mgr2 := &fakeManager{name: "mgr2", calls: &calls} + mgr1 := &fakeManager{name: "mgr1", calls: calls} + mgr2 := &fakeManager{name: "mgr2", calls: calls} se.AddManager(mgr1) se.AddManager(mgr2) se.Stop() - c.Check(calls, DeepEquals, []string{"stop:mgr1", "stop:mgr2"}) + checkCalls(c, calls, "stop:mgr1", "stop:mgr2") se.Stop() - c.Check(calls, HasLen, 2) + c.Check(len(calls), Equals, 0) err := se.Ensure() c.Check(err, ErrorMatches, "state engine already stopped") } + +func checkCalls(c *C, calls <-chan string, expected ...string) { + // Initialise multiset containing calls + expectedCalls := map[string]int{} + for _, expCall := range expected { + expectedCalls[expCall]++ + } + +loop: + for { + select { + case call := <-calls: + expectedCalls[call]-- + if expectedCalls[call] < 0 { + c.Errorf("extra call: %q", call) + } + default: + break loop + } + } + + for call, n := range expectedCalls { + if n > 0 { + c.Errorf("missing %d calls: %q", n, call) + } + } +} diff --git a/internals/servicelog/iterator.go b/internals/servicelog/iterator.go index eb2926b2..29ecd1c3 100644 --- a/internals/servicelog/iterator.go +++ b/internals/servicelog/iterator.go @@ -20,27 +20,29 @@ import ( ) type Iterator interface { - // Close closes the iterator so that buffers can be used for future writes. - // If Close is not called, the iterator will block buffer recycling causing - // write failures. + // Close removes this iterator from the ring buffer. After calling Close, + // any future calls to Next will return false. Close() error - // Next moves the ring buffer read mark forward, making its tail available for reuse - // without truncation. If the ring buffer writer produces data faster than the iterator - // can read it, the iterator will eventually be truncated and restarted. The truncation - // will be identified in the iterator output with the text specified when the iterator was - // created. - // Next returns true if there is more data to read from the RingBuffer. - // If a non-nil cancel channel is passed in, Next will wait for more data to - // become available. Sending on this channel, or closing it, will cause Next to - // return immediately. + + // Next returns true if there is more data to read. If a non-nil cancel + // channel is passed in, Next will wait for more data to become available. + // Sending on this channel, or closing it, will cause Next to return + // immediately. + // If the ring buffer writer produces data faster than the iterator can read + // it, the iterator will eventually be truncated and restarted. The + // truncation will be identified in the iterator output with the text + // specified when the iterator was created. Next(cancel <-chan struct{}) bool - // Notify sets the notification channel. When more data is available, the channel - // passed in to Notify will have true sent on it. If the channel is not receiving (unbuffered) - // or full (buffered), the notification will be dropped. + // Notify sets the notification channel. When more data is available, the + // channel passed in to Notify will have true sent on it. If the channel is + // not receiving (unbuffered) or full (buffered), the notification will be + // dropped. Notify(ch chan bool) + // Buffered returns the approximate number of bytes available to read. 
Buffered() int + io.Reader io.WriterTo } From 760e7a488d441a2e86c4babe8c8bfbda93c62c05 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 2 Aug 2023 10:34:49 +0800 Subject: [PATCH 02/31] remove extra fakeLogManager decl --- internals/overlord/servstate/manager_test.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/internals/overlord/servstate/manager_test.go b/internals/overlord/servstate/manager_test.go index e951556b..af9befd6 100644 --- a/internals/overlord/servstate/manager_test.go +++ b/internals/overlord/servstate/manager_test.go @@ -1551,12 +1551,6 @@ func (s *S) TestStopRunningNoServices(c *C) { c.Assert(taskSet, IsNil) } -type fakeLogManager struct{} - -func (f fakeLogManager) ServiceStarted(service *plan.Service, logs *servicelog.RingBuffer) { - // no-op -} - func (s *S) TestNoWorkingDir(c *C) { dir := c.MkDir() err := os.Mkdir(filepath.Join(dir, "layers"), 0755) @@ -1873,7 +1867,7 @@ func createZombie() error { type fakeLogManager struct{} -func (f fakeLogManager) ServiceStarted(serviceName string, logs *servicelog.RingBuffer) { +func (f fakeLogManager) ServiceStarted(service *plan.Service, logs *servicelog.RingBuffer) { // no-op } From 6e03c0c02b710d992f0a9869239153f1c5332ccb Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 2 Aug 2023 12:30:18 +0800 Subject: [PATCH 03/31] surface client errors Errors from the gatherer tomb are not being surfaced to the user, as there is nothing checking tomb.Err(). Additionally, a write/flush failure should not bring down the gatherer - it could be e.g. a temporary outage. Better to log the failures rather than exiting the main loop. --- internals/overlord/logstate/gatherer.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index e727bceb..89af5bf8 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -19,6 +19,7 @@ import ( "fmt" "time" + "github.com/canonical/pebble/internals/logger" "gopkg.in/tomb.v2" "github.com/canonical/pebble/internals/plan" @@ -167,13 +168,13 @@ mainLoop: // Timeout - flush err := g.client.Flush(g.clientCtx) if err != nil { - return fmt.Errorf("sending logs to target %q: %v", g.targetName, err) + logger.Noticef("sending logs to target %q: %v", g.targetName, err) } case entry := <-g.entryCh: err := g.client.Write(g.clientCtx, entry) if err != nil { - return fmt.Errorf("writing logs to target %q: %v", g.targetName, err) + logger.Noticef("writing logs to target %q: %v", g.targetName, err) } } } @@ -183,7 +184,7 @@ mainLoop: defer cancel() err := g.client.Flush(ctx) if err != nil { - return fmt.Errorf("sending logs to target %q: %v", g.targetName, err) + logger.Noticef("sending logs to target %q: %v", g.targetName, err) } return nil } From 5ca3752fca23587bc7dcd47f0cdf0b633d2d5139 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 2 Aug 2023 12:45:35 +0800 Subject: [PATCH 04/31] fix imports in gatherer.go --- internals/overlord/logstate/gatherer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 89af5bf8..aad99fc0 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -19,9 +19,9 @@ import ( "fmt" "time" - "github.com/canonical/pebble/internals/logger" "gopkg.in/tomb.v2" + "github.com/canonical/pebble/internals/logger" "github.com/canonical/pebble/internals/plan" 
"github.com/canonical/pebble/internals/servicelog" ) From e91f75625196ade776f763ddb9ccbbf95ba5ee21 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Thu, 3 Aug 2023 11:57:00 +0800 Subject: [PATCH 05/31] fix a small typo in comment --- internals/overlord/logstate/puller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index 6169a696..d8242e8b 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -63,7 +63,7 @@ type pullerGroup struct { // Common context for all pullers. Each puller uses a derived context so we // can easily kill all pullers (if required) during teardown. ctx context.Context - // Cancel func for pullersCtx + // Cancel func for ctx kill context.CancelFunc // WaitGroup for pullers - we use this during teardown to know when all the // pullers are finished. From 2f1d96756fef20028cd9bf429d4ea77116b95f17 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 8 Aug 2023 10:04:42 +0700 Subject: [PATCH 06/31] Move plan logic out of pullerGroup --- internals/overlord/logstate/gatherer.go | 13 ++++++++- internals/overlord/logstate/puller.go | 36 ++++++++++++------------- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index aad99fc0..fb2aea64 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -125,7 +125,18 @@ func fillDefaultArgs(args logGathererArgs) logGathererArgs { // planChanged is called by the LogManager when the plan is changed. func (g *logGatherer) planChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) { // Remove old pullers - g.pullers.RemoveOld(pl) + for _, svcName := range g.pullers.List() { + svc, svcExists := pl.Services[svcName] + if !svcExists { + g.pullers.Remove(svcName) + continue + } + + tgt := pl.LogTargets[g.targetName] + if !svc.LogsTo(tgt) { + g.pullers.Remove(svcName) + } + } // Add new pullers for _, service := range pl.Services { diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index d8242e8b..5215c091 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -18,7 +18,6 @@ import ( "context" "sync" - "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" ) @@ -90,6 +89,7 @@ func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, en go func() { lp.loop() pg.wg.Done() + // TODO: remove puller from map ? }() pg.mu.Lock() @@ -101,28 +101,28 @@ func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, en pg.pullers[serviceName] = lp } -func (pg *pullerGroup) RemoveOld(pl *plan.Plan) { +// List returns a list of all service names for which we have a currently +// active puller. 
+func (pg *pullerGroup) List() []string { + pg.mu.RLock() + defer pg.mu.RUnlock() + + var svcs []string + for svc := range pg.pullers { + svcs = append(svcs, svc) + } + return svcs +} + +func (pg *pullerGroup) Remove(serviceName string) { pg.mu.Lock() defer pg.mu.Unlock() - for svcName := range pg.pullers { - svc, svcExists := pl.Services[svcName] - if !svcExists { - pg.remove(svcName) - continue - } - - tgt := pl.LogTargets[pg.targetName] - if !svc.LogsTo(tgt) { - pg.remove(svcName) - } + if puller, ok := pg.pullers[serviceName]; ok { + puller.kill() + delete(pg.pullers, serviceName) } -} -// not thread safe, lock mu before calling -func (pg *pullerGroup) remove(serviceName string) { - pg.pullers[serviceName].kill() - delete(pg.pullers, serviceName) } func (pg *pullerGroup) KillAll() { From ab40a6c4199972ed4d9a66d41d7c17f40c8606d4 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 8 Aug 2023 10:09:00 +0700 Subject: [PATCH 07/31] Add extra ctx.Done check in logPuller.loop --- internals/overlord/logstate/puller.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index 5215c091..3e44dec8 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -40,6 +40,14 @@ func (p *logPuller) loop() { if err := parser.Err(); err != nil { return } + + // Check if our context has been cancelled + select { + case <-p.ctx.Done(): + return + default: + } + select { case p.entryCh <- parser.Entry(): case <-p.ctx.Done(): From 7e4d212838f1cd329eb0a405cdf4f9fb79c2430e Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 8 Aug 2023 10:14:22 +0700 Subject: [PATCH 08/31] Uppercase logGatherer methods (PlanChanged, ServiceStarted, Stop) which are intended for use by other types --- internals/overlord/logstate/gatherer.go | 22 ++++++++++---------- internals/overlord/logstate/gatherer_test.go | 8 +++---- internals/overlord/logstate/manager.go | 10 ++++----- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index fb2aea64..f145f907 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -35,7 +35,7 @@ const ( timeoutPullers = 2 * time.Second timeoutMainLoop = 3 * time.Second // timeoutFinalFlush is measured from when the gatherer's main loop finishes, - // NOT from when stop() is called like the other constants. + // NOT from when Stop() is called like the other constants. timeoutFinalFlush = 2 * time.Second ) @@ -52,8 +52,8 @@ const ( // // The client may also flush itself when its internal buffer reaches a certain // size. -// Calling the stop() method will tear down the logGatherer and all of its -// associated logPullers. stop() can be called from an outside goroutine. +// Calling the Stop() method will tear down the logGatherer and all of its +// associated logPullers. Stop() can be called from an outside goroutine. type logGatherer struct { logGathererArgs @@ -122,8 +122,8 @@ func fillDefaultArgs(args logGathererArgs) logGathererArgs { return args } -// planChanged is called by the LogManager when the plan is changed. -func (g *logGatherer) planChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) { +// PlanChanged is called by the LogManager when the plan is changed. 
+func (g *logGatherer) PlanChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) { // Remove old pullers for _, svcName := range g.pullers.List() { svc, svcExists := pl.Services[svcName] @@ -148,7 +148,7 @@ func (g *logGatherer) planChanged(pl *plan.Plan, buffers map[string]*servicelog. buffer, bufferExists := buffers[service.Name] if !bufferExists { // We don't yet have a reference to the service's ring buffer - // Need to wait until serviceStarted + // Need to wait until ServiceStarted continue } @@ -156,9 +156,9 @@ func (g *logGatherer) planChanged(pl *plan.Plan, buffers map[string]*servicelog. } } -// serviceStarted is called by the LogManager on the start of a service which +// ServiceStarted is called by the LogManager on the start of a service which // logs to this gatherer's target. -func (g *logGatherer) serviceStarted(service *plan.Service, buffer *servicelog.RingBuffer) { +func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.RingBuffer) { g.pullers.Add(service.Name, buffer, g.entryCh) } @@ -200,17 +200,17 @@ mainLoop: return nil } -// stop tears down the gatherer and associated resources (pullers, client). +// Stop tears down the gatherer and associated resources (pullers, client). // This method will block until gatherer teardown is complete. // // The teardown process has several steps: -// - If the main loop is in the middle of a flush when we call stop, this +// - If the main loop is in the middle of a flush when we call Stop, this // will block the pullers from sending logs to the gatherer. Hence, wait // for the current flush to complete. // - Wait for the pullers to pull the final logs from the iterator. // - Kill the main loop. // - Flush out any final logs buffered in the client. -func (g *logGatherer) stop() { +func (g *logGatherer) Stop() { // Wait up to timeoutCurrentFlush for the current flush to complete (if any) time.AfterFunc(timeoutCurrentFlush, g.clientCancel) diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index 3ec4ca20..2bc105cc 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -45,7 +45,7 @@ func (s *gathererSuite) TestGatherer(c *C) { c.Assert(err, IsNil) testSvc := newTestService("svc1") - g.serviceStarted(testSvc.config, testSvc.ringBuffer) + g.ServiceStarted(testSvc.config, testSvc.ringBuffer) testSvc.writeLog("log line #1") testSvc.writeLog("log line #2") @@ -82,7 +82,7 @@ func (s *gathererSuite) TestGathererTimeout(c *C) { c.Assert(err, IsNil) testSvc := newTestService("svc1") - g.serviceStarted(testSvc.config, testSvc.ringBuffer) + g.ServiceStarted(testSvc.config, testSvc.ringBuffer) testSvc.writeLog("log line #1") select { @@ -109,7 +109,7 @@ func (s *gathererSuite) TestGathererShutdown(c *C) { c.Assert(err, IsNil) testSvc := newTestService("svc1") - g.serviceStarted(testSvc.config, testSvc.ringBuffer) + g.ServiceStarted(testSvc.config, testSvc.ringBuffer) testSvc.writeLog("log line #1") err = testSvc.stop() @@ -117,7 +117,7 @@ func (s *gathererSuite) TestGathererShutdown(c *C) { hasShutdown := make(chan struct{}) go func() { - g.stop() + g.Stop() close(hasShutdown) }() diff --git a/internals/overlord/logstate/manager.go b/internals/overlord/logstate/manager.go index 82a8a17b..46582899 100644 --- a/internals/overlord/logstate/manager.go +++ b/internals/overlord/logstate/manager.go @@ -40,7 +40,7 @@ func NewLogManager() *LogManager { } // PlanChanged is called by the service manager when the plan 
changes. -// Based on the new plan, we will stop old gatherers and start new ones. +// Based on the new plan, we will Stop old gatherers and start new ones. func (m *LogManager) PlanChanged(pl *plan.Plan) { m.mu.Lock() defer m.mu.Unlock() @@ -68,12 +68,12 @@ func (m *LogManager) PlanChanged(pl *plan.Plan) { } // Update iterators for gatherer - gatherer.planChanged(pl, m.buffers) + gatherer.PlanChanged(pl, m.buffers) } // Old gatherers for now-removed targets need to be shut down. for _, gatherer := range m.gatherers { - go gatherer.stop() + go gatherer.Stop() } m.gatherers = newGatherers @@ -105,7 +105,7 @@ func (m *LogManager) ServiceStarted(service *plan.Service, buffer *servicelog.Ri if !service.LogsTo(target) { continue } - gatherer.serviceStarted(service, buffer) + gatherer.ServiceStarted(service, buffer) } } @@ -123,7 +123,7 @@ func (m *LogManager) Stop() { for _, gatherer := range m.gatherers { wg.Add(1) go func(gatherer *logGatherer) { - gatherer.stop() + gatherer.Stop() wg.Done() }(gatherer) } From b03882bb7b5b531c71d238f3cd759efc6c5d8e62 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 8 Aug 2023 10:16:49 +0700 Subject: [PATCH 09/31] fix error/log messages --- internals/overlord/logstate/gatherer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index f145f907..9e5cdd44 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -93,7 +93,7 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG args = fillDefaultArgs(args) client, err := args.newClient(target) if err != nil { - return nil, fmt.Errorf("could not create log client: %w", err) + return nil, fmt.Errorf("cannot create log client: %w", err) } g := &logGatherer{ @@ -179,13 +179,13 @@ mainLoop: // Timeout - flush err := g.client.Flush(g.clientCtx) if err != nil { - logger.Noticef("sending logs to target %q: %v", g.targetName, err) + logger.Noticef("Error sending logs to target %q: %v", g.targetName, err) } case entry := <-g.entryCh: err := g.client.Write(g.clientCtx, entry) if err != nil { - logger.Noticef("writing logs to target %q: %v", g.targetName, err) + logger.Noticef("Error writing logs to target %q: %v", g.targetName, err) } } } @@ -195,7 +195,7 @@ mainLoop: defer cancel() err := g.client.Flush(ctx) if err != nil { - logger.Noticef("sending logs to target %q: %v", g.targetName, err) + logger.Noticef("Error sending logs to target %q: %v", g.targetName, err) } return nil } From 783c3ca9285615d08dcc9ed4983c63e5fdc2ef56 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 8 Aug 2023 10:47:59 +0700 Subject: [PATCH 10/31] Revert changes to state engine --- internals/overlord/stateengine.go | 9 +--- internals/overlord/stateengine_test.go | 63 ++++++++------------------ 2 files changed, 19 insertions(+), 53 deletions(-) diff --git a/internals/overlord/stateengine.go b/internals/overlord/stateengine.go index 745e1593..25710333 100644 --- a/internals/overlord/stateengine.go +++ b/internals/overlord/stateengine.go @@ -136,17 +136,10 @@ func (se *StateEngine) Stop() { if se.stopped { return } - - var wg sync.WaitGroup for _, m := range se.managers { if stopper, ok := m.(StateStopper); ok { - wg.Add(1) - go func() { - stopper.Stop() - wg.Done() - }() + stopper.Stop() } } - wg.Wait() se.stopped = true } diff --git a/internals/overlord/stateengine_test.go b/internals/overlord/stateengine_test.go index cc8a1749..f12ed98b 100644 --- 
a/internals/overlord/stateengine_test.go +++ b/internals/overlord/stateengine_test.go @@ -36,21 +36,21 @@ func (ses *stateEngineSuite) TestNewAndState(c *C) { type fakeManager struct { name string - calls chan<- string + calls *[]string ensureError, stopError error } func (fm *fakeManager) Ensure() error { - fm.calls <- "ensure:" + fm.name + *fm.calls = append(*fm.calls, "ensure:"+fm.name) return fm.ensureError } func (fm *fakeManager) Stop() { - fm.calls <- "stop:" + fm.name + *fm.calls = append(*fm.calls, "stop:"+fm.name) } func (fm *fakeManager) Wait() { - fm.calls <- "wait:" + fm.name + *fm.calls = append(*fm.calls, "wait:"+fm.name) } var _ overlord.StateManager = (*fakeManager)(nil) @@ -59,87 +59,60 @@ func (ses *stateEngineSuite) TestEnsure(c *C) { s := state.New(nil) se := overlord.NewStateEngine(s) - calls := make(chan string, 4) + calls := []string{} - mgr1 := &fakeManager{name: "mgr1", calls: calls} - mgr2 := &fakeManager{name: "mgr2", calls: calls} + mgr1 := &fakeManager{name: "mgr1", calls: &calls} + mgr2 := &fakeManager{name: "mgr2", calls: &calls} se.AddManager(mgr1) se.AddManager(mgr2) err := se.Ensure() c.Assert(err, IsNil) - checkCalls(c, calls, "ensure:mgr1", "ensure:mgr2") + c.Check(calls, DeepEquals, []string{"ensure:mgr1", "ensure:mgr2"}) err = se.Ensure() c.Assert(err, IsNil) - checkCalls(c, calls, "ensure:mgr1", "ensure:mgr2") + c.Check(calls, DeepEquals, []string{"ensure:mgr1", "ensure:mgr2", "ensure:mgr1", "ensure:mgr2"}) } func (ses *stateEngineSuite) TestEnsureError(c *C) { s := state.New(nil) se := overlord.NewStateEngine(s) - calls := make(chan string, 2) + calls := []string{} err1 := errors.New("boom1") err2 := errors.New("boom2") - mgr1 := &fakeManager{name: "mgr1", calls: calls, ensureError: err1} - mgr2 := &fakeManager{name: "mgr2", calls: calls, ensureError: err2} + mgr1 := &fakeManager{name: "mgr1", calls: &calls, ensureError: err1} + mgr2 := &fakeManager{name: "mgr2", calls: &calls, ensureError: err2} se.AddManager(mgr1) se.AddManager(mgr2) err := se.Ensure() c.Check(err.Error(), DeepEquals, "state ensure errors: [boom1 boom2]") - checkCalls(c, calls, "ensure:mgr1", "ensure:mgr2") + c.Check(calls, DeepEquals, []string{"ensure:mgr1", "ensure:mgr2"}) } func (ses *stateEngineSuite) TestStop(c *C) { s := state.New(nil) se := overlord.NewStateEngine(s) - calls := make(chan string, 2) + calls := []string{} - mgr1 := &fakeManager{name: "mgr1", calls: calls} - mgr2 := &fakeManager{name: "mgr2", calls: calls} + mgr1 := &fakeManager{name: "mgr1", calls: &calls} + mgr2 := &fakeManager{name: "mgr2", calls: &calls} se.AddManager(mgr1) se.AddManager(mgr2) se.Stop() - checkCalls(c, calls, "stop:mgr1", "stop:mgr2") + c.Check(calls, DeepEquals, []string{"stop:mgr1", "stop:mgr2"}) se.Stop() - c.Check(len(calls), Equals, 0) + c.Check(calls, HasLen, 2) err := se.Ensure() c.Check(err, ErrorMatches, "state engine already stopped") } - -func checkCalls(c *C, calls <-chan string, expected ...string) { - // Initialise multiset containing calls - expectedCalls := map[string]int{} - for _, expCall := range expected { - expectedCalls[expCall]++ - } - -loop: - for { - select { - case call := <-calls: - expectedCalls[call]-- - if expectedCalls[call] < 0 { - c.Errorf("extra call: %q", call) - } - default: - break loop - } - } - - for call, n := range expectedCalls { - if n > 0 { - c.Errorf("missing %d calls: %q", n, call) - } - } -} From baabeb24d6180ee9bccb99519441dbd8e134f098 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 8 Aug 2023 10:59:17 +0700 Subject: [PATCH 11/31] 
Stop log manager after service manager - close ringbuffer on service stop - add some more debug logging --- internals/overlord/logstate/gatherer.go | 7 ++++++- internals/overlord/overlord.go | 4 +++- internals/overlord/servstate/handlers.go | 3 +++ internals/overlord/stateengine.go | 1 + 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 9e5cdd44..4fded3e5 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -216,14 +216,19 @@ func (g *logGatherer) Stop() { // Wait up to timeoutPullers for the pullers to pull the final logs from the // iterator and send to the main loop. - time.AfterFunc(timeoutPullers, g.pullers.KillAll) + time.AfterFunc(timeoutPullers, func() { + logger.Debugf("gatherer %q: force killing log pullers", g.targetName) + g.pullers.KillAll() + }) // Kill the main loop once either: // - all the pullers are done // - timeoutMainLoop has passed select { case <-g.pullers.Done(): + logger.Debugf("gatherer %q: pullers have finished", g.targetName) case <-time.After(timeoutMainLoop): + logger.Debugf("gatherer %q: force killing main loop", g.targetName) } _ = g.tomb.Killf("gatherer stopped") diff --git a/internals/overlord/overlord.go b/internals/overlord/overlord.go index 771a670d..1ccd56c8 100644 --- a/internals/overlord/overlord.go +++ b/internals/overlord/overlord.go @@ -108,13 +108,15 @@ func New(pebbleDir string, restartHandler restart.Handler, serviceOutput io.Writ o.runner.AddOptionalHandler(matchAnyUnknownTask, handleUnknownTask, nil) o.logMgr = logstate.NewLogManager() - o.addManager(o.logMgr) o.serviceMgr, err = servstate.NewManager(s, o.runner, o.pebbleDir, serviceOutput, restartHandler, o.logMgr) if err != nil { return nil, err } o.addManager(o.serviceMgr) + // The log manager should be stopped after the service manager, so we can + // collect any final logs from the service. + o.addManager(o.logMgr) o.commandMgr = cmdstate.NewManager(o.runner) o.addManager(o.commandMgr) diff --git a/internals/overlord/servstate/handlers.go b/internals/overlord/servstate/handlers.go index 3d3a18b2..cdb1285d 100644 --- a/internals/overlord/servstate/handlers.go +++ b/internals/overlord/servstate/handlers.go @@ -223,6 +223,9 @@ func (m *ServiceManager) doStop(task *state.Task, tomb *tomb.Tomb) error { if service == nil { return nil } + // Close the ringbuffer to signal to consumers (e.g. log forwarding) that + // there are no more logs coming. + defer service.logs.Close() // Stop service: send SIGTERM, and if that doesn't stop the process in a // short time, send SIGKILL. 
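The close-the-ringbuffer-to-signal-consumers idea above is the crux of this patch: once a service's buffer is closed, log consumers can drain whatever is left and exit on their own, which is why the log manager can safely be stopped after the service manager. A minimal, self-contained sketch of that shutdown ordering, using a plain channel as a stand-in for the ring buffer rather than Pebble's servicelog types:

package main

import (
	"fmt"
	"sync"
)

func main() {
	// entries stands in for a service's ring buffer: the service writes to
	// it while running, and closing it tells consumers that no more logs
	// are coming.
	entries := make(chan string, 16)

	var pullerDone sync.WaitGroup
	pullerDone.Add(1)
	go func() {
		// "puller": forwards logs until the buffer is closed and drained.
		defer pullerDone.Done()
		for entry := range entries {
			fmt.Println("forwarded:", entry)
		}
	}()

	entries <- "service stopping"
	entries <- "final log line"

	// Service manager shutdown: close the buffer first...
	close(entries)
	// ...then stop log forwarding, which only has to wait for the puller
	// to finish draining the remaining entries.
	pullerDone.Wait()
}

Closing first and waiting second is what lets the final lines reach the log target instead of being dropped at shutdown.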
diff --git a/internals/overlord/stateengine.go b/internals/overlord/stateengine.go index 25710333..18908ffc 100644 --- a/internals/overlord/stateengine.go +++ b/internals/overlord/stateengine.go @@ -138,6 +138,7 @@ func (se *StateEngine) Stop() { } for _, m := range se.managers { if stopper, ok := m.(StateStopper); ok { + logger.Debugf("state engine: stopping %T", m) stopper.Stop() } } From 04a3354ad4a702a9cbbf375de1d418f434994410 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 8 Aug 2023 11:08:03 +0700 Subject: [PATCH 12/31] kill gatherer tomb with nil - catch and log error from tomb.Wait() --- internals/overlord/logstate/gatherer.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 4fded3e5..7128d095 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -231,9 +231,12 @@ func (g *logGatherer) Stop() { logger.Debugf("gatherer %q: force killing main loop", g.targetName) } - _ = g.tomb.Killf("gatherer stopped") + g.tomb.Kill(nil) // Wait for final flush in the main loop - _ = g.tomb.Wait() + err := g.tomb.Wait() + if err != nil { + logger.Noticef("Error shutting down gatherer: %v", err) + } } // logClient handles requests to a specific type of log target. It encodes From b35a8b0e12906f2d82735473e71940a8f222428b Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 8 Aug 2023 11:18:39 +0700 Subject: [PATCH 13/31] make some channels read/write only --- internals/overlord/logstate/puller.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index 3e44dec8..b195ba10 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -25,7 +25,7 @@ import ( // main control loop. type logPuller struct { iterator servicelog.Iterator - entryCh chan servicelog.Entry + entryCh chan<- servicelog.Entry ctx context.Context kill context.CancelFunc @@ -86,7 +86,7 @@ func newPullerGroup(targetName string) *pullerGroup { return pg } -func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, entryCh chan servicelog.Entry) { +func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, entryCh chan<- servicelog.Entry) { lp := &logPuller{ iterator: buffer.TailIterator(), entryCh: entryCh, @@ -138,7 +138,7 @@ func (pg *pullerGroup) KillAll() { } // Done returns a channel which can be waited on until all pullers have finished. 
-func (pg *pullerGroup) Done() chan struct{} { +func (pg *pullerGroup) Done() <-chan struct{} { done := make(chan struct{}) go func() { pg.wg.Wait() From b31e4bee75dfd34fd81701f20d8ba62e9f85d59e Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 8 Aug 2023 11:40:44 +0700 Subject: [PATCH 14/31] pullerGroup.Add: log warning if puller already exists - add comment about when puller.loop exits --- internals/overlord/logstate/puller.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index b195ba10..96272200 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -18,6 +18,7 @@ import ( "context" "sync" + "github.com/canonical/pebble/internals/logger" "github.com/canonical/pebble/internals/servicelog" ) @@ -31,6 +32,11 @@ type logPuller struct { kill context.CancelFunc } +// loop pulls logs off the iterator and sends them on the entryCh. +// The loop will terminate: +// - if the puller's context is cancelled +// - once the ringbuffer is closed and the iterator finishes reading all +// remaining logs. func (p *logPuller) loop() { defer func() { _ = p.iterator.Close() }() @@ -104,6 +110,7 @@ func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, en defer pg.mu.Unlock() if puller, ok := pg.pullers[serviceName]; ok { // This should never happen, but just in case, shut down the old puller. + logger.Debugf("puller for service %q already exists, shutting down old puller", serviceName) puller.kill() } pg.pullers[serviceName] = lp From 01bd92aa16d94d02204498bef262ea8792e0cdc1 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 8 Aug 2023 11:59:30 +0700 Subject: [PATCH 15/31] rename logPuller.kill to cancel - extra comment for gatherer final flush ctx --- internals/overlord/logstate/gatherer.go | 1 + internals/overlord/logstate/puller.go | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 7128d095..b1b395be 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -191,6 +191,7 @@ mainLoop: } // Final flush to send any remaining logs buffered in the client + // We need to create a new context, as the previous one may have been cancelled. ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush) defer cancel() err := g.client.Flush(ctx) diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index 96272200..d42d211b 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -28,8 +28,8 @@ type logPuller struct { iterator servicelog.Iterator entryCh chan<- servicelog.Entry - ctx context.Context - kill context.CancelFunc + ctx context.Context + cancel context.CancelFunc } // loop pulls logs off the iterator and sends them on the entryCh. @@ -97,7 +97,7 @@ func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, en iterator: buffer.TailIterator(), entryCh: entryCh, } - lp.ctx, lp.kill = context.WithCancel(pg.ctx) + lp.ctx, lp.cancel = context.WithCancel(pg.ctx) pg.wg.Add(1) // this will be marked as done once loop finishes go func() { @@ -111,7 +111,7 @@ func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, en if puller, ok := pg.pullers[serviceName]; ok { // This should never happen, but just in case, shut down the old puller. 
logger.Debugf("puller for service %q already exists, shutting down old puller", serviceName) - puller.kill() + puller.cancel() } pg.pullers[serviceName] = lp } @@ -134,7 +134,7 @@ func (pg *pullerGroup) Remove(serviceName string) { defer pg.mu.Unlock() if puller, ok := pg.pullers[serviceName]; ok { - puller.kill() + puller.cancel() delete(pg.pullers, serviceName) } From ab46d3135e7e802a675de78ae1f3b007b562baf7 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 8 Aug 2023 12:01:11 +0700 Subject: [PATCH 16/31] flesh out logGatherer.PlanChanged doc comment --- internals/overlord/logstate/gatherer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index b1b395be..c6fd73cd 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -122,7 +122,8 @@ func fillDefaultArgs(args logGathererArgs) logGathererArgs { return args } -// PlanChanged is called by the LogManager when the plan is changed. +// PlanChanged is called by the LogManager when the plan is changed, if this +// gatherer's target exists in the new plan. func (g *logGatherer) PlanChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) { // Remove old pullers for _, svcName := range g.pullers.List() { From 6bae051f3098ee40c7868c09b678cc423a5fa982 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 8 Aug 2023 13:10:06 +0700 Subject: [PATCH 17/31] use timer instead of ticker --- internals/overlord/logstate/gatherer.go | 47 ++++++++++++++++++-- internals/overlord/logstate/gatherer_test.go | 4 +- 2 files changed, 45 insertions(+), 6 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index c6fd73cd..49ce5ee4 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -167,8 +167,7 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R // pullers on entryCh, and writes them to the client. It also flushes the // client periodically, and exits when the gatherer's tomb is killed. func (g *logGatherer) loop() error { - ticker := time.NewTicker(g.tickPeriod) - defer ticker.Stop() + timer := newTimer() mainLoop: for { @@ -176,8 +175,9 @@ mainLoop: case <-g.tomb.Dying(): break mainLoop - case <-ticker.C: - // Timeout - flush + case <-timer.Finished(): + // Mark timer as unset + timer.Stop() err := g.client.Flush(g.clientCtx) if err != nil { logger.Noticef("Error sending logs to target %q: %v", g.targetName, err) @@ -188,6 +188,8 @@ mainLoop: if err != nil { logger.Noticef("Error writing logs to target %q: %v", g.targetName, err) } + // Set timer if not already set + timer.EnsureSet(g.tickPeriod) } } @@ -241,6 +243,43 @@ func (g *logGatherer) Stop() { } } +// timer wraps time.Timer and provides a better API. +type timer struct { + timer *time.Timer + set bool +} + +func newTimer() timer { + t := timer{ + timer: time.NewTimer(1 * time.Hour), + } + t.Stop() + return t +} + +func (t *timer) Finished() <-chan time.Time { + return t.timer.C +} + +func (t *timer) Stop() { + t.timer.Stop() + t.set = false + // Drain timer channel + select { + case <-t.timer.C: + default: + } +} + +func (t *timer) EnsureSet(timeout time.Duration) { + if t.set { + return + } + + t.timer.Reset(timeout) + t.set = true +} + // logClient handles requests to a specific type of log target. 
It encodes // log messages in the required format, and sends the messages using the // protocol required by that log target. diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index 2bc105cc..11cf2ce4 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -69,7 +69,7 @@ func (s *gathererSuite) TestGatherer(c *C) { func (s *gathererSuite) TestGathererTimeout(c *C) { received := make(chan []servicelog.Entry, 1) gathererArgs := logGathererArgs{ - tickPeriod: 1 * time.Microsecond, + tickPeriod: 1 * time.Millisecond, newClient: func(target *plan.LogTarget) (logClient, error) { return &testClient{ bufferSize: 5, @@ -86,7 +86,7 @@ func (s *gathererSuite) TestGathererTimeout(c *C) { testSvc.writeLog("log line #1") select { - case <-time.After(20 * time.Millisecond): + case <-time.After(1 * time.Second): c.Fatalf("timeout waiting for logs") case logs := <-received: checkLogs(c, logs, []string{"log line #1"}) From be25405911e985a51a8ae537983c0f25a7a66a90 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Fri, 11 Aug 2023 10:40:45 +0700 Subject: [PATCH 18/31] rewrite pullerGroup using tombs --- internals/overlord/logstate/puller.go | 91 ++++++++++++++------------- 1 file changed, 48 insertions(+), 43 deletions(-) diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index d42d211b..e078ba8e 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -15,11 +15,11 @@ package logstate import ( - "context" "sync" "github.com/canonical/pebble/internals/logger" "github.com/canonical/pebble/internals/servicelog" + "gopkg.in/tomb.v2" ) // logPuller handles pulling logs from a single iterator and sending to the @@ -28,8 +28,7 @@ type logPuller struct { iterator servicelog.Iterator entryCh chan<- servicelog.Entry - ctx context.Context - cancel context.CancelFunc + tomb tomb.Tomb } // loop pulls logs off the iterator and sends them on the entryCh. @@ -37,30 +36,24 @@ type logPuller struct { // - if the puller's context is cancelled // - once the ringbuffer is closed and the iterator finishes reading all // remaining logs. -func (p *logPuller) loop() { +func (p *logPuller) loop() error { defer func() { _ = p.iterator.Close() }() parser := servicelog.NewParser(p.iterator, parserSize) - for p.iterator.Next(p.ctx.Done()) { + for p.iterator.Next(p.tomb.Dying()) { for parser.Next() { if err := parser.Err(); err != nil { - return - } - - // Check if our context has been cancelled - select { - case <-p.ctx.Done(): - return - default: + return err } select { case p.entryCh <- parser.Entry(): - case <-p.ctx.Done(): - return + case <-p.tomb.Dying(): + return nil } } } + return nil } // pullerGroup represents a group of logPullers, and provides methods for a @@ -73,14 +66,6 @@ type pullerGroup struct { pullers map[string]*logPuller // Mutex for pullers map mu sync.RWMutex - // Common context for all pullers. Each puller uses a derived context so we - // can easily kill all pullers (if required) during teardown. - ctx context.Context - // Cancel func for ctx - kill context.CancelFunc - // WaitGroup for pullers - we use this during teardown to know when all the - // pullers are finished. 
- wg sync.WaitGroup } func newPullerGroup(targetName string) *pullerGroup { @@ -88,7 +73,6 @@ func newPullerGroup(targetName string) *pullerGroup { targetName: targetName, pullers: map[string]*logPuller{}, } - pg.ctx, pg.kill = context.WithCancel(context.Background()) return pg } @@ -97,22 +81,19 @@ func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, en iterator: buffer.TailIterator(), entryCh: entryCh, } - lp.ctx, lp.cancel = context.WithCancel(pg.ctx) + lp.tomb.Go(func() error { + err := lp.loop() + // Once the puller exits, we should remove it from the group. + //pg.Remove(serviceName) - this seems to cause deadlock though + return err + }) - pg.wg.Add(1) // this will be marked as done once loop finishes - go func() { - lp.loop() - pg.wg.Done() - // TODO: remove puller from map ? - }() + // There shouldn't already be a puller for this service, but if there is, + // shut it down first and wait for it to die. + pg.Remove(serviceName) pg.mu.Lock() defer pg.mu.Unlock() - if puller, ok := pg.pullers[serviceName]; ok { - // This should never happen, but just in case, shut down the old puller. - logger.Debugf("puller for service %q already exists, shutting down old puller", serviceName) - puller.cancel() - } pg.pullers[serviceName] = lp } @@ -130,27 +111,51 @@ func (pg *pullerGroup) List() []string { } func (pg *pullerGroup) Remove(serviceName string) { - pg.mu.Lock() - defer pg.mu.Unlock() + pg.mu.RLock() + puller, pullerExists := pg.pullers[serviceName] + pg.mu.RUnlock() + + if !pullerExists { + return + } - if puller, ok := pg.pullers[serviceName]; ok { - puller.cancel() - delete(pg.pullers, serviceName) + puller.tomb.Kill(nil) + err := puller.tomb.Wait() + if err != nil { + logger.Noticef("Error from log puller: %v", err) } + pg.mu.Lock() + defer pg.mu.Unlock() + delete(pg.pullers, serviceName) } func (pg *pullerGroup) KillAll() { - pg.kill() + pg.mu.RLock() + defer pg.mu.RUnlock() + + for _, puller := range pg.pullers { + puller.tomb.Kill(nil) + } } // Done returns a channel which can be waited on until all pullers have finished. func (pg *pullerGroup) Done() <-chan struct{} { + pg.mu.RLock() + pullers := pg.pullers + pg.mu.RUnlock() + done := make(chan struct{}) go func() { - pg.wg.Wait() + for _, puller := range pullers { + err := puller.tomb.Wait() + if err != nil { + logger.Noticef("Error from log puller: %v", err) + } + } close(done) }() + return done } From 04de3351805cad7153cc2d90bdd5af577b8861fe Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Fri, 11 Aug 2023 11:42:17 +0700 Subject: [PATCH 19/31] close service ring buffers on Pebble shutdown --- internals/overlord/servstate/handlers.go | 14 +++++++++++--- internals/overlord/servstate/manager.go | 3 +++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/internals/overlord/servstate/handlers.go b/internals/overlord/servstate/handlers.go index cdb1285d..4fcbf3d8 100644 --- a/internals/overlord/servstate/handlers.go +++ b/internals/overlord/servstate/handlers.go @@ -223,9 +223,6 @@ func (m *ServiceManager) doStop(task *state.Task, tomb *tomb.Tomb) error { if service == nil { return nil } - // Close the ringbuffer to signal to consumers (e.g. log forwarding) that - // there are no more logs coming. - defer service.logs.Close() // Stop service: send SIGTERM, and if that doesn't stop the process in a // short time, send SIGKILL. 
@@ -284,6 +281,17 @@ func (m *ServiceManager) removeService(name string) { m.servicesLock.Lock() defer m.servicesLock.Unlock() + svc, svcExists := m.services[name] + if !svcExists { + return + } + if svc.logs != nil { + err := svc.logs.Close() + if err != nil { + logger.Noticef("Error closing service %q ring buffer: %v", name, err) + } + } + delete(m.services, name) } diff --git a/internals/overlord/servstate/manager.go b/internals/overlord/servstate/manager.go index bbd87173..0e3f193d 100644 --- a/internals/overlord/servstate/manager.go +++ b/internals/overlord/servstate/manager.go @@ -88,6 +88,9 @@ func (m *ServiceManager) Stop() { if err != nil { logger.Noticef("Cannot stop child process reaper: %v", err) } + for name := range m.services { + m.removeService(name) + } } // NotifyPlanChanged adds f to the list of functions that are called whenever From b39c6d029866c7d02f48a4ea8a13b360a50fa2b4 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Fri, 11 Aug 2023 11:44:22 +0700 Subject: [PATCH 20/31] fix imports --- internals/overlord/logstate/puller.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index e078ba8e..f9b541ee 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -17,9 +17,10 @@ package logstate import ( "sync" + "gopkg.in/tomb.v2" + "github.com/canonical/pebble/internals/logger" "github.com/canonical/pebble/internals/servicelog" - "gopkg.in/tomb.v2" ) // logPuller handles pulling logs from a single iterator and sending to the From 58995322978c16dc4ad2d090a0937467b8608869 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Fri, 11 Aug 2023 11:55:05 +0700 Subject: [PATCH 21/31] small comment fix in overlord --- internals/overlord/overlord.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internals/overlord/overlord.go b/internals/overlord/overlord.go index 1ccd56c8..b731c34e 100644 --- a/internals/overlord/overlord.go +++ b/internals/overlord/overlord.go @@ -114,8 +114,9 @@ func New(pebbleDir string, restartHandler restart.Handler, serviceOutput io.Writ return nil, err } o.addManager(o.serviceMgr) - // The log manager should be stopped after the service manager, so we can - // collect any final logs from the service. + // The log manager should be stopped after the service manager, because + // ServiceManager.Stop closes the service ring buffers, which signals to the + // log manager that it's okay to stop log forwarding. o.addManager(o.logMgr) o.commandMgr = cmdstate.NewManager(o.runner) From 2b4f8eb4ce0b254a201fe93613564f4e7b91bdaa Mon Sep 17 00:00:00 2001 From: Harry Pidcock Date: Mon, 14 Aug 2023 14:06:35 +1000 Subject: [PATCH 22/31] Improve handling of log gatherer tear down. 
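This tear-down rework gives each puller its own tomb and has the pullerGroup's tomb track the pullers via tomb.Go(puller.tomb.Wait). A stripped-down sketch of that pattern, with illustrative worker names rather than the real pullerGroup code:

package main

import (
	"fmt"
	"time"

	"gopkg.in/tomb.v2"
)

// worker is a stand-in for a logPuller: it runs until its own tomb is killed.
type worker struct {
	name string
	tomb tomb.Tomb
}

func (w *worker) loop() error {
	<-w.tomb.Dying()
	fmt.Println(w.name, "shutting down")
	return nil
}

func main() {
	var group tomb.Tomb

	// Keep the group tomb alive for the whole lifetime of the group, so that
	// workers can still be added after earlier ones have finished (tomb.Go
	// panics once all tracked goroutines have returned).
	group.Go(func() error {
		<-group.Dying()
		return nil
	})

	workers := []*worker{{name: "svc1"}, {name: "svc2"}}
	for _, w := range workers {
		w.tomb.Go(w.loop)
		// The group tracks each worker only by waiting on its tomb; killing
		// the group does not kill the workers themselves.
		group.Go(w.tomb.Wait)
	}

	time.Sleep(10 * time.Millisecond) // pretend some work happens

	// Tear down: kill each worker, then the group, then wait for everything.
	for _, w := range workers {
		w.tomb.Kill(nil)
	}
	group.Kill(nil)
	if err := group.Wait(); err != nil {
		fmt.Println("shutdown error:", err)
	}
	fmt.Println("all workers finished")
}

The keep-alive goroutine in the sketch mirrors the persistent goroutine added to pullerGroup in a later patch; without it, adding a worker after all previously tracked goroutines had returned would panic.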
--- internals/overlord/logstate/gatherer.go | 4 +- internals/overlord/logstate/puller.go | 56 ++++++++++--------------- 2 files changed, 24 insertions(+), 36 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 49ce5ee4..9b149060 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -106,6 +106,8 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG } g.clientCtx, g.clientCancel = context.WithCancel(context.Background()) g.tomb.Go(g.loop) + g.tomb.Go(g.pullers.tomb.Wait) + return g, nil } @@ -239,7 +241,7 @@ func (g *logGatherer) Stop() { // Wait for final flush in the main loop err := g.tomb.Wait() if err != nil { - logger.Noticef("Error shutting down gatherer: %v", err) + logger.Noticef("Internal error: cannot shut down gatherer: %v", err) } } diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index f9b541ee..b05808b1 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -67,6 +67,8 @@ type pullerGroup struct { pullers map[string]*logPuller // Mutex for pullers map mu sync.RWMutex + + tomb tomb.Tomb } func newPullerGroup(targetName string) *pullerGroup { @@ -78,23 +80,20 @@ func newPullerGroup(targetName string) *pullerGroup { } func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, entryCh chan<- servicelog.Entry) { + pg.mu.Lock() + defer pg.mu.Unlock() + + // There shouldn't already be a puller for this service, but if there is, + // shut it down first and wait for it to die. + pg.remove(serviceName) + lp := &logPuller{ iterator: buffer.TailIterator(), entryCh: entryCh, } - lp.tomb.Go(func() error { - err := lp.loop() - // Once the puller exits, we should remove it from the group. - //pg.Remove(serviceName) - this seems to cause deadlock though - return err - }) + lp.tomb.Go(lp.loop) + pg.tomb.Go(lp.tomb.Wait) - // There shouldn't already be a puller for this service, but if there is, - // shut it down first and wait for it to die. - pg.Remove(serviceName) - - pg.mu.Lock() - defer pg.mu.Unlock() pg.pullers[serviceName] = lp } @@ -112,23 +111,24 @@ func (pg *pullerGroup) List() []string { } func (pg *pullerGroup) Remove(serviceName string) { - pg.mu.RLock() - puller, pullerExists := pg.pullers[serviceName] - pg.mu.RUnlock() + pg.mu.Lock() + defer pg.mu.Unlock() + pg.remove(serviceName) +} +func (pg *pullerGroup) remove(serviceName string) { + puller, pullerExists := pg.pullers[serviceName] if !pullerExists { return } puller.tomb.Kill(nil) + delete(pg.pullers, serviceName) + err := puller.tomb.Wait() if err != nil { logger.Noticef("Error from log puller: %v", err) } - - pg.mu.Lock() - defer pg.mu.Unlock() - delete(pg.pullers, serviceName) } func (pg *pullerGroup) KillAll() { @@ -138,26 +138,12 @@ func (pg *pullerGroup) KillAll() { for _, puller := range pg.pullers { puller.tomb.Kill(nil) } + pg.tomb.Kill(nil) } // Done returns a channel which can be waited on until all pullers have finished. 
func (pg *pullerGroup) Done() <-chan struct{} { - pg.mu.RLock() - pullers := pg.pullers - pg.mu.RUnlock() - - done := make(chan struct{}) - go func() { - for _, puller := range pullers { - err := puller.tomb.Wait() - if err != nil { - logger.Noticef("Error from log puller: %v", err) - } - } - close(done) - }() - - return done + return pg.tomb.Dead() } func (pg *pullerGroup) Contains(serviceName string) bool { From 97e7d9430f3a050f04654a5c8db52381908ba0f7 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Mon, 14 Aug 2023 11:57:53 +0700 Subject: [PATCH 23/31] add persistent goroutine to pullerGroup tomb This avoids runtime panic from calling Go after all goroutines have returned. Also, we need to kill the pullerGroup tomb in Stop() to tell the pullerGroup that it's time to shut down. --- internals/overlord/logstate/gatherer.go | 1 + internals/overlord/logstate/gatherer_test.go | 2 +- internals/overlord/logstate/puller.go | 4 ++++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 9b149060..e91de987 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -230,6 +230,7 @@ func (g *logGatherer) Stop() { // Kill the main loop once either: // - all the pullers are done // - timeoutMainLoop has passed + g.pullers.tomb.Kill(nil) select { case <-g.pullers.Done(): logger.Debugf("gatherer %q: pullers have finished", g.targetName) diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index 11cf2ce4..78e65e5c 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -122,7 +122,7 @@ func (s *gathererSuite) TestGathererShutdown(c *C) { }() select { - case <-time.After(20 * time.Millisecond): + case <-time.After(100 * time.Millisecond): c.Fatalf("timeout waiting for gatherer to tear down") case <-hasShutdown: } diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index b05808b1..8f3b909e 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -76,6 +76,10 @@ func newPullerGroup(targetName string) *pullerGroup { targetName: targetName, pullers: map[string]*logPuller{}, } + pg.tomb.Go(func() error { + <-pg.tomb.Dying() + return nil + }) return pg } From 9465a3a6de83c23af2637bba207b39fd2ea78979 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Mon, 14 Aug 2023 12:47:56 +0700 Subject: [PATCH 24/31] rename tickPeriod to bufferTimeout --- internals/overlord/logstate/gatherer.go | 12 ++++++------ internals/overlord/logstate/gatherer_test.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index e91de987..fff77581 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -27,8 +27,8 @@ import ( ) const ( - parserSize = 4 * 1024 - tickPeriod = 1 * time.Second + parserSize = 4 * 1024 + bufferTimeout = 1 * time.Second // These constants control the maximum time allowed for each teardown step. timeoutCurrentFlush = 1 * time.Second @@ -76,7 +76,7 @@ type logGatherer struct { // logGathererArgs allows overriding the newLogClient method and time values // in testing. 
type logGathererArgs struct { - tickPeriod time.Duration + bufferTimeout time.Duration timeoutFinalFlush time.Duration // method to get a new client newClient func(*plan.LogTarget) (logClient, error) @@ -112,8 +112,8 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG } func fillDefaultArgs(args logGathererArgs) logGathererArgs { - if args.tickPeriod == 0 { - args.tickPeriod = tickPeriod + if args.bufferTimeout == 0 { + args.bufferTimeout = bufferTimeout } if args.timeoutFinalFlush == 0 { args.timeoutFinalFlush = timeoutFinalFlush @@ -191,7 +191,7 @@ mainLoop: logger.Noticef("Error writing logs to target %q: %v", g.targetName, err) } // Set timer if not already set - timer.EnsureSet(g.tickPeriod) + timer.EnsureSet(g.bufferTimeout) } } diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index 78e65e5c..b7220cbe 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -69,7 +69,7 @@ func (s *gathererSuite) TestGatherer(c *C) { func (s *gathererSuite) TestGathererTimeout(c *C) { received := make(chan []servicelog.Entry, 1) gathererArgs := logGathererArgs{ - tickPeriod: 1 * time.Millisecond, + bufferTimeout: 1 * time.Millisecond, newClient: func(target *plan.LogTarget) (logClient, error) { return &testClient{ bufferSize: 5, @@ -96,7 +96,7 @@ func (s *gathererSuite) TestGathererTimeout(c *C) { func (s *gathererSuite) TestGathererShutdown(c *C) { received := make(chan []servicelog.Entry, 1) gathererArgs := logGathererArgs{ - tickPeriod: 1 * time.Microsecond, + bufferTimeout: 1 * time.Microsecond, newClient: func(target *plan.LogTarget) (logClient, error) { return &testClient{ bufferSize: 5, From 5e77fb39e5d9832b6a34dcc9bce7cfcaa0d7ad30 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 15 Aug 2023 11:46:16 +0700 Subject: [PATCH 25/31] address Ben's review comments --- internals/overlord/logstate/gatherer.go | 16 ++++++++-------- internals/overlord/logstate/puller.go | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index fff77581..218e560a 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -47,7 +47,7 @@ const ( // logs from. Each logPuller will run in a separate goroutine, and send logs to // the logGatherer via a shared channel. // The logGatherer will "flush" the client: -// - on a regular cadence (e.g. every 1 second) +// - after a timeout (1s) has passed since the first log was written; // - when it is told to shut down. // // The client may also flush itself when its internal buffer reaches a certain @@ -128,7 +128,7 @@ func fillDefaultArgs(args logGathererArgs) logGathererArgs { // gatherer's target exists in the new plan. func (g *logGatherer) PlanChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) { // Remove old pullers - for _, svcName := range g.pullers.List() { + for _, svcName := range g.pullers.Services() { svc, svcExists := pl.Services[svcName] if !svcExists { g.pullers.Remove(svcName) @@ -170,6 +170,7 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R // client periodically, and exits when the gatherer's tomb is killed. 
func (g *logGatherer) loop() error { timer := newTimer() + defer timer.Stop() mainLoop: for { @@ -177,20 +178,19 @@ mainLoop: case <-g.tomb.Dying(): break mainLoop - case <-timer.Finished(): + case <-timer.Expired(): // Mark timer as unset timer.Stop() err := g.client.Flush(g.clientCtx) if err != nil { - logger.Noticef("Error sending logs to target %q: %v", g.targetName, err) + logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) } case entry := <-g.entryCh: err := g.client.Write(g.clientCtx, entry) if err != nil { - logger.Noticef("Error writing logs to target %q: %v", g.targetName, err) + logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err) } - // Set timer if not already set timer.EnsureSet(g.bufferTimeout) } } @@ -201,7 +201,7 @@ mainLoop: defer cancel() err := g.client.Flush(ctx) if err != nil { - logger.Noticef("Error sending logs to target %q: %v", g.targetName, err) + logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err) } return nil } @@ -260,7 +260,7 @@ func newTimer() timer { return t } -func (t *timer) Finished() <-chan time.Time { +func (t *timer) Expired() <-chan time.Time { return t.timer.C } diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index 8f3b909e..0c73ad02 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -101,13 +101,13 @@ func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, en pg.pullers[serviceName] = lp } -// List returns a list of all service names for which we have a currently -// active puller. -func (pg *pullerGroup) List() []string { +// Services returns a list containing the name of each service for which we +// have a puller in this group. +func (pg *pullerGroup) Services() []string { pg.mu.RLock() defer pg.mu.RUnlock() - var svcs []string + svcs := make([]string, 0, len(pg.pullers)) for svc := range pg.pullers { svcs = append(svcs, svc) } From 28d2d51324ff12196d5e02b1ab681965ff4e0a8f Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 15 Aug 2023 12:42:05 +0700 Subject: [PATCH 26/31] lowercase pullerGroup contains and len methods --- internals/overlord/logstate/manager_test.go | 4 ++-- internals/overlord/logstate/puller.go | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go index af861b66..366250be 100644 --- a/internals/overlord/logstate/manager_test.go +++ b/internals/overlord/logstate/manager_test.go @@ -100,9 +100,9 @@ func checkGatherers(c *C, gatherers map[string]*logGatherer, expected map[string g, ok := gatherers[tgtName] c.Assert(ok, Equals, true) - c.Assert(g.pullers.Len(), Equals, len(svcs)) + c.Assert(g.pullers.len(), Equals, len(svcs)) for _, svc := range svcs { - c.Check(g.pullers.Contains(svc), Equals, true) + c.Check(g.pullers.contains(svc), Equals, true) } } } diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index 0c73ad02..eeea91a8 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -150,14 +150,16 @@ func (pg *pullerGroup) Done() <-chan struct{} { return pg.tomb.Dead() } -func (pg *pullerGroup) Contains(serviceName string) bool { +// contains is used for testing. +func (pg *pullerGroup) contains(serviceName string) bool { pg.mu.RLock() defer pg.mu.RUnlock() _, ok := pg.pullers[serviceName] return ok } -func (pg *pullerGroup) Len() int { +// len is used for testing. 
+func (pg *pullerGroup) len() int { pg.mu.RLock() defer pg.mu.RUnlock() return len(pg.pullers) From 0b0d873d708af4ebc976766bbfd879e6dabcc423 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 15 Aug 2023 12:43:43 +0700 Subject: [PATCH 27/31] remove fake client implementations --- internals/overlord/logstate/fake.go | 70 ----------------------------- 1 file changed, 70 deletions(-) delete mode 100644 internals/overlord/logstate/fake.go diff --git a/internals/overlord/logstate/fake.go b/internals/overlord/logstate/fake.go deleted file mode 100644 index 8356e236..00000000 --- a/internals/overlord/logstate/fake.go +++ /dev/null @@ -1,70 +0,0 @@ -package logstate - -import ( - "context" - "fmt" - "time" - - "github.com/canonical/pebble/internals/servicelog" -) - -// Fake sample implementations of logClient -// TODO: remove this file before merging - -type nonBufferingClient struct{} - -var _ logClient = &nonBufferingClient{} - -func (c *nonBufferingClient) Write(_ context.Context, entry servicelog.Entry) error { - fmt.Printf("%v [%s] %s", entry.Time, entry.Service, entry.Message) - return nil -} - -func (c *nonBufferingClient) Flush(_ context.Context) error { - // no-op - return nil -} - -type bufferingClient struct { - entries []servicelog.Entry - threshold int -} - -var _ logClient = &bufferingClient{} - -func (c *bufferingClient) Write(ctx context.Context, entry servicelog.Entry) error { - c.entries = append(c.entries, entry) - if c.threshold > 0 && len(c.entries) >= c.threshold { - return c.Flush(ctx) - } - return nil -} - -func (c *bufferingClient) Flush(_ context.Context) error { - for _, entry := range c.entries { - fmt.Printf("%v [%s] %s", entry.Time, entry.Service, entry.Message) - } - fmt.Println() - c.entries = c.entries[:0] - return nil -} - -// a slow client where Flush takes a long time -type slowClient struct { - flushTime time.Duration -} - -var _ logClient = &slowClient{} - -func (c *slowClient) Write(_ context.Context, _ servicelog.Entry) error { - return nil -} - -func (c *slowClient) Flush(ctx context.Context) error { - select { - case <-time.After(c.flushTime): - return nil - case <-ctx.Done(): - return fmt.Errorf("timeout flushing logs") - } -} From 97c468180dfa0e386be4e4a4f5e04100c979d0c7 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 15 Aug 2023 12:52:10 +0700 Subject: [PATCH 28/31] add comment on pullerGroup tomb --- internals/overlord/logstate/puller.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index eeea91a8..775d30aa 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -76,10 +76,15 @@ func newPullerGroup(targetName string) *pullerGroup { targetName: targetName, pullers: map[string]*logPuller{}, } + // This goroutine lives for the lifetime of the pullerGroup. This is so that, + // if needed, we can safely remove all pullers and then add more, without + // causing a panic (tombs can't be reused once all the tracked goroutines + // have finished). 
pg.tomb.Go(func() error { <-pg.tomb.Dying() return nil }) + return pg } From 8863487971170dddeb9a2f677ee3e2148a1ee363 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Tue, 15 Aug 2023 12:56:25 +0700 Subject: [PATCH 29/31] obtain lock before reading ServiceManager.services --- internals/overlord/servstate/handlers.go | 4 ++++ internals/overlord/servstate/manager.go | 6 +++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/internals/overlord/servstate/handlers.go b/internals/overlord/servstate/handlers.go index 4fcbf3d8..2db5f6cc 100644 --- a/internals/overlord/servstate/handlers.go +++ b/internals/overlord/servstate/handlers.go @@ -280,7 +280,11 @@ func (m *ServiceManager) serviceForStop(task *state.Task, name string) *serviceD func (m *ServiceManager) removeService(name string) { m.servicesLock.Lock() defer m.servicesLock.Unlock() + m.removeServiceInternal(name) +} +// not concurrency-safe, please lock m.servicesLock before calling +func (m *ServiceManager) removeServiceInternal(name string) { svc, svcExists := m.services[name] if !svcExists { return diff --git a/internals/overlord/servstate/manager.go b/internals/overlord/servstate/manager.go index 0e3f193d..f85bfeb5 100644 --- a/internals/overlord/servstate/manager.go +++ b/internals/overlord/servstate/manager.go @@ -88,8 +88,12 @@ func (m *ServiceManager) Stop() { if err != nil { logger.Noticef("Cannot stop child process reaper: %v", err) } + + // Close all the service ringbuffers + m.servicesLock.Lock() + defer m.servicesLock.Unlock() for name := range m.services { - m.removeService(name) + m.removeServiceInternal(name) } } From c0584cea7252798841aa6dd74461e0badbe68dd3 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Thu, 17 Aug 2023 12:16:10 +0700 Subject: [PATCH 30/31] Add some more doc comments to pullerGroup methods --- internals/overlord/logstate/puller.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go index 775d30aa..cd12c5d8 100644 --- a/internals/overlord/logstate/puller.go +++ b/internals/overlord/logstate/puller.go @@ -88,6 +88,8 @@ func newPullerGroup(targetName string) *pullerGroup { return pg } +// Add adds a new puller to the group. This puller will read from the given +// buffer, and send parsed logs on the provided channel. func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, entryCh chan<- servicelog.Entry) { pg.mu.Lock() defer pg.mu.Unlock() @@ -119,12 +121,15 @@ func (pg *pullerGroup) Services() []string { return svcs } +// Remove removes the puller for the named service. func (pg *pullerGroup) Remove(serviceName string) { pg.mu.Lock() defer pg.mu.Unlock() pg.remove(serviceName) } +// remove removes the puller for the named service. +// This method is not concurrency-safe - please lock pg.mu before calling. func (pg *pullerGroup) remove(serviceName string) { puller, pullerExists := pg.pullers[serviceName] if !pullerExists { @@ -140,6 +145,7 @@ func (pg *pullerGroup) remove(serviceName string) { } } +// KillAll kills all pullers in this group. 
func (pg *pullerGroup) KillAll() { pg.mu.RLock() defer pg.mu.RUnlock() From 3c92f43c6695c8ca23c23ebaa5cf7c23c9c67566 Mon Sep 17 00:00:00 2001 From: Ben Hoyt Date: Tue, 22 Aug 2023 08:27:04 +0100 Subject: [PATCH 31/31] Remove internal error on tomb.Wait log --- internals/overlord/logstate/gatherer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 218e560a..001b9e2f 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -242,7 +242,7 @@ func (g *logGatherer) Stop() { // Wait for final flush in the main loop err := g.tomb.Wait() if err != nil { - logger.Noticef("Internal error: cannot shut down gatherer: %v", err) + logger.Noticef("Cannot shut down gatherer: %v", err) } }
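The flush timing that the gatherer's loop builds on (arm the timer when the first entry is buffered, flush and disarm when it expires) can be exercised on its own. The sketch below reuses a simplified copy of the timer wrapper from gatherer.go; the entry channel and durations are illustrative:

package main

import (
	"fmt"
	"time"
)

// timer wraps time.Timer so it can be safely re-armed: Stop also drains the
// channel, which avoids acting on a stale expiry after a later Reset.
type timer struct {
	timer *time.Timer
	set   bool
}

func newTimer() timer {
	t := timer{timer: time.NewTimer(time.Hour)}
	t.Stop()
	return t
}

func (t *timer) Expired() <-chan time.Time { return t.timer.C }

func (t *timer) Stop() {
	t.timer.Stop()
	t.set = false
	// Drain the channel in case the timer already fired.
	select {
	case <-t.timer.C:
	default:
	}
}

func (t *timer) EnsureSet(timeout time.Duration) {
	if !t.set {
		t.timer.Reset(timeout)
		t.set = true
	}
}

func main() {
	entries := make(chan string)
	go func() {
		for _, e := range []string{"a", "b", "c"} {
			entries <- e
			time.Sleep(20 * time.Millisecond)
		}
		close(entries)
	}()

	var buffered []string
	flushTimer := newTimer()
	for {
		select {
		case <-flushTimer.Expired():
			flushTimer.Stop() // mark as unset so the next write re-arms it
			fmt.Println("flush:", buffered)
			buffered = buffered[:0]
		case e, ok := <-entries:
			if !ok {
				fmt.Println("final flush:", buffered)
				return
			}
			buffered = append(buffered, e)
			// Arm the timer when the first entry is buffered; later writes
			// within the window leave the deadline unchanged.
			flushTimer.EnsureSet(50 * time.Millisecond)
		}
	}
}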