diff --git a/internals/overlord/logstate/forwarder.go b/internals/overlord/logstate/forwarder.go
new file mode 100644
index 00000000..1edaee00
--- /dev/null
+++ b/internals/overlord/logstate/forwarder.go
@@ -0,0 +1,71 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+package logstate
+
+import (
+ "sync"
+
+ "github.com/canonical/pebble/internals/logger"
+ "github.com/canonical/pebble/internals/servicelog"
+)
+
+// logForwarder is responsible for pulling logs from a service's ringbuffer,
+// and distributing each log message to its logGatherers. Its gatherers field
+// holds a reference to the gatherer for each log target that the service is
+// sending logs to.
+// One logForwarder will run per service. Its forward() method should be run
+// in its own goroutine.
+type logForwarder struct {
+ serviceName string
+
+ mu sync.Mutex // mutex for gatherers
+ gatherers []*logGatherer
+
+ cancel chan struct{}
+}
+
+func newLogForwarder(serviceName string) *logForwarder {
+ f := &logForwarder{
+ serviceName: serviceName,
+ cancel: make(chan struct{}),
+ }
+
+ return f
+}
+
+func (f *logForwarder) forward(buffer *servicelog.RingBuffer) {
+ iterator := buffer.TailIterator()
+ // TODO: don't use the parser, just pull/write bytes from iterator
+ parser := servicelog.NewParser(iterator, 1024 /* TODO*/)
+
+ for iterator.Next(f.cancel) {
+ for parser.Next() {
+ entry := parser.Entry()
+ f.mu.Lock()
+ gatherers := f.gatherers
+ f.mu.Unlock()
+ for _, c := range gatherers {
+ c.addLog(entry)
+ }
+ }
+ if err := parser.Err(); err != nil {
+ logger.Noticef("Cannot read logs from service %q: %v", f.serviceName, err)
+ }
+ }
+}
+
+func (f *logForwarder) stop() {
+ close(f.cancel)
+}
diff --git a/internals/overlord/logstate/forwarder_test.go b/internals/overlord/logstate/forwarder_test.go
new file mode 100644
index 00000000..5569dfbb
--- /dev/null
+++ b/internals/overlord/logstate/forwarder_test.go
@@ -0,0 +1,69 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+package logstate
+
+import (
+ "fmt"
+ "time"
+
+ . "gopkg.in/check.v1"
+
+ "github.com/canonical/pebble/internals/servicelog"
+)
+
+type forwarderSuite struct{}
+
+var _ = Suite(&forwarderSuite{})
+
+func (s *forwarderSuite) TestForwarder(c *C) {
+ serviceName := "foobar"
+ ringBuffer := servicelog.NewRingBuffer(1024)
+ logWriter := servicelog.NewFormatWriter(ringBuffer, serviceName)
+
+ recv1, recv2 := make(chan []servicelog.Entry), make(chan []servicelog.Entry)
+ gatherer1 := newLogGathererForTest(nil, 1*time.Microsecond, 5, recv1)
+ go gatherer1.loop()
+ gatherer2 := newLogGathererForTest(nil, 1*time.Microsecond, 5, recv2)
+ go gatherer2.loop()
+
+ forwarder := newLogForwarder(serviceName)
+ go forwarder.forward(ringBuffer)
+
+ forwarder.mu.Lock()
+ forwarder.gatherers = []*logGatherer{gatherer1, gatherer2}
+ forwarder.mu.Unlock()
+
+ message := "this is a log line"
+ _, err := fmt.Fprintln(logWriter, message)
+ c.Assert(err, IsNil)
+
+ select {
+ case entries := <-recv1:
+ c.Assert(entries, HasLen, 1)
+ c.Check(entries[0].Service, Equals, serviceName)
+ c.Check(entries[0].Message, Equals, message+"\n")
+ case <-time.After(10 * time.Millisecond):
+ c.Fatal("timeout waiting to receive logs from gatherer1")
+ }
+
+ select {
+ case entries := <-recv2:
+ c.Assert(entries, HasLen, 1)
+ c.Check(entries[0].Service, Equals, serviceName)
+ c.Check(entries[0].Message, Equals, message+"\n")
+ case <-time.After(10 * time.Millisecond):
+ c.Fatal("timeout waiting to receive logs from gatherer2")
+ }
+}
diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go
new file mode 100644
index 00000000..95523570
--- /dev/null
+++ b/internals/overlord/logstate/gatherer.go
@@ -0,0 +1,170 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+package logstate
+
+import (
+ "io"
+ "sync"
+ "time"
+
+ "github.com/canonical/pebble/internals/logger"
+ "github.com/canonical/pebble/internals/plan"
+ "github.com/canonical/pebble/internals/servicelog"
+)
+
+// logGatherer is responsible for collecting service logs from a forwarder,
+// writing them to its internal logBuffer, and sending the request via its
+// logClient.
+// One logGatherer will run per log target. Its loop() method should be run
+// in its own goroutine, while the addLog() method can be invoked in a
+// separate goroutine by a logForwarder.
+// The logGatherer will "flush" and send a request to the client:
+// - on a regular cadence (e.g. every 1 second)
+// - when the buffer reaches a certain size
+// - when it is told to shut down.
+type logGatherer struct {
+ target *plan.LogTarget
+ tickPeriod time.Duration
+ writeCh chan struct{}
+ cancel chan struct{}
+
+ bufferLock sync.Mutex
+ buffer logBuffer
+ client logClient
+}
+
+func newLogGatherer(target *plan.LogTarget) *logGatherer {
+ tickPeriod := 1 * time.Second
+
+ return &logGatherer{
+ target: target,
+ tickPeriod: tickPeriod,
+ // writeCh should be buffered, so that addLog can send write notifications,
+ // even when the control loop is not ready to receive.
+ writeCh: make(chan struct{}, 1),
+ cancel: make(chan struct{}),
+ buffer: newLogBuffer(target),
+ client: newLogClient(target),
+ }
+}
+
+func (g *logGatherer) loop() {
+ ticker := time.NewTicker(g.tickPeriod)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ // Timeout - flush
+ g.flush(true)
+
+ case <-g.writeCh:
+ // Got a write - check if buffer is full
+ g.flush(false)
+
+ case <-g.cancel:
+ // Gatherer has been stopped - flush any remaining logs
+ g.flush(true)
+ return
+ }
+ }
+}
+
+func (g *logGatherer) addLog(entry servicelog.Entry) {
+ g.bufferLock.Lock()
+ g.buffer.Write(entry)
+ g.bufferLock.Unlock()
+
+ // Try to notify the control loop of a new write to the buffer.
+ // If there is already a notification waiting, no need to notify again - just
+ // drop it.
+ select {
+ case g.writeCh <- struct{}{}:
+ default:
+ }
+}
+
+// flush obtains a lock on the buffer, prepares the request, sends to the
+// remote server, and empties the buffer.
+// If force is false, flush will check first if the buffer is full, and only
+// flush if it is full.
+func (g *logGatherer) flush(force bool) {
+ g.bufferLock.Lock()
+ defer g.bufferLock.Unlock()
+
+ if g.buffer.IsEmpty() {
+ // No point doing anything
+ return
+ }
+ if !force && !g.buffer.IsFull() {
+ // Not ready to flush yet
+ return
+ }
+
+ req, err := g.buffer.Request()
+ if err != nil {
+ logger.Noticef("couldn't generate request for target %q: %v", g.target.Name, err)
+ return
+ }
+
+ err = g.client.Send(req)
+ if err != nil {
+ logger.Noticef("couldn't send logs to target %q: %v", g.target.Name, err)
+ // TODO: early return here? should we reset buffer if send fails?
+ }
+
+ g.buffer.Reset()
+}
+
+// stop closes the cancel channel, thereby terminating the main loop.
+func (g *logGatherer) stop() {
+ close(g.cancel)
+}
+
+// logBuffer is an interface encapsulating format-specific buffering of log
+// messages. E.g. a logBuffer for Loki would encode the log messages in the
+// JSON format expected by Loki.
+// A logBuffer's methods may not be concurrency-safe. Callers should protect
+// the logBuffer using a sync.Mutex.
+type logBuffer interface {
+ IsEmpty() bool
+ IsFull() bool
+
+ // Write encodes the provided log message and adds it to the buffer.
+ Write(servicelog.Entry) // TODO: return error?
+
+ // Request returns an io.Reader which can be used as the body of a request
+ // to the remote log target.
+ Request() (io.Reader, error)
+
+ // Reset empties the buffer.
+ Reset()
+}
+
+func newLogBuffer(target *plan.LogTarget) logBuffer {
+ // TODO: check target.Type and return the corresponding logBuffer
+ return nil
+}
+
+// logClient is implemented by a client to a specific type of log target.
+// It sends requests using the protocol preferred by that log target.
+type logClient interface {
+ Send(io.Reader) error
+}
+
+func newLogClient(target *plan.LogTarget) logClient {
+ // TODO: check target.Type and return the corresponding logClient
+ return nil
+}
diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go
new file mode 100644
index 00000000..8b22c44c
--- /dev/null
+++ b/internals/overlord/logstate/gatherer_test.go
@@ -0,0 +1,176 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+package logstate
+
+import (
+ "bytes"
+ "io"
+ "time"
+
+ . "gopkg.in/check.v1"
+ "gopkg.in/yaml.v3"
+
+ "github.com/canonical/pebble/internals/plan"
+ "github.com/canonical/pebble/internals/servicelog"
+)
+
+type gathererSuite struct{}
+
+var _ = Suite(&gathererSuite{})
+
+func (s *gathererSuite) TestGathererBufferFull(c *C) {
+ recv := make(chan []servicelog.Entry)
+ g := newLogGathererForTest(nil, 1*time.Hour, 5, recv)
+ go g.loop()
+
+ entries := []servicelog.Entry{{
+ Time: time.Date(2023, 1, 1, 14, 34, 56, 789, time.UTC),
+ Service: "foobar",
+ Message: "log line #1",
+ }, {
+ Time: time.Date(2023, 1, 1, 14, 34, 57, 789, time.UTC),
+ Service: "foobar",
+ Message: "log line #2",
+ }, {
+ Time: time.Date(2023, 1, 1, 14, 34, 58, 789, time.UTC),
+ Service: "foobar",
+ Message: "log line #3",
+ }, {
+ Time: time.Date(2023, 1, 1, 14, 34, 59, 789, time.UTC),
+ Service: "foobar",
+ Message: "log line #4",
+ }, {
+ Time: time.Date(2023, 1, 1, 14, 35, 14, 789, time.UTC),
+ Service: "foobar",
+ Message: "log line #5",
+ }}
+
+ for _, entry := range entries {
+ c.Assert(g.buffer.IsFull(), Equals, false)
+ g.addLog(entry)
+ }
+ c.Assert(g.buffer.IsFull(), Equals, true)
+
+ select {
+ case received := <-recv:
+ c.Assert(received, DeepEquals, entries)
+ case <-time.After(10 * time.Millisecond):
+ c.Fatal("timeout waiting to receive logs")
+ }
+}
+
+func (s *gathererSuite) TestGathererTimeout(c *C) {
+ recv := make(chan []servicelog.Entry)
+ g := newLogGathererForTest(nil, 1*time.Microsecond, 2, recv)
+ go g.loop()
+
+ entry := servicelog.Entry{
+ Time: time.Date(2023, 1, 1, 14, 34, 56, 789, time.UTC),
+ Service: "foobar",
+ Message: "this is a log",
+ }
+ g.addLog(entry)
+ c.Assert(g.buffer.IsFull(), Equals, false)
+
+ select {
+ case entries := <-recv:
+ c.Assert(entries, DeepEquals, []servicelog.Entry{entry})
+ case <-time.After(10 * time.Millisecond):
+ c.Fatal("timeout waiting to receive logs")
+ }
+}
+
+func (s *gathererSuite) TestGathererStop(c *C) {
+ recv := make(chan []servicelog.Entry)
+ g := newLogGathererForTest(nil, 1*time.Hour, 5, recv)
+ go g.loop()
+
+ entry := servicelog.Entry{
+ Time: time.Date(2023, 1, 1, 14, 34, 56, 789, time.UTC),
+ Service: "foobar",
+ Message: "this is a log",
+ }
+ g.addLog(entry)
+ c.Assert(g.buffer.IsFull(), Equals, false)
+
+ g.stop()
+ select {
+ case entries := <-recv:
+ c.Assert(entries, DeepEquals, []servicelog.Entry{entry})
+ case <-time.After(10 * time.Millisecond):
+ c.Fatal("timeout waiting to receive logs")
+ }
+}
+
+func newLogGathererForTest(target *plan.LogTarget, tickPeriod time.Duration, bufferCapacity int, recv chan []servicelog.Entry) *logGatherer {
+ g := newLogGatherer(target)
+ g.tickPeriod = tickPeriod
+ g.buffer = &testBuffer{
+ capacity: bufferCapacity,
+ }
+ g.client = &testClient{recv: recv}
+ return g
+}
+
+// testBuffer is a "fake" implementation of logBuffer, for use in testing.
+// It stores log entries internally in a slice.
+type testBuffer struct {
+ entries []servicelog.Entry
+ capacity int
+}
+
+func (b *testBuffer) Write(entry servicelog.Entry) {
+ b.entries = append(b.entries, entry)
+}
+
+func (b *testBuffer) IsEmpty() bool {
+ return len(b.entries) == 0
+}
+
+func (b *testBuffer) IsFull() bool {
+ return len(b.entries) >= b.capacity
+}
+
+// Request returns an io.Reader reading a YAML encoding of the entries
+// currently stored in the testBuffer's entries slice.
+func (b *testBuffer) Request() (io.Reader, error) {
+ req, err := yaml.Marshal(b.entries)
+ if err != nil {
+ return nil, err
+ }
+ return bytes.NewReader(req), nil
+}
+
+func (b *testBuffer) Reset() {
+ b.entries = []servicelog.Entry{}
+}
+
+// testClient is a "fake" implementation of logClient, for use in testing.
+type testClient struct {
+ recv chan []servicelog.Entry
+}
+
+// Send reads from the provided io.Reader, attempts to decode from YAML to a
+// []servicelog.Entry, then sends the decoded value on the recv channel.
+func (c testClient) Send(body io.Reader) error {
+ entries := []servicelog.Entry{}
+ decoder := yaml.NewDecoder(body)
+ err := decoder.Decode(&entries)
+ if err != nil {
+ return err
+ }
+ c.recv <- entries
+ return nil
+}
diff --git a/internals/overlord/logstate/manager.go b/internals/overlord/logstate/manager.go
index 4ee2b235..feff88f6 100644
--- a/internals/overlord/logstate/manager.go
+++ b/internals/overlord/logstate/manager.go
@@ -15,26 +15,98 @@
package logstate
import (
+ "sync"
+
+ "github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)
-type LogManager struct{}
+type LogManager struct {
+ forwarders map[string]*logForwarder
+ gatherers map[string]*logGatherer
+
+ newForwarder func(serviceName string) *logForwarder
+ newGatherer func(*plan.LogTarget) *logGatherer
+}
func NewLogManager() *LogManager {
- return &LogManager{}
+ return &LogManager{
+ forwarders: map[string]*logForwarder{},
+ gatherers: map[string]*logGatherer{},
+ newForwarder: newLogForwarder,
+ newGatherer: newLogGatherer,
+ }
}
-// PlanChanged is called by the service manager when the plan changes. We stop
-// all running forwarders, and start new forwarders based on the new plan.
+// PlanChanged is called by the service manager when the plan changes. We update the list of gatherers for each forwarder based on the new plan.
func (m *LogManager) PlanChanged(pl *plan.Plan) {
- // TODO: implement
+ // Create a map to hold forwarders/gatherers for the new plan.
+ // Old forwarders/gatherers will be moved over or deleted.
+ newForwarders := make(map[string]*logForwarder, len(pl.Services))
+ newGatherers := make(map[string]*logGatherer, len(pl.LogTargets))
+
+ for serviceName, service := range pl.Services {
+ // TODO: don't create forwarders if there are no targets for this service?
+ forwarder := m.forwarders[serviceName]
+ if forwarder == nil {
+ // Create new forwarder
+ forwarder = m.newForwarder(serviceName)
+ newForwarders[serviceName] = forwarder
+ } else {
+ // Copy over existing forwarder
+ newForwarders[serviceName] = forwarder
+ delete(m.forwarders, serviceName)
+ }
+
+ // update clients
+ forwarder.mu.Lock()
+ forwarder.gatherers = []*logGatherer{}
+
+ for _, target := range pl.LogTargets {
+ // Only create the gatherer if there is a service logging to it.
+ if service.LogsTo(target) {
+ gatherer := m.gatherers[serviceName]
+ if gatherer == nil {
+ // Create new gatherer
+ gatherer = m.newGatherer(target)
+ go gatherer.loop()
+ newGatherers[target.Name] = gatherer
+ } else {
+ // Copy over existing gatherer
+ newGatherers[target.Name] = gatherer
+ delete(m.gatherers, target.Name)
+ }
+
+ forwarder.gatherers = append(forwarder.gatherers, gatherer)
+ }
+ }
+
+ forwarder.mu.Unlock()
+ }
+
+ // Old forwarders for now-removed services need to be shut down.
+ for _, forwarder := range m.forwarders {
+ forwarder.stop()
+ }
+ m.forwarders = newForwarders
+
+ // Same with old gatherers.
+ for _, gatherer := range m.gatherers {
+ gatherer.stop()
+ }
+ m.gatherers = newGatherers
}
// ServiceStarted notifies the log manager that the named service has started,
// and provides a reference to the service's log buffer.
func (m *LogManager) ServiceStarted(serviceName string, buffer *servicelog.RingBuffer) {
- // TODO: implement
+ forwarder := m.forwarders[serviceName]
+ if forwarder == nil {
+ logger.Noticef("Internal error: couldn't find forwarder for %q", serviceName)
+ return
+ }
+ go forwarder.forward(buffer)
}
// Ensure implements overlord.StateManager.
@@ -44,5 +116,13 @@ func (m *LogManager) Ensure() error {
// Stop implements overlord.StateStopper and stops all log forwarding.
func (m *LogManager) Stop() {
- // TODO: implement
+ wg := sync.WaitGroup{}
+ for _, f := range m.forwarders {
+ wg.Add(1)
+ go func(f *logForwarder) {
+ f.stop()
+ wg.Done()
+ }(f)
+ }
+ wg.Wait()
}
diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go
new file mode 100644
index 00000000..7e9d5cd9
--- /dev/null
+++ b/internals/overlord/logstate/manager_test.go
@@ -0,0 +1,236 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+package logstate
+
+import (
+ "fmt"
+ "sort"
+ "sync"
+ "time"
+
+ . "gopkg.in/check.v1"
+
+ "github.com/canonical/pebble/internals/plan"
+ "github.com/canonical/pebble/internals/servicelog"
+)
+
+type managerSuite struct{}
+
+var _ = Suite(&managerSuite{})
+
+func (s *managerSuite) TestLogManager(c *C) {
+ m := newLogManagerForTest(1*time.Second, 10, make(chan []servicelog.Entry))
+ // Fake ringbuffer so that log manager can create forwarders
+ rb := servicelog.RingBuffer{}
+
+ // Call PlanChanged with new plan
+ m.PlanChanged(&plan.Plan{
+ Services: map[string]*plan.Service{
+ "svc1": {Name: "svc1"},
+ "svc2": {Name: "svc2"},
+ "svc3": {Name: "svc3"},
+ },
+ LogTargets: map[string]*plan.LogTarget{
+ "tgt1": {Name: "tgt1", Services: []string{"svc1"}},
+ "tgt2": {Name: "tgt2", Services: []string{"all", "-svc2"}},
+ "tgt3": {Name: "tgt3", Services: []string{"svc1", "svc3", "-svc1"}},
+ "tgt4": {Name: "tgt4", Services: []string{}},
+ },
+ })
+
+ // Start the three services. We do this concurrently to simulate Pebble's
+ // actual service startup, and check there are no race conditions.
+ var wg sync.WaitGroup
+ wg.Add(3)
+ go func() {
+ defer wg.Done()
+ m.ServiceStarted("svc1", &rb)
+ }()
+ go func() {
+ defer wg.Done()
+ m.ServiceStarted("svc2", &rb)
+ }()
+ go func() {
+ defer wg.Done()
+ m.ServiceStarted("svc3", &rb)
+ }()
+
+ wg.Wait()
+ c.Assert(getServiceNames(m.forwarders), DeepEquals, []string{"svc1", "svc2", "svc3"})
+ c.Assert(getTargets(m.forwarders["svc1"]), DeepEquals, []string{"tgt1", "tgt2"})
+ c.Assert(getTargets(m.forwarders["svc2"]), DeepEquals, []string(nil))
+ c.Assert(getTargets(m.forwarders["svc3"]), DeepEquals, []string{"tgt2", "tgt3"})
+
+ // Update the plan
+ m.PlanChanged(&plan.Plan{
+ Services: map[string]*plan.Service{
+ "svc1": {Name: "svc1"},
+ "svc2": {Name: "svc2"},
+ "svc4": {Name: "svc4"},
+ },
+ LogTargets: map[string]*plan.LogTarget{
+ "tgt1": {Name: "tgt1", Services: []string{"svc1", "svc2"}},
+ "tgt2": {Name: "tgt2", Services: []string{"svc2"}},
+ "tgt3": {Name: "tgt3", Services: []string{}},
+ "tgt4": {Name: "tgt4", Services: []string{"all"}},
+ },
+ })
+
+ // Call ServiceStarted
+ m.ServiceStarted("svc4", &rb)
+ c.Assert(getServiceNames(m.forwarders), DeepEquals, []string{"svc1", "svc2", "svc4"})
+ c.Assert(getTargets(m.forwarders["svc1"]), DeepEquals, []string{"tgt1", "tgt4"})
+ c.Assert(getTargets(m.forwarders["svc2"]), DeepEquals, []string{"tgt1", "tgt2", "tgt4"})
+ c.Assert(getTargets(m.forwarders["svc4"]), DeepEquals, []string{"tgt4"})
+}
+
+func (s *managerSuite) TestNoLogDuplication(c *C) {
+ recv := make(chan []servicelog.Entry)
+ m := newLogManagerForTest(10*time.Microsecond, 10, recv)
+ rb := servicelog.NewRingBuffer(1024)
+
+ // Utility functions for this test
+ writeLog := func(timestamp time.Time, logLine string) {
+ _, err := fmt.Fprintf(rb, "%s [svc1] %s\n",
+ timestamp.UTC().Format("2006-01-02T15:04:05.000Z07:00"), logLine)
+ c.Assert(err, IsNil)
+ }
+ expectLogs := func(expected ...string) {
+ select {
+ case entries := <-recv:
+ c.Assert(entries, HasLen, len(expected))
+ for i, entry := range entries {
+ c.Check(entry.Message, Equals, expected[i]+"\n")
+ }
+
+ case <-time.After(10 * time.Millisecond):
+ c.Fatalf("timed out waiting for request %q", expected)
+ }
+ }
+
+ m.PlanChanged(&plan.Plan{
+ Services: map[string]*plan.Service{
+ "svc1": {Name: "svc1"},
+ },
+ LogTargets: map[string]*plan.LogTarget{
+ "tgt1": {Name: "tgt1", Services: []string{"svc1"}},
+ },
+ })
+ m.ServiceStarted("svc1", rb)
+ c.Assert(getServiceNames(m.forwarders), DeepEquals, []string{"svc1"})
+ c.Assert(getTargets(m.forwarders["svc1"]), DeepEquals, []string{"tgt1"})
+
+ // Write logs
+ writeLog(time.Now(), "log line #1")
+ writeLog(time.Now(), "log line #2")
+ expectLogs("log line #1", "log line #2")
+
+ // Call PlanChanged again
+ m.PlanChanged(&plan.Plan{
+ Services: map[string]*plan.Service{
+ "svc1": {Name: "svc1"},
+ },
+ LogTargets: map[string]*plan.LogTarget{
+ "tgt1": {Name: "tgt1", Services: []string{"svc1"}},
+ },
+ })
+ c.Assert(getServiceNames(m.forwarders), DeepEquals, []string{"svc1"})
+ c.Assert(getTargets(m.forwarders["svc1"]), DeepEquals, []string{"tgt1"})
+
+ // Write logs
+ writeLog(time.Now(), "log line #3")
+ writeLog(time.Now(), "log line #4")
+ expectLogs("log line #3", "log line #4")
+}
+
+func (s *managerSuite) TestFlushLogsOnInterrupt(c *C) {
+ recv := make(chan []servicelog.Entry)
+ m := newLogManagerForTest(10*time.Microsecond, 10, recv)
+ rb := servicelog.NewRingBuffer(1024)
+
+ // Utility functions for this test
+ writeLog := func(timestamp time.Time, logLine string) {
+ _, err := fmt.Fprintf(rb, "%s [svc1] %s\n",
+ timestamp.UTC().Format("2006-01-02T15:04:05.000Z07:00"), logLine)
+ c.Assert(err, IsNil)
+ }
+ expectLogs := func(expected ...string) {
+ select {
+ case entries := <-recv:
+ c.Assert(entries, HasLen, len(expected))
+ for i, entry := range entries {
+ c.Check(entry.Message, Equals, expected[i]+"\n")
+ }
+
+ case <-time.After(10 * time.Millisecond):
+ c.Fatalf("timed out waiting for request %q", expected)
+ }
+ }
+
+ m.PlanChanged(&plan.Plan{
+ Services: map[string]*plan.Service{
+ "svc1": {Name: "svc1"},
+ },
+ LogTargets: map[string]*plan.LogTarget{
+ "tgt1": {Name: "tgt1", Services: []string{"svc1"}},
+ },
+ })
+ m.ServiceStarted("svc1", rb)
+ c.Assert(getServiceNames(m.forwarders), DeepEquals, []string{"svc1"})
+ c.Assert(getTargets(m.forwarders["svc1"]), DeepEquals, []string{"tgt1"})
+
+ // Write logs
+ writeLog(time.Now(), "log line #1")
+ writeLog(time.Now(), "log line #2")
+
+ // Logs shouldn't be sent through yet
+ select {
+ case e := <-recv:
+ c.Fatalf("unexpected logs received: %v", e)
+ default:
+ }
+
+ m.Stop()
+ expectLogs("log line #1", "log line #2")
+}
+
+func newLogManagerForTest(
+ tickPeriod time.Duration, bufferCapacity int, recv chan []servicelog.Entry,
+) *LogManager {
+ return &LogManager{
+ forwarders: map[string]*logForwarder{},
+ gatherers: map[string]*logGatherer{},
+ newForwarder: newLogForwarder,
+ newGatherer: func(target *plan.LogTarget) *logGatherer {
+ return newLogGathererForTest(target, tickPeriod, bufferCapacity, recv)
+ },
+ }
+}
+
+func getServiceNames(forwarders map[string]*logForwarder) (serviceNames []string) {
+ for serviceName := range forwarders {
+ serviceNames = append(serviceNames, serviceName)
+ }
+ sort.Strings(serviceNames)
+ return
+}
+
+func getTargets(forwarder *logForwarder) (targetNames []string) {
+ for _, gatherers := range forwarder.gatherers {
+ targetNames = append(targetNames, gatherers.target.Name)
+ }
+ sort.Strings(targetNames)
+ return
+}
diff --git a/internals/overlord/logstate/package_test.go b/internals/overlord/logstate/package_test.go
new file mode 100644
index 00000000..e7fe8c86
--- /dev/null
+++ b/internals/overlord/logstate/package_test.go
@@ -0,0 +1,23 @@
+// Copyright (c) 2023 Canonical Ltd
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License version 3 as
+// published by the Free Software Foundation.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+package logstate
+
+import (
+ "testing"
+
+ . "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) { TestingT(t) }