Skip to content

Commit

Permalink
rewrite TestGathererTimeout (why is it failing?)
Browse files Browse the repository at this point in the history
- add error retval to logBuffer.Request
- add test implementations of logBuffer & logClient
- implement LogManager.Stop
  • Loading branch information
barrettj12 committed Jul 3, 2023
1 parent e02e518 commit 4e757b1
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 24 deletions.
1 change: 1 addition & 0 deletions internals/overlord/logstate/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package logstate

import (
Expand Down
30 changes: 19 additions & 11 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type logGatherer struct {
target *plan.LogTarget

tickPeriod time.Duration
ticker *time.Ticker

bufferLock sync.Mutex
buffer logBuffer
Expand All @@ -54,7 +53,6 @@ func newLogGatherer(target *plan.LogTarget) *logGatherer {
return &logGatherer{
target: target,
tickPeriod: tickPeriod,
ticker: time.NewTicker(tickPeriod),
buffer: newLogBuffer(target),
client: newLogClient(target),
writeCh: make(chan struct{}),
Expand All @@ -63,18 +61,21 @@ func newLogGatherer(target *plan.LogTarget) *logGatherer {
}

func (g *logGatherer) loop() {
defer g.ticker.Stop()
ticker := time.NewTicker(g.tickPeriod)
defer ticker.Stop()

for {
select {
case <-g.ticker.C:
//fmt.Println("timeout")
case <-ticker.C:
// Timeout - flush
g.flush(true)
case <-g.writeCh: // is the buffer full?
//fmt.Println("got a write")

case <-g.writeCh:
// Got a write - check if buffer is full
g.flush(false)

case <-g.cancel:
//fmt.Println("cancelled")
// Gatherer has been stopped - flush any remaining logs
g.flush(true)
return
}
Expand Down Expand Up @@ -109,7 +110,12 @@ func (g *logGatherer) flush(force bool) {
}
}

err := g.client.Send(g.buffer.Request())
req, err := g.buffer.Request()
if err != nil {
logger.Noticef("couldn't generate request for target %q: %v", g.target.Name, err)
}

err = g.client.Send(req)
if err != nil {
logger.Noticef("couldn't send logs to target %q: %v", g.target.Name, err)
}
Expand All @@ -128,18 +134,20 @@ func (g *logGatherer) stop() {
type logBuffer interface {
Write(servicelog.Entry)
IsFull() bool
Request() io.Reader
Request() (io.Reader, error)
Reset()
}

func newLogBuffer(target *plan.LogTarget) logBuffer {
return &fakeLogBuffer{}
// TODO: check target.Type and return the corresponding logBuffer
return nil
}

type logClient interface {
Send(io.Reader) error
}

func newLogClient(target *plan.LogTarget) logClient {
// TODO: check target.Type and return the corresponding logClient
return &fakeLogClient{}
}
80 changes: 69 additions & 11 deletions internals/overlord/logstate/gatherer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,93 @@
package logstate

import (
"bytes"
"io"
"time"

"github.com/canonical/pebble/internals/servicelog"
. "gopkg.in/check.v1"
"gopkg.in/yaml.v3"
)

type gathererSuite struct{}

var _ = Suite(&gathererSuite{})

func (s *gathererSuite) TestGathererTimeout(c *C) {
g := newLogGathererForTest(1 * time.Second)
client := &testClient{}
g := newLogGathererForTest(100*time.Nanosecond, 2, client)
go g.loop()

g.addLog(servicelog.Entry{
Time: time.Date(2023, 1, 1, 14, 34, 56, 789, nil),
entry := servicelog.Entry{
Time: time.Date(2023, 1, 1, 14, 34, 56, 789, time.UTC),
Service: "foobar",
Message: "this is a log",
})
time.Sleep(1 * time.Second)
c.Assert(g.buffer.IsFull(), Equals, true)
}
g.addLog(entry)
c.Assert(g.buffer.IsFull(), Equals, false)
c.Assert(client.received, HasLen, 0)

time.Sleep(110 * time.Nanosecond)
c.Assert(client.received, HasLen, 1)
c.Assert(client.received[0], HasLen, 1)
c.Assert(client.received[0][0], DeepEquals, entry)
}

func newLogGathererForTest(tickPeriod time.Duration) *logGatherer {
func newLogGathererForTest(
tickPeriod time.Duration,
bufferCapacity int,
client logClient,
) *logGatherer {
return &logGatherer{
tickPeriod: tickPeriod,
buffer: &fakeLogBuffer{},
client: &fakeLogClient{},
writeCh: make(chan struct{}),
cancel: make(chan struct{}),
buffer: &testBuffer{
capacity: bufferCapacity,
},
client: client,
writeCh: make(chan struct{}),
cancel: make(chan struct{}),
}
}

// Fake implementation of logBuffer, to use in testing
type testBuffer struct {
entries []servicelog.Entry
capacity int
}

func (b *testBuffer) Write(entry servicelog.Entry) {
b.entries = append(b.entries, entry)
}

func (b *testBuffer) IsFull() bool {
return len(b.entries) > b.capacity
}

func (b *testBuffer) Request() (io.Reader, error) {
req, err := yaml.Marshal(b.entries)
if err != nil {
return nil, err
}
return bytes.NewReader(req), nil
}

func (b *testBuffer) Reset() {
b.entries = []servicelog.Entry{}
}

// Fake implementation of logClient, to use in testing
type testClient struct {
received [][]servicelog.Entry
}

func (c testClient) Send(body io.Reader) error {
entries := []servicelog.Entry{}
decoder := yaml.NewDecoder(body)
err := decoder.Decode(&entries)
if err != nil {
return err
}
c.received = append(c.received, entries)
return nil
}
12 changes: 11 additions & 1 deletion internals/overlord/logstate/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package logstate

import (
"sync"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
Expand Down Expand Up @@ -114,5 +116,13 @@ func (m *LogManager) Ensure() error {

// Stop implements overlord.StateStopper and stops all log forwarding.
func (m *LogManager) Stop() {
// TODO: implement
wg := sync.WaitGroup{}
for _, f := range m.forwarders {
wg.Add(1)
go func(f *logForwarder) {
f.stop()
wg.Done()
}(f)
}
wg.Wait()
}
2 changes: 1 addition & 1 deletion internals/overlord/logstate/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func checkGatherers(c *C, gatherers map[string]*logGatherer, expected []string)
// expectLogs(`{"streams":[{"stream":{"pebble_service":"svc1"},"values":[["1675128227000000000","log line #3"],["1675128228000000000","log line #4"]]}]}`)
//}

func (s *managerSuite) TestFlushLogsOnInterrupt() {
func (s *managerSuite) TestFlushLogsOnInterrupt(c *C) {
m := newLogManagerForTest()

m.Stop()
Expand Down

0 comments on commit 4e757b1

Please sign in to comment.