Skip to content

Commit

Permalink
buffer logGatherer.writeCh
Browse files Browse the repository at this point in the history
  • Loading branch information
barrettj12 committed Jul 4, 2023
1 parent 1f4fe21 commit c8cbae5
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
9 changes: 6 additions & 3 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ func newLogGatherer(target *plan.LogTarget) *logGatherer {
tickPeriod: tickPeriod,
buffer: newLogBuffer(target),
client: newLogClient(target),
writeCh: make(chan struct{}),
cancel: make(chan struct{}),
// writeCh should be buffered, so that addLog can send write notifications,
// even when the control loop is not ready to receive.
writeCh: make(chan struct{}, 1),
cancel: make(chan struct{}),
}
}

Expand Down Expand Up @@ -85,8 +87,8 @@ func (g *logGatherer) loop() {

func (g *logGatherer) addLog(entry servicelog.Entry) {
g.bufferLock.Lock()
defer g.bufferLock.Unlock()
g.buffer.Write(entry)
g.bufferLock.Unlock()

// Try to notify the control loop of a new write to the buffer.
// We don't want this method to block, so if the control loop is not ready
Expand Down Expand Up @@ -119,6 +121,7 @@ func (g *logGatherer) flush(force bool) {
req, err := g.buffer.Request()
if err != nil {
logger.Noticef("couldn't generate request for target %q: %v", g.target.Name, err)
return
}

err = g.client.Send(req)
Expand Down
15 changes: 6 additions & 9 deletions internals/overlord/logstate/gatherer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,13 @@ func newLogGathererForTest(
target *plan.LogTarget,
tickPeriod time.Duration, bufferCapacity int, recv chan []servicelog.Entry,
) *logGatherer {
return &logGatherer{
target: target,
tickPeriod: tickPeriod,
buffer: &testBuffer{
capacity: bufferCapacity,
},
client: &testClient{recv: recv},
writeCh: make(chan struct{}),
cancel: make(chan struct{}),
g := newLogGatherer(target)
g.tickPeriod = tickPeriod
g.buffer = &testBuffer{
capacity: bufferCapacity,
}
g.client = &testClient{recv: recv}
return g
}

// testBuffer is a "fake" implementation of logBuffer, for use in testing.
Expand Down

0 comments on commit c8cbae5

Please sign in to comment.