From 288437d51488b4058fd82bc95f30606a3bdf71c0 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Mon, 3 Jul 2023 15:04:45 +1200 Subject: [PATCH 1/5] [WIP] Log forwarding implementation --- internals/overlord/logstate/forwarder.go | 71 ++++++ internals/overlord/logstate/forwarder_test.go | 68 ++++++ internals/overlord/logstate/gatherer.go | 174 +++++++++++++ internals/overlord/logstate/gatherer_test.go | 180 ++++++++++++++ internals/overlord/logstate/manager.go | 94 ++++++- internals/overlord/logstate/manager_test.go | 230 ++++++++++++++++++ internals/overlord/logstate/package_test.go | 23 ++ 7 files changed, 833 insertions(+), 7 deletions(-) create mode 100644 internals/overlord/logstate/forwarder.go create mode 100644 internals/overlord/logstate/forwarder_test.go create mode 100644 internals/overlord/logstate/gatherer.go create mode 100644 internals/overlord/logstate/gatherer_test.go create mode 100644 internals/overlord/logstate/manager_test.go create mode 100644 internals/overlord/logstate/package_test.go diff --git a/internals/overlord/logstate/forwarder.go b/internals/overlord/logstate/forwarder.go new file mode 100644 index 00000000..1edaee00 --- /dev/null +++ b/internals/overlord/logstate/forwarder.go @@ -0,0 +1,71 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logstate + +import ( + "sync" + + "github.com/canonical/pebble/internals/logger" + "github.com/canonical/pebble/internals/servicelog" +) + +// logForwarder is responsible for pulling logs from a service's ringbuffer, +// and distributing each log message to its logGatherers. Its gatherers field +// holds a reference to the gatherer for each log target that the service is +// sending logs to. +// One logForwarder will run per service. Its forward() method should be run +// in its own goroutine. +type logForwarder struct { + serviceName string + + mu sync.Mutex // mutex for gatherers + gatherers []*logGatherer + + cancel chan struct{} +} + +func newLogForwarder(serviceName string) *logForwarder { + f := &logForwarder{ + serviceName: serviceName, + cancel: make(chan struct{}), + } + + return f +} + +func (f *logForwarder) forward(buffer *servicelog.RingBuffer) { + iterator := buffer.TailIterator() + // TODO: don't use the parser, just pull/write bytes from iterator + parser := servicelog.NewParser(iterator, 1024 /* TODO*/) + + for iterator.Next(f.cancel) { + for parser.Next() { + entry := parser.Entry() + f.mu.Lock() + gatherers := f.gatherers + f.mu.Unlock() + for _, c := range gatherers { + c.addLog(entry) + } + } + if err := parser.Err(); err != nil { + logger.Noticef("Cannot read logs from service %q: %v", f.serviceName, err) + } + } +} + +func (f *logForwarder) stop() { + close(f.cancel) +} diff --git a/internals/overlord/logstate/forwarder_test.go b/internals/overlord/logstate/forwarder_test.go new file mode 100644 index 00000000..6778cb24 --- /dev/null +++ b/internals/overlord/logstate/forwarder_test.go @@ -0,0 +1,68 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logstate + +import ( + "fmt" + "time" + + "github.com/canonical/pebble/internals/servicelog" + . "gopkg.in/check.v1" +) + +type forwarderSuite struct{} + +var _ = Suite(&forwarderSuite{}) + +func (s *forwarderSuite) TestForwarder(c *C) { + serviceName := "foobar" + ringBuffer := servicelog.NewRingBuffer(1024) + logWriter := servicelog.NewFormatWriter(ringBuffer, serviceName) + + recv1, recv2 := make(chan []servicelog.Entry), make(chan []servicelog.Entry) + gatherer1 := newLogGathererForTest(nil, 1*time.Microsecond, 5, recv1) + go gatherer1.loop() + gatherer2 := newLogGathererForTest(nil, 1*time.Microsecond, 5, recv2) + go gatherer2.loop() + + forwarder := newLogForwarder(serviceName) + go forwarder.forward(ringBuffer) + + forwarder.mu.Lock() + forwarder.gatherers = []*logGatherer{gatherer1, gatherer2} + forwarder.mu.Unlock() + + message := "this is a log line" + _, err := fmt.Fprintln(logWriter, message) + c.Assert(err, IsNil) + + select { + case entries := <-recv1: + c.Assert(entries, HasLen, 1) + c.Check(entries[0].Service, Equals, serviceName) + c.Check(entries[0].Message, Equals, message+"\n") + case <-time.After(10 * time.Millisecond): + c.Fatal("timeout waiting to receive logs from gatherer1") + } + + select { + case entries := <-recv2: + c.Assert(entries, HasLen, 1) + c.Check(entries[0].Service, Equals, serviceName) + c.Check(entries[0].Message, Equals, message+"\n") + case <-time.After(10 * time.Millisecond): + c.Fatal("timeout waiting to receive logs from gatherer2") + } +} diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go new file mode 100644 index 00000000..c6ef9761 --- /dev/null +++ b/internals/overlord/logstate/gatherer.go @@ -0,0 +1,174 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logstate + +import ( + "io" + "sync" + "time" + + "github.com/canonical/pebble/internals/logger" + + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" +) + +// logGatherer is responsible for collecting service logs from a forwarder, +// writing them to its internal logBuffer, and sending the request via its +// logClient. +// One logGatherer will run per log target. Its loop() method should be run +// in its own goroutine, while the addLog() method can be invoked in a +// separate goroutine by a logForwarder. +// The logGatherer will "flush" and send a request to the client: +// - on a regular cadence (e.g. every 1 second) +// - when the buffer reaches a certain size +// - when it is told to shut down. +type logGatherer struct { + target *plan.LogTarget + + tickPeriod time.Duration + + bufferLock sync.Mutex + buffer logBuffer + client logClient + + writeCh chan struct{} + cancel chan struct{} +} + +func newLogGatherer(target *plan.LogTarget) *logGatherer { + tickPeriod := 1 * time.Second + + return &logGatherer{ + target: target, + tickPeriod: tickPeriod, + buffer: newLogBuffer(target), + client: newLogClient(target), + // writeCh should be buffered, so that addLog can send write notifications, + // even when the control loop is not ready to receive. + writeCh: make(chan struct{}, 1), + cancel: make(chan struct{}), + } +} + +func (g *logGatherer) loop() { + ticker := time.NewTicker(g.tickPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // Timeout - flush + g.flush(true) + + case <-g.writeCh: + // Got a write - check if buffer is full + g.flush(false) + + case <-g.cancel: + // Gatherer has been stopped - flush any remaining logs + g.flush(true) + return + } + } +} + +func (g *logGatherer) addLog(entry servicelog.Entry) { + g.bufferLock.Lock() + g.buffer.Write(entry) + g.bufferLock.Unlock() + + // Try to notify the control loop of a new write to the buffer. + // We don't want this method to block, so if the control loop is not ready + // to receive, then drop the notification. + // TODO: this is getting dropped 99% of the time. Not good. + select { + case g.writeCh <- struct{}{}: + default: + } +} + +// flush obtains a lock on the buffer, prepares the request, sends to the +// remote server, and empties the buffer. +// If force is false, flush will check first if the buffer is full, and only +// flush if it is full. +func (g *logGatherer) flush(force bool) { + g.bufferLock.Lock() + defer g.bufferLock.Unlock() + + if g.buffer.IsEmpty() { + // No point doing anything + return + } + if !force { + if !g.buffer.IsFull() { + return + } + } + + req, err := g.buffer.Request() + if err != nil { + logger.Noticef("couldn't generate request for target %q: %v", g.target.Name, err) + return + } + + err = g.client.Send(req) + if err != nil { + logger.Noticef("couldn't send logs to target %q: %v", g.target.Name, err) + } + + g.buffer.Reset() +} + +// stop closes the cancel channel, thereby terminating the main loop. +func (g *logGatherer) stop() { + close(g.cancel) +} + +// logBuffer is an interface encapsulating format-specific buffering of log +// messages. E.g. a logBuffer for Loki would encode the log messages in the +// JSON format expected by Loki. +// A logBuffer's methods may not be concurrency-safe. Callers should protect +// the logBuffer using a sync.Mutex. +type logBuffer interface { + IsEmpty() bool + IsFull() bool + + // Write encodes the provided log message and adds it to the buffer. + Write(servicelog.Entry) // TODO: return error? + + // Request returns an io.Reader which can be used as the body of a request + // to the remote log target. + Request() (io.Reader, error) + + // Reset empties the buffer. + Reset() +} + +func newLogBuffer(target *plan.LogTarget) logBuffer { + // TODO: check target.Type and return the corresponding logBuffer + return nil +} + +// logClient is implemented by a client to a specific type of log target. +// It sends requests using the protocol preferred by that log target. +type logClient interface { + Send(io.Reader) error +} + +func newLogClient(target *plan.LogTarget) logClient { + // TODO: check target.Type and return the corresponding logClient + return nil +} diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go new file mode 100644 index 00000000..b6ecab73 --- /dev/null +++ b/internals/overlord/logstate/gatherer_test.go @@ -0,0 +1,180 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logstate + +import ( + "bytes" + "io" + "time" + + "github.com/canonical/pebble/internals/plan" + "gopkg.in/yaml.v3" + + "github.com/canonical/pebble/internals/servicelog" + + . "gopkg.in/check.v1" +) + +type gathererSuite struct{} + +var _ = Suite(&gathererSuite{}) + +func (s *gathererSuite) TestGathererBufferFull(c *C) { + recv := make(chan []servicelog.Entry) + g := newLogGathererForTest(nil, 1*time.Hour, 5, recv) + go g.loop() + + entries := []servicelog.Entry{{ + Time: time.Date(2023, 1, 1, 14, 34, 56, 789, time.UTC), + Service: "foobar", + Message: "log line #1", + }, { + Time: time.Date(2023, 1, 1, 14, 34, 57, 789, time.UTC), + Service: "foobar", + Message: "log line #2", + }, { + Time: time.Date(2023, 1, 1, 14, 34, 58, 789, time.UTC), + Service: "foobar", + Message: "log line #3", + }, { + Time: time.Date(2023, 1, 1, 14, 34, 59, 789, time.UTC), + Service: "foobar", + Message: "log line #4", + }, { + Time: time.Date(2023, 1, 1, 14, 35, 14, 789, time.UTC), + Service: "foobar", + Message: "log line #5", + }} + + for _, entry := range entries { + c.Assert(g.buffer.IsFull(), Equals, false) + g.addLog(entry) + } + c.Assert(g.buffer.IsFull(), Equals, true) + + select { + case received := <-recv: + c.Assert(received, DeepEquals, entries) + case <-time.After(10 * time.Millisecond): + c.Fatal("timeout waiting to receive logs") + } +} + +func (s *gathererSuite) TestGathererTimeout(c *C) { + recv := make(chan []servicelog.Entry) + g := newLogGathererForTest(nil, 1*time.Microsecond, 2, recv) + go g.loop() + + entry := servicelog.Entry{ + Time: time.Date(2023, 1, 1, 14, 34, 56, 789, time.UTC), + Service: "foobar", + Message: "this is a log", + } + g.addLog(entry) + c.Assert(g.buffer.IsFull(), Equals, false) + + select { + case entries := <-recv: + c.Assert(entries, DeepEquals, []servicelog.Entry{entry}) + case <-time.After(10 * time.Millisecond): + c.Fatal("timeout waiting to receive logs") + } +} + +func (s *gathererSuite) TestGathererStop(c *C) { + recv := make(chan []servicelog.Entry) + g := newLogGathererForTest(nil, 1*time.Hour, 5, recv) + go g.loop() + + entry := servicelog.Entry{ + Time: time.Date(2023, 1, 1, 14, 34, 56, 789, time.UTC), + Service: "foobar", + Message: "this is a log", + } + g.addLog(entry) + c.Assert(g.buffer.IsFull(), Equals, false) + + g.stop() + select { + case entries := <-recv: + c.Assert(entries, DeepEquals, []servicelog.Entry{entry}) + case <-time.After(10 * time.Millisecond): + c.Fatal("timeout waiting to receive logs") + } +} + +func newLogGathererForTest( + target *plan.LogTarget, + tickPeriod time.Duration, bufferCapacity int, recv chan []servicelog.Entry, +) *logGatherer { + g := newLogGatherer(target) + g.tickPeriod = tickPeriod + g.buffer = &testBuffer{ + capacity: bufferCapacity, + } + g.client = &testClient{recv: recv} + return g +} + +// testBuffer is a "fake" implementation of logBuffer, for use in testing. +// It stores log entries internally in a slice. +type testBuffer struct { + entries []servicelog.Entry + capacity int +} + +func (b *testBuffer) Write(entry servicelog.Entry) { + b.entries = append(b.entries, entry) +} + +func (b *testBuffer) IsEmpty() bool { + return len(b.entries) == 0 +} + +func (b *testBuffer) IsFull() bool { + return len(b.entries) >= b.capacity +} + +// Request returns an io.Reader reading a YAML encoding of the entries +// currently stored in the testBuffer's entries slice. +func (b *testBuffer) Request() (io.Reader, error) { + req, err := yaml.Marshal(b.entries) + if err != nil { + return nil, err + } + return bytes.NewReader(req), nil +} + +func (b *testBuffer) Reset() { + b.entries = []servicelog.Entry{} +} + +// testClient is a "fake" implementation of logClient, for use in testing. +type testClient struct { + recv chan []servicelog.Entry +} + +// Send reads from the provided io.Reader, attempts to decode from YAML to a +// []servicelog.Entry, then sends the decoded value on the recv channel. +func (c testClient) Send(body io.Reader) error { + entries := []servicelog.Entry{} + decoder := yaml.NewDecoder(body) + err := decoder.Decode(&entries) + if err != nil { + return err + } + c.recv <- entries + return nil +} diff --git a/internals/overlord/logstate/manager.go b/internals/overlord/logstate/manager.go index 4ee2b235..3b46d96e 100644 --- a/internals/overlord/logstate/manager.go +++ b/internals/overlord/logstate/manager.go @@ -15,26 +15,98 @@ package logstate import ( + "sync" + + "github.com/canonical/pebble/internals/logger" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" ) -type LogManager struct{} +type LogManager struct { + forwarders map[string]*logForwarder + gatherers map[string]*logGatherer + + newForwarder func(serviceName string) *logForwarder + newGatherer func(*plan.LogTarget) *logGatherer +} func NewLogManager() *LogManager { - return &LogManager{} + return &LogManager{ + forwarders: map[string]*logForwarder{}, + gatherers: map[string]*logGatherer{}, + newForwarder: newLogForwarder, + newGatherer: newLogGatherer, + } } -// PlanChanged is called by the service manager when the plan changes. We stop -// all running forwarders, and start new forwarders based on the new plan. +// PlanChanged is called by the service manager when the plan changes. We update the list of gatherers for each forwarder based on the new plan. func (m *LogManager) PlanChanged(pl *plan.Plan) { - // TODO: implement + // Create a map to hold forwarders/gatherers for the new plan. + // Old forwarders/gatherers will be moved over or deleted. + newForwarders := make(map[string]*logForwarder, len(pl.Services)) + newGatherers := make(map[string]*logGatherer, len(pl.LogTargets)) + + for serviceName, service := range pl.Services { + forwarder := m.forwarders[serviceName] + if forwarder == nil { + // Create new forwarder + forwarder = m.newForwarder(serviceName) + newForwarders[serviceName] = forwarder + } else { + // Copy over existing forwarder + newForwarders[serviceName] = forwarder + delete(m.forwarders, serviceName) + } + + // update clients + forwarder.mu.Lock() + forwarder.gatherers = []*logGatherer{} + + for _, target := range pl.LogTargets { + // Only create the gatherer if there is a service logging to it. + // Don't need gatherers for disabled or unselected targets. + if service.LogsTo(target) { + gatherer := m.gatherers[serviceName] + if gatherer == nil { + // Create new gatherer + gatherer = m.newGatherer(target) + go gatherer.loop() + newGatherers[target.Name] = gatherer + } else { + // Copy over existing gatherer + newGatherers[target.Name] = gatherer + delete(m.gatherers, target.Name) + } + + forwarder.gatherers = append(forwarder.gatherers, gatherer) + } + } + + forwarder.mu.Unlock() + } + + // Old forwarders for now-removed services need to be shut down. + for _, forwarder := range m.forwarders { + forwarder.stop() + } + m.forwarders = newForwarders + + // Same with old gatherers. + for _, gatherer := range m.gatherers { + gatherer.stop() + } + m.gatherers = newGatherers } // ServiceStarted notifies the log manager that the named service has started, // and provides a reference to the service's log buffer. func (m *LogManager) ServiceStarted(serviceName string, buffer *servicelog.RingBuffer) { - // TODO: implement + forwarder := m.forwarders[serviceName] + if forwarder == nil { + logger.Noticef("Internal error: couldn't find forwarder for %q", serviceName) + return + } + go forwarder.forward(buffer) } // Ensure implements overlord.StateManager. @@ -44,5 +116,13 @@ func (m *LogManager) Ensure() error { // Stop implements overlord.StateStopper and stops all log forwarding. func (m *LogManager) Stop() { - // TODO: implement + wg := sync.WaitGroup{} + for _, f := range m.forwarders { + wg.Add(1) + go func(f *logForwarder) { + f.stop() + wg.Done() + }(f) + } + wg.Wait() } diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go new file mode 100644 index 00000000..668a2f2d --- /dev/null +++ b/internals/overlord/logstate/manager_test.go @@ -0,0 +1,230 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logstate + +import ( + "bytes" + "sync" + "time" + + "github.com/canonical/pebble/internals/logger" + "github.com/canonical/pebble/internals/plan" + "github.com/canonical/pebble/internals/servicelog" + "github.com/canonical/pebble/internals/testutil" + . "gopkg.in/check.v1" +) + +type managerSuite struct { + logbuf *bytes.Buffer + restoreLogger func() +} + +var _ = Suite(&managerSuite{}) + +func (s *managerSuite) SetUpTest(c *C) { + s.logbuf, s.restoreLogger = logger.MockLogger("PREFIX: ") +} + +func (s *managerSuite) TearDownTest(c *C) { + s.restoreLogger() +} + +func (s *managerSuite) TestLogManager(c *C) { + m := newLogManagerForTest(1*time.Second, 10, make(chan []servicelog.Entry)) + // Fake ringbuffer so that log manager can create forwarders + rb := servicelog.RingBuffer{} + + // Call PlanChanged with new plan + m.PlanChanged(&plan.Plan{ + Services: map[string]*plan.Service{ + "svc1": {}, + "svc2": {LogTargets: []string{"tgt3", "tgt4"}}, + "svc3": {LogTargets: []string{"tgt1"}}, + }, + LogTargets: map[string]*plan.LogTarget{ + "tgt1": {Name: "tgt1", Type: plan.LokiTarget, Selection: plan.UnsetSelection}, + "tgt2": {Name: "tgt2", Type: plan.LokiTarget, Selection: plan.OptOutSelection}, + "tgt3": {Name: "tgt3", Type: plan.LokiTarget, Selection: plan.OptInSelection}, + "tgt4": {Name: "tgt4", Type: plan.LokiTarget, Selection: plan.DisabledSelection}, + }, + }) + + // Start the three services. We do this concurrently to simulate Pebble's + // actual service startup, and check there are no race conditions. + var wg sync.WaitGroup + wg.Add(3) + go func() { + defer wg.Done() + m.ServiceStarted("svc1", &rb) + }() + go func() { + defer wg.Done() + m.ServiceStarted("svc2", &rb) + }() + go func() { + defer wg.Done() + m.ServiceStarted("svc3", &rb) + }() + + wg.Wait() + checkForwarders(c, m.forwarders, map[string][]string{ + "svc1": {"tgt1", "tgt2"}, + "svc2": {"tgt3"}, + "svc3": {"tgt1"}, + }) + checkGatherers(c, m.gatherers, []string{"tgt1", "tgt2", "tgt3"}) + + // Update the plan + m.PlanChanged(&plan.Plan{ + Services: map[string]*plan.Service{ + "svc1": {}, + "svc2": {LogTargets: []string{"tgt2", "tgt4"}}, + "svc4": {LogTargets: []string{"tgt3"}}, + }, + LogTargets: map[string]*plan.LogTarget{ + "tgt1": {Name: "tgt1", Type: plan.LokiTarget, Selection: plan.UnsetSelection}, + "tgt2": {Name: "tgt2", Type: plan.LokiTarget, Selection: plan.OptOutSelection}, + "tgt3": {Name: "tgt3", Type: plan.LokiTarget, Selection: plan.OptInSelection}, + "tgt4": {Name: "tgt4", Type: plan.LokiTarget, Selection: plan.DisabledSelection}, + }, + }) + + // Call ServiceStarted + m.ServiceStarted("svc4", &rb) + checkForwarders(c, m.forwarders, map[string][]string{ + "svc1": {"tgt1", "tgt2"}, + "svc2": {"tgt2"}, + "svc4": {"tgt3"}, + }) + checkGatherers(c, m.gatherers, []string{"tgt1", "tgt2", "tgt3"}) +} + +// checkForwarders checks that the arrangement of forwarders -> gatherers is +// as described in the provided map. +func checkForwarders(c *C, forwarders map[string]*logForwarder, expected map[string][]string) { + c.Check(len(forwarders), Equals, len(expected)) + for serviceName := range expected { + forwarder, ok := forwarders[serviceName] + c.Assert(ok, Equals, true) + c.Assert(forwarder, Not(IsNil)) + //forwarder.mu.Lock() - race still passes without this + c.Check(len(forwarder.gatherers), Equals, len(expected[serviceName])) + for _, gatherer := range forwarder.gatherers { + c.Check(expected[serviceName], testutil.Contains, gatherer.target.Name) + } + //forwarder.mu.Unlock() + } +} + +// checkGatherers checks that the expected gatherers exist. +func checkGatherers(c *C, gatherers map[string]*logGatherer, expected []string) { + c.Check(len(gatherers), Equals, len(expected)) + for targetName := range gatherers { + c.Check(expected, testutil.Contains, targetName) + } +} + +//func (s *managerSuite) TestNoLogDuplication(c *C) { +// // Reduce Loki flush time +// flushDelayOld := flushDelay +// flushDelay = 10 * time.Millisecond +// defer func() { +// flushDelay = flushDelayOld +// }() +// +// m := NewLogManager() +// rb := servicelog.NewRingBuffer(1024) +// +// // Set up fake "Loki" server +// requests := make(chan string, 2) +// srv := newFakeLokiServer(requests) +// defer srv.Close() +// +// // Utility functions for this test +// writeLog := func(timestamp time.Time, logLine string) { +// _, err := fmt.Fprintf(rb, "%s [svc1] %s\n", +// timestamp.UTC().Format("2006-01-02T15:04:05.000Z07:00"), logLine) +// c.Assert(err, IsNil) +// } +// expectLogs := func(expected string) { +// select { +// case req := <-requests: +// c.Assert(req, Equals, expected) +// case <-time.After(1 * time.Second): +// c.Fatalf("timed out waiting for request %q", expected) +// } +// } +// +// m.PlanChanged(&plan.Plan{ +// Services: map[string]*plan.Service{ +// "svc1": {}, +// }, +// LogTargets: map[string]*plan.LogTarget{ +// "tgt1": { +// Type: plan.LokiTarget, +// Location: srv.URL(), +// Selection: plan.UnsetSelection, +// }, +// }, +// }) +// m.ServiceStarted("svc1", rb) +// c.Assert(m.forwarders, HasLen, 1) +// +// // Write logs +// writeLog(time.Date(2023, 1, 31, 1, 23, 45, 67890, time.UTC), "log line #1") +// writeLog(time.Date(2023, 1, 31, 1, 23, 46, 67890, time.UTC), "log line #2") +// expectLogs(`{"streams":[{"stream":{"pebble_service":"svc1"},"values":[["1675128225000000000","log line #1"],["1675128226000000000","log line #2"]]}]}`) +// +// // Call PlanChanged again +// m.PlanChanged(&plan.Plan{ +// Services: map[string]*plan.Service{ +// "svc1": {}, +// }, +// LogTargets: map[string]*plan.LogTarget{ +// "tgt1": { +// Type: plan.LokiTarget, +// Location: srv.URL(), +// Selection: plan.UnsetSelection, +// }, +// }, +// }) +// c.Check(m.forwarders, HasLen, 1) +// +// // Write logs +// writeLog(time.Date(2023, 1, 31, 1, 23, 47, 67890, time.UTC), "log line #3") +// writeLog(time.Date(2023, 1, 31, 1, 23, 48, 67890, time.UTC), "log line #4") +// expectLogs(`{"streams":[{"stream":{"pebble_service":"svc1"},"values":[["1675128227000000000","log line #3"],["1675128228000000000","log line #4"]]}]}`) +//} + +func (s *managerSuite) TestFlushLogsOnInterrupt(c *C) { + m := newLogManagerForTest(1*time.Second, 10, make(chan []servicelog.Entry)) + + m.Stop() + + // check buffered logs are sent through +} + +func newLogManagerForTest( + tickPeriod time.Duration, bufferCapacity int, recv chan []servicelog.Entry, +) *LogManager { + return &LogManager{ + forwarders: map[string]*logForwarder{}, + gatherers: map[string]*logGatherer{}, + newForwarder: newLogForwarder, // ForTest ? + newGatherer: func(target *plan.LogTarget) *logGatherer { + return newLogGathererForTest(target, tickPeriod, bufferCapacity, recv) + }, + } +} diff --git a/internals/overlord/logstate/package_test.go b/internals/overlord/logstate/package_test.go new file mode 100644 index 00000000..e7fe8c86 --- /dev/null +++ b/internals/overlord/logstate/package_test.go @@ -0,0 +1,23 @@ +// Copyright (c) 2023 Canonical Ltd +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 3 as +// published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logstate + +import ( + "testing" + + . "gopkg.in/check.v1" +) + +func Test(t *testing.T) { TestingT(t) } From 23d8b2b57b3ceeb510bbe9b266fcf6eef35b9f35 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 5 Jul 2023 11:46:32 +1200 Subject: [PATCH 2/5] fix manager test --- internals/overlord/logstate/manager_test.go | 80 +++++++++------------ 1 file changed, 34 insertions(+), 46 deletions(-) diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go index 668a2f2d..bf6bb49b 100644 --- a/internals/overlord/logstate/manager_test.go +++ b/internals/overlord/logstate/manager_test.go @@ -16,13 +16,14 @@ package logstate import ( "bytes" + "sort" "sync" "time" "github.com/canonical/pebble/internals/logger" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" - "github.com/canonical/pebble/internals/testutil" + . "gopkg.in/check.v1" ) @@ -49,15 +50,15 @@ func (s *managerSuite) TestLogManager(c *C) { // Call PlanChanged with new plan m.PlanChanged(&plan.Plan{ Services: map[string]*plan.Service{ - "svc1": {}, - "svc2": {LogTargets: []string{"tgt3", "tgt4"}}, - "svc3": {LogTargets: []string{"tgt1"}}, + "svc1": {Name: "svc1"}, + "svc2": {Name: "svc2"}, + "svc3": {Name: "svc3"}, }, LogTargets: map[string]*plan.LogTarget{ - "tgt1": {Name: "tgt1", Type: plan.LokiTarget, Selection: plan.UnsetSelection}, - "tgt2": {Name: "tgt2", Type: plan.LokiTarget, Selection: plan.OptOutSelection}, - "tgt3": {Name: "tgt3", Type: plan.LokiTarget, Selection: plan.OptInSelection}, - "tgt4": {Name: "tgt4", Type: plan.LokiTarget, Selection: plan.DisabledSelection}, + "tgt1": {Name: "tgt1", Type: plan.LokiTarget, Services: []string{"svc1"}}, + "tgt2": {Name: "tgt2", Type: plan.LokiTarget, Services: []string{"all", "-svc2"}}, + "tgt3": {Name: "tgt3", Type: plan.LokiTarget, Services: []string{"svc1", "svc3", "-svc1"}}, + "tgt4": {Name: "tgt4", Type: plan.LokiTarget, Services: []string{}}, }, }) @@ -79,61 +80,48 @@ func (s *managerSuite) TestLogManager(c *C) { }() wg.Wait() - checkForwarders(c, m.forwarders, map[string][]string{ - "svc1": {"tgt1", "tgt2"}, - "svc2": {"tgt3"}, - "svc3": {"tgt1"}, - }) - checkGatherers(c, m.gatherers, []string{"tgt1", "tgt2", "tgt3"}) + c.Assert(getServiceNames(m.forwarders), DeepEquals, []string{"svc1", "svc2", "svc3"}) + c.Assert(getTargets(m.forwarders["svc1"]), DeepEquals, []string{"tgt1", "tgt2"}) + c.Assert(getTargets(m.forwarders["svc2"]), DeepEquals, []string(nil)) + c.Assert(getTargets(m.forwarders["svc3"]), DeepEquals, []string{"tgt2", "tgt3"}) // Update the plan m.PlanChanged(&plan.Plan{ Services: map[string]*plan.Service{ - "svc1": {}, - "svc2": {LogTargets: []string{"tgt2", "tgt4"}}, - "svc4": {LogTargets: []string{"tgt3"}}, + "svc1": {Name: "svc1"}, + "svc2": {Name: "svc2"}, + "svc4": {Name: "svc4"}, }, LogTargets: map[string]*plan.LogTarget{ - "tgt1": {Name: "tgt1", Type: plan.LokiTarget, Selection: plan.UnsetSelection}, - "tgt2": {Name: "tgt2", Type: plan.LokiTarget, Selection: plan.OptOutSelection}, - "tgt3": {Name: "tgt3", Type: plan.LokiTarget, Selection: plan.OptInSelection}, - "tgt4": {Name: "tgt4", Type: plan.LokiTarget, Selection: plan.DisabledSelection}, + "tgt1": {Name: "tgt1", Type: plan.LokiTarget, Services: []string{"svc1", "svc2"}}, + "tgt2": {Name: "tgt2", Type: plan.LokiTarget, Services: []string{"svc2"}}, + "tgt3": {Name: "tgt3", Type: plan.LokiTarget, Services: []string{}}, + "tgt4": {Name: "tgt4", Type: plan.LokiTarget, Services: []string{"all"}}, }, }) // Call ServiceStarted m.ServiceStarted("svc4", &rb) - checkForwarders(c, m.forwarders, map[string][]string{ - "svc1": {"tgt1", "tgt2"}, - "svc2": {"tgt2"}, - "svc4": {"tgt3"}, - }) - checkGatherers(c, m.gatherers, []string{"tgt1", "tgt2", "tgt3"}) + c.Assert(getServiceNames(m.forwarders), DeepEquals, []string{"svc1", "svc2", "svc4"}) + c.Assert(getTargets(m.forwarders["svc1"]), DeepEquals, []string{"tgt1", "tgt4"}) + c.Assert(getTargets(m.forwarders["svc2"]), DeepEquals, []string{"tgt1", "tgt2", "tgt4"}) + c.Assert(getTargets(m.forwarders["svc4"]), DeepEquals, []string{"tgt4"}) } -// checkForwarders checks that the arrangement of forwarders -> gatherers is -// as described in the provided map. -func checkForwarders(c *C, forwarders map[string]*logForwarder, expected map[string][]string) { - c.Check(len(forwarders), Equals, len(expected)) - for serviceName := range expected { - forwarder, ok := forwarders[serviceName] - c.Assert(ok, Equals, true) - c.Assert(forwarder, Not(IsNil)) - //forwarder.mu.Lock() - race still passes without this - c.Check(len(forwarder.gatherers), Equals, len(expected[serviceName])) - for _, gatherer := range forwarder.gatherers { - c.Check(expected[serviceName], testutil.Contains, gatherer.target.Name) - } - //forwarder.mu.Unlock() +func getServiceNames(forwarders map[string]*logForwarder) (serviceNames []string) { + for serviceName := range forwarders { + serviceNames = append(serviceNames, serviceName) } + sort.Strings(serviceNames) + return } -// checkGatherers checks that the expected gatherers exist. -func checkGatherers(c *C, gatherers map[string]*logGatherer, expected []string) { - c.Check(len(gatherers), Equals, len(expected)) - for targetName := range gatherers { - c.Check(expected, testutil.Contains, targetName) +func getTargets(forwarder *logForwarder) (targetNames []string) { + for _, gatherers := range forwarder.gatherers { + targetNames = append(targetNames, gatherers.target.Name) } + sort.Strings(targetNames) + return } //func (s *managerSuite) TestNoLogDuplication(c *C) { From bad0611a935cee638979cedc9567868cab64ce4d Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 5 Jul 2023 12:25:35 +1200 Subject: [PATCH 3/5] add TestNoLogDuplication for LogManager --- internals/overlord/logstate/manager_test.go | 132 +++++++++----------- 1 file changed, 60 insertions(+), 72 deletions(-) diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go index bf6bb49b..b9360457 100644 --- a/internals/overlord/logstate/manager_test.go +++ b/internals/overlord/logstate/manager_test.go @@ -16,6 +16,7 @@ package logstate import ( "bytes" + "fmt" "sort" "sync" "time" @@ -124,80 +125,67 @@ func getTargets(forwarder *logForwarder) (targetNames []string) { return } -//func (s *managerSuite) TestNoLogDuplication(c *C) { -// // Reduce Loki flush time -// flushDelayOld := flushDelay -// flushDelay = 10 * time.Millisecond -// defer func() { -// flushDelay = flushDelayOld -// }() -// -// m := NewLogManager() -// rb := servicelog.NewRingBuffer(1024) -// -// // Set up fake "Loki" server -// requests := make(chan string, 2) -// srv := newFakeLokiServer(requests) -// defer srv.Close() -// -// // Utility functions for this test -// writeLog := func(timestamp time.Time, logLine string) { -// _, err := fmt.Fprintf(rb, "%s [svc1] %s\n", -// timestamp.UTC().Format("2006-01-02T15:04:05.000Z07:00"), logLine) -// c.Assert(err, IsNil) -// } -// expectLogs := func(expected string) { -// select { -// case req := <-requests: -// c.Assert(req, Equals, expected) -// case <-time.After(1 * time.Second): -// c.Fatalf("timed out waiting for request %q", expected) -// } -// } -// -// m.PlanChanged(&plan.Plan{ -// Services: map[string]*plan.Service{ -// "svc1": {}, -// }, -// LogTargets: map[string]*plan.LogTarget{ -// "tgt1": { -// Type: plan.LokiTarget, -// Location: srv.URL(), -// Selection: plan.UnsetSelection, -// }, -// }, -// }) -// m.ServiceStarted("svc1", rb) -// c.Assert(m.forwarders, HasLen, 1) -// -// // Write logs -// writeLog(time.Date(2023, 1, 31, 1, 23, 45, 67890, time.UTC), "log line #1") -// writeLog(time.Date(2023, 1, 31, 1, 23, 46, 67890, time.UTC), "log line #2") -// expectLogs(`{"streams":[{"stream":{"pebble_service":"svc1"},"values":[["1675128225000000000","log line #1"],["1675128226000000000","log line #2"]]}]}`) -// -// // Call PlanChanged again -// m.PlanChanged(&plan.Plan{ -// Services: map[string]*plan.Service{ -// "svc1": {}, -// }, -// LogTargets: map[string]*plan.LogTarget{ -// "tgt1": { -// Type: plan.LokiTarget, -// Location: srv.URL(), -// Selection: plan.UnsetSelection, -// }, -// }, -// }) -// c.Check(m.forwarders, HasLen, 1) -// -// // Write logs -// writeLog(time.Date(2023, 1, 31, 1, 23, 47, 67890, time.UTC), "log line #3") -// writeLog(time.Date(2023, 1, 31, 1, 23, 48, 67890, time.UTC), "log line #4") -// expectLogs(`{"streams":[{"stream":{"pebble_service":"svc1"},"values":[["1675128227000000000","log line #3"],["1675128228000000000","log line #4"]]}]}`) -//} +func (s *managerSuite) TestNoLogDuplication(c *C) { + recv := make(chan []servicelog.Entry) + m := newLogManagerForTest(10*time.Microsecond, 10, recv) + rb := servicelog.NewRingBuffer(1024) + + // Utility functions for this test + writeLog := func(timestamp time.Time, logLine string) { + _, err := fmt.Fprintf(rb, "%s [svc1] %s\n", + timestamp.UTC().Format("2006-01-02T15:04:05.000Z07:00"), logLine) + c.Assert(err, IsNil) + } + expectLogs := func(expected ...string) { + select { + case entries := <-recv: + c.Assert(entries, HasLen, len(expected)) + for i, entry := range entries { + c.Check(entry.Message, Equals, expected[i]+"\n") + } + + case <-time.After(10 * time.Millisecond): + c.Fatalf("timed out waiting for request %q", expected) + } + } + + m.PlanChanged(&plan.Plan{ + Services: map[string]*plan.Service{ + "svc1": {Name: "svc1"}, + }, + LogTargets: map[string]*plan.LogTarget{ + "tgt1": {Name: "tgt1", Services: []string{"svc1"}}, + }, + }) + m.ServiceStarted("svc1", rb) + c.Assert(getServiceNames(m.forwarders), DeepEquals, []string{"svc1"}) + c.Assert(getTargets(m.forwarders["svc1"]), DeepEquals, []string{"tgt1"}) + + // Write logs + writeLog(time.Now(), "log line #1") + writeLog(time.Now(), "log line #2") + expectLogs("log line #1", "log line #2") + + // Call PlanChanged again + m.PlanChanged(&plan.Plan{ + Services: map[string]*plan.Service{ + "svc1": {Name: "svc1"}, + }, + LogTargets: map[string]*plan.LogTarget{ + "tgt1": {Name: "tgt1", Services: []string{"svc1"}}, + }, + }) + c.Assert(getServiceNames(m.forwarders), DeepEquals, []string{"svc1"}) + c.Assert(getTargets(m.forwarders["svc1"]), DeepEquals, []string{"tgt1"}) + + // Write logs + writeLog(time.Now(), "log line #3") + writeLog(time.Now(), "log line #4") + expectLogs("log line #3", "log line #4") +} func (s *managerSuite) TestFlushLogsOnInterrupt(c *C) { - m := newLogManagerForTest(1*time.Second, 10, make(chan []servicelog.Entry)) + m := newLogManagerForTest(1*time.Hour, 10, make(chan []servicelog.Entry)) m.Stop() From 2dc14488b8ceb033ffef823ef91629a10c37de57 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 5 Jul 2023 13:08:46 +1200 Subject: [PATCH 4/5] write TestFlushLogsOnInterrupt for LogManager - move test helper functions to end of manager_test.go file - remove unused mock logger from managerSuite --- internals/overlord/logstate/manager_test.go | 96 ++++++++++++++------- 1 file changed, 63 insertions(+), 33 deletions(-) diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go index b9360457..141e2f74 100644 --- a/internals/overlord/logstate/manager_test.go +++ b/internals/overlord/logstate/manager_test.go @@ -15,34 +15,21 @@ package logstate import ( - "bytes" "fmt" "sort" "sync" "time" - "github.com/canonical/pebble/internals/logger" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" . "gopkg.in/check.v1" ) -type managerSuite struct { - logbuf *bytes.Buffer - restoreLogger func() -} +type managerSuite struct{} var _ = Suite(&managerSuite{}) -func (s *managerSuite) SetUpTest(c *C) { - s.logbuf, s.restoreLogger = logger.MockLogger("PREFIX: ") -} - -func (s *managerSuite) TearDownTest(c *C) { - s.restoreLogger() -} - func (s *managerSuite) TestLogManager(c *C) { m := newLogManagerForTest(1*time.Second, 10, make(chan []servicelog.Entry)) // Fake ringbuffer so that log manager can create forwarders @@ -109,22 +96,6 @@ func (s *managerSuite) TestLogManager(c *C) { c.Assert(getTargets(m.forwarders["svc4"]), DeepEquals, []string{"tgt4"}) } -func getServiceNames(forwarders map[string]*logForwarder) (serviceNames []string) { - for serviceName := range forwarders { - serviceNames = append(serviceNames, serviceName) - } - sort.Strings(serviceNames) - return -} - -func getTargets(forwarder *logForwarder) (targetNames []string) { - for _, gatherers := range forwarder.gatherers { - targetNames = append(targetNames, gatherers.target.Name) - } - sort.Strings(targetNames) - return -} - func (s *managerSuite) TestNoLogDuplication(c *C) { recv := make(chan []servicelog.Entry) m := newLogManagerForTest(10*time.Microsecond, 10, recv) @@ -185,11 +156,54 @@ func (s *managerSuite) TestNoLogDuplication(c *C) { } func (s *managerSuite) TestFlushLogsOnInterrupt(c *C) { - m := newLogManagerForTest(1*time.Hour, 10, make(chan []servicelog.Entry)) + recv := make(chan []servicelog.Entry) + m := newLogManagerForTest(10*time.Microsecond, 10, recv) + rb := servicelog.NewRingBuffer(1024) - m.Stop() + // Utility functions for this test + writeLog := func(timestamp time.Time, logLine string) { + _, err := fmt.Fprintf(rb, "%s [svc1] %s\n", + timestamp.UTC().Format("2006-01-02T15:04:05.000Z07:00"), logLine) + c.Assert(err, IsNil) + } + expectLogs := func(expected ...string) { + select { + case entries := <-recv: + c.Assert(entries, HasLen, len(expected)) + for i, entry := range entries { + c.Check(entry.Message, Equals, expected[i]+"\n") + } - // check buffered logs are sent through + case <-time.After(10 * time.Millisecond): + c.Fatalf("timed out waiting for request %q", expected) + } + } + + m.PlanChanged(&plan.Plan{ + Services: map[string]*plan.Service{ + "svc1": {Name: "svc1"}, + }, + LogTargets: map[string]*plan.LogTarget{ + "tgt1": {Name: "tgt1", Services: []string{"svc1"}}, + }, + }) + m.ServiceStarted("svc1", rb) + c.Assert(getServiceNames(m.forwarders), DeepEquals, []string{"svc1"}) + c.Assert(getTargets(m.forwarders["svc1"]), DeepEquals, []string{"tgt1"}) + + // Write logs + writeLog(time.Now(), "log line #1") + writeLog(time.Now(), "log line #2") + + // Logs shouldn't be sent through yet + select { + case e := <-recv: + c.Fatalf("unexpected logs received: %v", e) + default: + } + + m.Stop() + expectLogs("log line #1", "log line #2") } func newLogManagerForTest( @@ -204,3 +218,19 @@ func newLogManagerForTest( }, } } + +func getServiceNames(forwarders map[string]*logForwarder) (serviceNames []string) { + for serviceName := range forwarders { + serviceNames = append(serviceNames, serviceName) + } + sort.Strings(serviceNames) + return +} + +func getTargets(forwarder *logForwarder) (targetNames []string) { + for _, gatherers := range forwarder.gatherers { + targetNames = append(targetNames, gatherers.target.Name) + } + sort.Strings(targetNames) + return +} From ebd32f99a253ac0f16c083768e4a9c412d935534 Mon Sep 17 00:00:00 2001 From: Jordan Barrett Date: Wed, 5 Jul 2023 14:27:18 +1200 Subject: [PATCH 5/5] fix up import order - fix up some comments - rearrange logGatherer fields - remove LogTarget.Type in test --- internals/overlord/logstate/forwarder_test.go | 3 ++- internals/overlord/logstate/gatherer.go | 26 ++++++++----------- internals/overlord/logstate/gatherer_test.go | 10 +++---- internals/overlord/logstate/manager.go | 2 +- internals/overlord/logstate/manager_test.go | 22 ++++++++-------- 5 files changed, 28 insertions(+), 35 deletions(-) diff --git a/internals/overlord/logstate/forwarder_test.go b/internals/overlord/logstate/forwarder_test.go index 6778cb24..5569dfbb 100644 --- a/internals/overlord/logstate/forwarder_test.go +++ b/internals/overlord/logstate/forwarder_test.go @@ -18,8 +18,9 @@ import ( "fmt" "time" - "github.com/canonical/pebble/internals/servicelog" . "gopkg.in/check.v1" + + "github.com/canonical/pebble/internals/servicelog" ) type forwarderSuite struct{} diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index c6ef9761..95523570 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -20,7 +20,6 @@ import ( "time" "github.com/canonical/pebble/internals/logger" - "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" ) @@ -36,16 +35,14 @@ import ( // - when the buffer reaches a certain size // - when it is told to shut down. type logGatherer struct { - target *plan.LogTarget - + target *plan.LogTarget tickPeriod time.Duration + writeCh chan struct{} + cancel chan struct{} bufferLock sync.Mutex buffer logBuffer client logClient - - writeCh chan struct{} - cancel chan struct{} } func newLogGatherer(target *plan.LogTarget) *logGatherer { @@ -54,12 +51,12 @@ func newLogGatherer(target *plan.LogTarget) *logGatherer { return &logGatherer{ target: target, tickPeriod: tickPeriod, - buffer: newLogBuffer(target), - client: newLogClient(target), // writeCh should be buffered, so that addLog can send write notifications, // even when the control loop is not ready to receive. writeCh: make(chan struct{}, 1), cancel: make(chan struct{}), + buffer: newLogBuffer(target), + client: newLogClient(target), } } @@ -91,9 +88,8 @@ func (g *logGatherer) addLog(entry servicelog.Entry) { g.bufferLock.Unlock() // Try to notify the control loop of a new write to the buffer. - // We don't want this method to block, so if the control loop is not ready - // to receive, then drop the notification. - // TODO: this is getting dropped 99% of the time. Not good. + // If there is already a notification waiting, no need to notify again - just + // drop it. select { case g.writeCh <- struct{}{}: default: @@ -112,10 +108,9 @@ func (g *logGatherer) flush(force bool) { // No point doing anything return } - if !force { - if !g.buffer.IsFull() { - return - } + if !force && !g.buffer.IsFull() { + // Not ready to flush yet + return } req, err := g.buffer.Request() @@ -127,6 +122,7 @@ func (g *logGatherer) flush(force bool) { err = g.client.Send(req) if err != nil { logger.Noticef("couldn't send logs to target %q: %v", g.target.Name, err) + // TODO: early return here? should we reset buffer if send fails? } g.buffer.Reset() diff --git a/internals/overlord/logstate/gatherer_test.go b/internals/overlord/logstate/gatherer_test.go index b6ecab73..8b22c44c 100644 --- a/internals/overlord/logstate/gatherer_test.go +++ b/internals/overlord/logstate/gatherer_test.go @@ -19,12 +19,11 @@ import ( "io" "time" - "github.com/canonical/pebble/internals/plan" + . "gopkg.in/check.v1" "gopkg.in/yaml.v3" + "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" - - . "gopkg.in/check.v1" ) type gathererSuite struct{} @@ -115,10 +114,7 @@ func (s *gathererSuite) TestGathererStop(c *C) { } } -func newLogGathererForTest( - target *plan.LogTarget, - tickPeriod time.Duration, bufferCapacity int, recv chan []servicelog.Entry, -) *logGatherer { +func newLogGathererForTest(target *plan.LogTarget, tickPeriod time.Duration, bufferCapacity int, recv chan []servicelog.Entry) *logGatherer { g := newLogGatherer(target) g.tickPeriod = tickPeriod g.buffer = &testBuffer{ diff --git a/internals/overlord/logstate/manager.go b/internals/overlord/logstate/manager.go index 3b46d96e..feff88f6 100644 --- a/internals/overlord/logstate/manager.go +++ b/internals/overlord/logstate/manager.go @@ -47,6 +47,7 @@ func (m *LogManager) PlanChanged(pl *plan.Plan) { newGatherers := make(map[string]*logGatherer, len(pl.LogTargets)) for serviceName, service := range pl.Services { + // TODO: don't create forwarders if there are no targets for this service? forwarder := m.forwarders[serviceName] if forwarder == nil { // Create new forwarder @@ -64,7 +65,6 @@ func (m *LogManager) PlanChanged(pl *plan.Plan) { for _, target := range pl.LogTargets { // Only create the gatherer if there is a service logging to it. - // Don't need gatherers for disabled or unselected targets. if service.LogsTo(target) { gatherer := m.gatherers[serviceName] if gatherer == nil { diff --git a/internals/overlord/logstate/manager_test.go b/internals/overlord/logstate/manager_test.go index 141e2f74..7e9d5cd9 100644 --- a/internals/overlord/logstate/manager_test.go +++ b/internals/overlord/logstate/manager_test.go @@ -20,10 +20,10 @@ import ( "sync" "time" + . "gopkg.in/check.v1" + "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" - - . "gopkg.in/check.v1" ) type managerSuite struct{} @@ -43,10 +43,10 @@ func (s *managerSuite) TestLogManager(c *C) { "svc3": {Name: "svc3"}, }, LogTargets: map[string]*plan.LogTarget{ - "tgt1": {Name: "tgt1", Type: plan.LokiTarget, Services: []string{"svc1"}}, - "tgt2": {Name: "tgt2", Type: plan.LokiTarget, Services: []string{"all", "-svc2"}}, - "tgt3": {Name: "tgt3", Type: plan.LokiTarget, Services: []string{"svc1", "svc3", "-svc1"}}, - "tgt4": {Name: "tgt4", Type: plan.LokiTarget, Services: []string{}}, + "tgt1": {Name: "tgt1", Services: []string{"svc1"}}, + "tgt2": {Name: "tgt2", Services: []string{"all", "-svc2"}}, + "tgt3": {Name: "tgt3", Services: []string{"svc1", "svc3", "-svc1"}}, + "tgt4": {Name: "tgt4", Services: []string{}}, }, }) @@ -81,10 +81,10 @@ func (s *managerSuite) TestLogManager(c *C) { "svc4": {Name: "svc4"}, }, LogTargets: map[string]*plan.LogTarget{ - "tgt1": {Name: "tgt1", Type: plan.LokiTarget, Services: []string{"svc1", "svc2"}}, - "tgt2": {Name: "tgt2", Type: plan.LokiTarget, Services: []string{"svc2"}}, - "tgt3": {Name: "tgt3", Type: plan.LokiTarget, Services: []string{}}, - "tgt4": {Name: "tgt4", Type: plan.LokiTarget, Services: []string{"all"}}, + "tgt1": {Name: "tgt1", Services: []string{"svc1", "svc2"}}, + "tgt2": {Name: "tgt2", Services: []string{"svc2"}}, + "tgt3": {Name: "tgt3", Services: []string{}}, + "tgt4": {Name: "tgt4", Services: []string{"all"}}, }, }) @@ -212,7 +212,7 @@ func newLogManagerForTest( return &LogManager{ forwarders: map[string]*logForwarder{}, gatherers: map[string]*logGatherer{}, - newForwarder: newLogForwarder, // ForTest ? + newForwarder: newLogForwarder, newGatherer: func(target *plan.LogTarget) *logGatherer { return newLogGathererForTest(target, tickPeriod, bufferCapacity, recv) },