Skip to content

Commit

Permalink
use timer instead of ticker
Browse files Browse the repository at this point in the history
  • Loading branch information
barrettj12 committed Aug 8, 2023
1 parent ab46d31 commit 6bae051
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
47 changes: 43 additions & 4 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,17 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R
// pullers on entryCh, and writes them to the client. It also flushes the
// client periodically, and exits when the gatherer's tomb is killed.
func (g *logGatherer) loop() error {
ticker := time.NewTicker(g.tickPeriod)
defer ticker.Stop()
timer := newTimer()

mainLoop:
for {
select {
case <-g.tomb.Dying():
break mainLoop

case <-ticker.C:
// Timeout - flush
case <-timer.Finished():
// Mark timer as unset
timer.Stop()
err := g.client.Flush(g.clientCtx)
if err != nil {
logger.Noticef("Error sending logs to target %q: %v", g.targetName, err)
Expand All @@ -188,6 +188,8 @@ mainLoop:
if err != nil {
logger.Noticef("Error writing logs to target %q: %v", g.targetName, err)
}
// Set timer if not already set
timer.EnsureSet(g.tickPeriod)
}
}

Expand Down Expand Up @@ -241,6 +243,43 @@ func (g *logGatherer) Stop() {
}
}

// timer wraps time.Timer and provides a better API.
type timer struct {
timer *time.Timer
set bool
}

func newTimer() timer {
t := timer{
timer: time.NewTimer(1 * time.Hour),
}
t.Stop()
return t
}

func (t *timer) Finished() <-chan time.Time {
return t.timer.C
}

func (t *timer) Stop() {
t.timer.Stop()
t.set = false
// Drain timer channel
select {
case <-t.timer.C:
default:
}
}

func (t *timer) EnsureSet(timeout time.Duration) {
if t.set {
return
}

t.timer.Reset(timeout)
t.set = true
}

// logClient handles requests to a specific type of log target. It encodes
// log messages in the required format, and sends the messages using the
// protocol required by that log target.
Expand Down
4 changes: 2 additions & 2 deletions internals/overlord/logstate/gatherer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *gathererSuite) TestGatherer(c *C) {
func (s *gathererSuite) TestGathererTimeout(c *C) {
received := make(chan []servicelog.Entry, 1)
gathererArgs := logGathererArgs{
tickPeriod: 1 * time.Microsecond,
tickPeriod: 1 * time.Millisecond,
newClient: func(target *plan.LogTarget) (logClient, error) {
return &testClient{
bufferSize: 5,
Expand All @@ -86,7 +86,7 @@ func (s *gathererSuite) TestGathererTimeout(c *C) {

testSvc.writeLog("log line #1")
select {
case <-time.After(20 * time.Millisecond):
case <-time.After(1 * time.Second):
c.Fatalf("timeout waiting for logs")
case logs := <-received:
checkLogs(c, logs, []string{"log line #1"})
Expand Down

0 comments on commit 6bae051

Please sign in to comment.