replace puller tombs with context & waitgroup
The contexts allow us to kill pullers individually, or to kill them all as a group (via a shared parent context). The WaitGroup allows us to wait for all pullers to finish during teardown.
barrettj12 committed Jul 27, 2023
1 parent 80bca7b commit d4abcb9
Showing 1 changed file with 54 additions and 29 deletions.
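The pattern this commit adopts can be sketched outside Pebble as follows. This is a minimal, illustrative Go program, not Pebble code; names such as manager, startWorker, and stop are hypothetical. Each worker runs under a child context of a shared parent, so it can be killed individually (its own cancel func) or as a group (cancelling the parent), and a WaitGroup lets teardown wait, with a timeout, for all workers to finish.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// manager owns a parent context shared by all workers and a WaitGroup
// used to wait for every worker to exit during teardown.
type manager struct {
	parentCtx context.Context
	killAll   context.CancelFunc
	wg        sync.WaitGroup
	cancels   map[string]context.CancelFunc // per-worker cancel funcs
}

func newManager() *manager {
	m := &manager{cancels: map[string]context.CancelFunc{}}
	m.parentCtx, m.killAll = context.WithCancel(context.Background())
	return m
}

// startWorker runs a worker under a child context of the shared parent,
// so it can be killed individually (via its own cancel func) or as a
// group (via killAll).
func (m *manager) startWorker(name string) {
	ctx, cancel := context.WithCancel(m.parentCtx)
	m.cancels[name] = cancel

	m.wg.Add(1) // marked done when the worker goroutine returns
	go func() {
		defer m.wg.Done()
		for {
			select {
			case <-ctx.Done():
				fmt.Println(name, "stopped")
				return
			case <-time.After(50 * time.Millisecond):
				fmt.Println(name, "working")
			}
		}
	}()
}

// stop kills every remaining worker via the shared parent context, then
// waits (with a timeout) for all worker goroutines to finish.
func (m *manager) stop(timeout time.Duration) {
	m.killAll()

	done := make(chan struct{})
	go func() {
		m.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
	case <-time.After(timeout):
		fmt.Println("timed out waiting for workers")
	}
}

func main() {
	m := newManager()
	m.startWorker("w1")
	m.startWorker("w2")

	time.Sleep(120 * time.Millisecond)
	m.cancels["w1"]() // kill one worker individually

	time.Sleep(120 * time.Millisecond)
	m.stop(time.Second) // kill the rest as a group and wait for them
}

Cancelling the parent context makes the group kill a one-liner, while the done channel plus select reproduces the bounded wait used in the gatherer's stop() below.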
internals/overlord/logstate/gatherer.go (83 changes: 54 additions & 29 deletions)
@@ -66,6 +66,13 @@ type logGatherer struct {
 
 	// tomb for the main loop
 	tomb tomb.Tomb
+
+	// this context and WaitGroup are used to manage pullers
+	// Each puller will use a child context of this one
+	// We wait on pullersGrp during teardown so we know when all pullers are done
+	pullersCtx  context.Context
+	killPullers context.CancelFunc
+	pullersGrp  *sync.WaitGroup
 }
 
 // logGathererArgs allows overriding the newLogClient method and time values
@@ -101,8 +108,10 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG
 		client:  client,
 		entryCh: make(chan servicelog.Entry),
 		pullers: map[string]*logPuller{},
+		pullersGrp: &sync.WaitGroup{},
 	}
 	g.clientCtx, g.clientCancel = context.WithCancel(context.Background())
+	g.pullersCtx, g.killPullers = context.WithCancel(context.Background())
 	g.tomb.Go(g.loop)
 	return g, nil
 }
@@ -156,12 +165,12 @@ func (g *logGatherer) planChanged(pl *plan.Plan, buffers map[string]*servicelog.
 		}
 
 		// Create new puller
-		newPullers[service.Name] = newLogPuller(g.entryCh, buffer)
+		newPullers[service.Name] = g.newLogPuller(buffer)
 	}
 
 	// Old pullers for now-removed services need to be shut down.
 	for _, puller := range g.pullers {
-		_ = puller.tomb.Killf("plan changed")
+		puller.kill()
 	}
 	g.pullers = newPullers
 }
@@ -172,9 +181,9 @@ func (g *logGatherer) serviceStarted(service *plan.Service, buffer *servicelog.R
 
 	if puller, ok := g.pullers[service.Name]; ok {
 		// This should never happen, but just in case, shut down the old puller.
-		_ = puller.tomb.Killf("service started")
+		puller.kill()
 	}
-	g.pullers[service.Name] = newLogPuller(g.entryCh, buffer)
+	g.pullers[service.Name] = g.newLogPuller(buffer)
 }
 
 func (g *logGatherer) loop() error {
@@ -213,29 +222,35 @@ func (g *logGatherer) loop() error {
 // - Kill the main loop.
 // - Flush out any final logs buffered in the client.
 func (g *logGatherer) stop() {
+	// Set deadlines now for the "final flush" teardown step
+	finalFlushDeadline := time.Now().Add(g.timeoutFinalFlush)
+
 	// Wait up to timeoutCurrentFlush for the current flush to complete (if any)
 	time.AfterFunc(g.timeoutCurrentFlush, g.clientCancel)
+
 	// Wait up to timeoutPullers for the pullers to pull the final logs from the
 	// iterator and send to the main loop.
-	time.AfterFunc(g.timeoutPullers, func() {
-		g.pullersLock.Lock()
-		defer g.pullersLock.Unlock()
-		for _, puller := range g.pullers {
-			_ = puller.tomb.Killf("gatherer stopped")
-		}
-	})
+	time.AfterFunc(g.timeoutPullers, g.killPullers)
 
+	// Kill the main loop once either:
+	// - all the pullers are done
+	// - timeoutMainLoop has passed
+	pullersDone := make(chan struct{})
+	go func() {
+		g.pullersGrp.Wait()
+		close(pullersDone)
+	}()
+
+	select {
+	case <-pullersDone:
+	case <-time.After(g.timeoutMainLoop):
+	}
 
-	// Wait up to timeoutMainLoop for the main loop to gather all logs from the
-	// pullers, and write them to the client.
-	// TODO: we can do this early if all pullers are done
-	time.AfterFunc(g.timeoutMainLoop, func() {
-		_ = g.tomb.Killf("gatherer stopped")
-	})
+	_ = g.tomb.Killf("gatherer stopped")
 
 	// Wait up to timeoutFinalFlush for the client to flush all remaining
 	// buffered logs to the remote.
-	ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush)
+	ctx, cancel := context.WithDeadline(context.Background(), finalFlushDeadline)
 	defer cancel()
 	_ = g.tomb.Wait()
 	err := g.client.Flush(ctx)
@@ -248,36 +263,46 @@ func (g *logGatherer) stop() {
 // main control loop.
 type logPuller struct {
 	iterator servicelog.Iterator
-	tomb     tomb.Tomb
 	entryCh  chan servicelog.Entry
+
+	ctx  context.Context
+	kill context.CancelFunc
+	wg   *sync.WaitGroup
 }
 
-func newLogPuller(entryCh chan servicelog.Entry, buffer *servicelog.RingBuffer) *logPuller {
+func (g *logGatherer) newLogPuller(buffer *servicelog.RingBuffer) *logPuller {
+	ctx, kill := context.WithCancel(g.pullersCtx)
 	p := &logPuller{
 		iterator: buffer.TailIterator(),
-		entryCh:  entryCh,
+		entryCh:  g.entryCh,
+		ctx:      ctx,
+		kill:     kill,
+		wg:       g.pullersGrp,
 	}
-	p.tomb.Go(p.loop)
+
+	p.wg.Add(1) // this will be marked as done once loop finishes
+	go p.loop()
+
 	return p
 }
 
-func (p *logPuller) loop() (err error) {
-	parser := servicelog.NewParser(p.iterator, parserSize)
-	defer func() { err = p.iterator.Close() }()
+func (p *logPuller) loop() {
+	defer p.wg.Done()
+	defer func() { _ = p.iterator.Close() }()
 
-	for p.iterator.Next(p.tomb.Dying()) {
+	parser := servicelog.NewParser(p.iterator, parserSize)
+	for p.iterator.Next(p.ctx.Done()) {
 		for parser.Next() {
 			if err := parser.Err(); err != nil {
-				return err
+				return
 			}
 			select {
 			case p.entryCh <- parser.Entry():
-			case <-p.tomb.Dying():
+			case <-p.ctx.Done():
 				return
 			}
 		}
 	}
-	return
 }
 
 // logClient handles requests to a specific type of log target. It encodes
