Skip to content

Commit

Permalink
fix TestGathererTimeout
Browse files Browse the repository at this point in the history
- add IsEmpty method to logBuffer interface
- add buffer empty check to logGatherer.flush
  • Loading branch information
barrettj12 committed Jul 3, 2023
1 parent 4e757b1 commit 926e99a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
5 changes: 5 additions & 0 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ func (g *logGatherer) flush(force bool) {
g.bufferLock.Lock()
defer g.bufferLock.Unlock()

if g.buffer.IsEmpty() {
// No point doing anything
return
}
if !force {
if !g.buffer.IsFull() {
return
Expand Down Expand Up @@ -133,6 +137,7 @@ func (g *logGatherer) stop() {
// the logBuffer using a sync.Mutex.
type logBuffer interface {
Write(servicelog.Entry)
IsEmpty() bool
IsFull() bool
Request() (io.Reader, error)
Reset()
Expand Down
35 changes: 19 additions & 16 deletions internals/overlord/logstate/gatherer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ import (
"io"
"time"

"gopkg.in/yaml.v3"

"github.com/canonical/pebble/internals/servicelog"

. "gopkg.in/check.v1"
"gopkg.in/yaml.v3"
)

type gathererSuite struct{}

var _ = Suite(&gathererSuite{})

func (s *gathererSuite) TestGathererTimeout(c *C) {
client := &testClient{}
g := newLogGathererForTest(100*time.Nanosecond, 2, client)
recv := make(chan []servicelog.Entry)
g := newLogGathererForTest(1*time.Microsecond, 2, recv)
go g.loop()

entry := servicelog.Entry{
Expand All @@ -39,25 +41,22 @@ func (s *gathererSuite) TestGathererTimeout(c *C) {
}
g.addLog(entry)
c.Assert(g.buffer.IsFull(), Equals, false)
c.Assert(client.received, HasLen, 0)

time.Sleep(110 * time.Nanosecond)
c.Assert(client.received, HasLen, 1)
c.Assert(client.received[0], HasLen, 1)
c.Assert(client.received[0][0], DeepEquals, entry)
select {
case entries := <-recv:
c.Assert(entries, DeepEquals, []servicelog.Entry{entry})
case <-time.After(10 * time.Millisecond):
c.Fatal("timeout waiting to receive logs")
}
}

func newLogGathererForTest(
tickPeriod time.Duration,
bufferCapacity int,
client logClient,
) *logGatherer {
func newLogGathererForTest(tickPeriod time.Duration, bufferCapacity int, recv chan []servicelog.Entry) *logGatherer {
return &logGatherer{
tickPeriod: tickPeriod,
buffer: &testBuffer{
capacity: bufferCapacity,
},
client: client,
client: &testClient{recv: recv},
writeCh: make(chan struct{}),
cancel: make(chan struct{}),
}
Expand All @@ -73,6 +72,10 @@ func (b *testBuffer) Write(entry servicelog.Entry) {
b.entries = append(b.entries, entry)
}

func (b *testBuffer) IsEmpty() bool {
return len(b.entries) == 0
}

func (b *testBuffer) IsFull() bool {
return len(b.entries) > b.capacity
}
Expand All @@ -91,7 +94,7 @@ func (b *testBuffer) Reset() {

// Fake implementation of logClient, to use in testing
type testClient struct {
received [][]servicelog.Entry
recv chan []servicelog.Entry
}

func (c testClient) Send(body io.Reader) error {
Expand All @@ -101,6 +104,6 @@ func (c testClient) Send(body io.Reader) error {
if err != nil {
return err
}
c.received = append(c.received, entries)
c.recv <- entries
return nil
}

0 comments on commit 926e99a

Please sign in to comment.