Skip to content

Commit

Permalink
move final flush to gatherer.loop()
Browse files Browse the repository at this point in the history
  • Loading branch information
barrettj12 committed Aug 1, 2023
1 parent 599b89a commit fd3574b
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"gopkg.in/tomb.v2"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)
Expand All @@ -35,7 +34,9 @@ const (
timeoutCurrentFlush = 1 * time.Second
timeoutPullers = 2 * time.Second
timeoutMainLoop = 3 * time.Second
timeoutFinalFlush = 5 * time.Second
// timeoutFinalFlush is measured from when the gatherer's main loop finishes,
// NOT from when stop() is called like the other constants.
timeoutFinalFlush = 2 * time.Second
)

// logGatherer is responsible for collecting service logs from a bunch of
Expand Down Expand Up @@ -207,10 +208,11 @@ func (g *logGatherer) loop() error {
ticker := time.NewTicker(g.tickPeriod)
defer ticker.Stop()

mainLoop:
for {
select {
case <-g.tomb.Dying():
return nil
break mainLoop

case <-ticker.C:
// Timeout - flush
Expand All @@ -226,6 +228,15 @@ func (g *logGatherer) loop() error {
}
}
}

// Final flush to send any remaining logs buffered in the client
ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush)
defer cancel()
err := g.client.Flush(ctx)
if err != nil {
return fmt.Errorf("sending logs to target %q: %v", g.targetName, err)
}
return nil
}

// stop tears down the gatherer and associated resources (pullers, client).
Expand All @@ -239,9 +250,6 @@ func (g *logGatherer) loop() error {
// - Kill the main loop.
// - Flush out any final logs buffered in the client.
func (g *logGatherer) stop() {
// Set deadlines now for the "final flush" teardown step
finalFlushDeadline := time.Now().Add(g.timeoutFinalFlush)

// Wait up to timeoutCurrentFlush for the current flush to complete (if any)
time.AfterFunc(g.timeoutCurrentFlush, g.clientCancel)

Expand All @@ -264,16 +272,8 @@ func (g *logGatherer) stop() {
}

_ = g.tomb.Killf("gatherer stopped")

// Wait up to timeoutFinalFlush for the client to flush all remaining
// buffered logs to the remote.
ctx, cancel := context.WithDeadline(context.Background(), finalFlushDeadline)
defer cancel()
// Wait for final flush in the main loop
_ = g.tomb.Wait()
err := g.client.Flush(ctx)
if err != nil {
logger.Noticef("Cannot send logs to target %q: %v", g.targetName, err)
}
}

// logPuller handles pulling logs from a single iterator and sending to the
Expand Down

0 comments on commit fd3574b

Please sign in to comment.