From be25405911e985a51a8ae537983c0f25a7a66a90 Mon Sep 17 00:00:00 2001
From: Jordan Barrett
Date: Fri, 11 Aug 2023 10:40:45 +0700
Subject: [PATCH] rewrite pullerGroup using tombs

---
 internals/overlord/logstate/puller.go | 91 ++++++++++++++-------------
 1 file changed, 48 insertions(+), 43 deletions(-)

diff --git a/internals/overlord/logstate/puller.go b/internals/overlord/logstate/puller.go
index d42d211b..e078ba8e 100644
--- a/internals/overlord/logstate/puller.go
+++ b/internals/overlord/logstate/puller.go
@@ -15,11 +15,11 @@
 package logstate
 
 import (
-	"context"
 	"sync"
 
 	"github.com/canonical/pebble/internals/logger"
 	"github.com/canonical/pebble/internals/servicelog"
+	"gopkg.in/tomb.v2"
 )
 
 // logPuller handles pulling logs from a single iterator and sending to the
@@ -28,8 +28,7 @@ type logPuller struct {
 	iterator servicelog.Iterator
 	entryCh  chan<- servicelog.Entry
 
-	ctx    context.Context
-	cancel context.CancelFunc
+	tomb tomb.Tomb
 }
 
 // loop pulls logs off the iterator and sends them on the entryCh.
@@ -37,30 +36,24 @@ type logPuller struct {
 // - if the puller's context is cancelled
 // - once the ringbuffer is closed and the iterator finishes reading all
 //   remaining logs.
-func (p *logPuller) loop() {
+func (p *logPuller) loop() error {
 	defer func() { _ = p.iterator.Close() }()
 
 	parser := servicelog.NewParser(p.iterator, parserSize)
-	for p.iterator.Next(p.ctx.Done()) {
+	for p.iterator.Next(p.tomb.Dying()) {
 		for parser.Next() {
 			if err := parser.Err(); err != nil {
-				return
-			}
-
-			// Check if our context has been cancelled
-			select {
-			case <-p.ctx.Done():
-				return
-			default:
+				return err
 			}
 
 			select {
 			case p.entryCh <- parser.Entry():
-			case <-p.ctx.Done():
-				return
+			case <-p.tomb.Dying():
+				return nil
 			}
 		}
 	}
+	return nil
 }
 
 // pullerGroup represents a group of logPullers, and provides methods for a
@@ -73,14 +66,6 @@ type pullerGroup struct {
 	pullers map[string]*logPuller
 	// Mutex for pullers map
 	mu sync.RWMutex
-	// Common context for all pullers. Each puller uses a derived context so we
-	// can easily kill all pullers (if required) during teardown.
-	ctx context.Context
-	// Cancel func for ctx
-	kill context.CancelFunc
-	// WaitGroup for pullers - we use this during teardown to know when all the
-	// pullers are finished.
-	wg sync.WaitGroup
 }
 
 func newPullerGroup(targetName string) *pullerGroup {
@@ -88,7 +73,6 @@ func newPullerGroup(targetName string) *pullerGroup {
 		targetName: targetName,
 		pullers:    map[string]*logPuller{},
 	}
-	pg.ctx, pg.kill = context.WithCancel(context.Background())
 	return pg
 }
 
@@ -97,22 +81,19 @@ func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, en
 		iterator: buffer.TailIterator(),
 		entryCh:  entryCh,
 	}
-	lp.ctx, lp.cancel = context.WithCancel(pg.ctx)
+	lp.tomb.Go(func() error {
+		err := lp.loop()
+		// Once the puller exits, we should remove it from the group.
+		// However, calling pg.Remove(serviceName) here deadlocks, because Remove waits on this tomb.
+		return err
+	})
 
-	pg.wg.Add(1) // this will be marked as done once loop finishes
-	go func() {
-		lp.loop()
-		pg.wg.Done()
-		// TODO: remove puller from map ?
-	}()
+	// There shouldn't already be a puller for this service, but if there is,
+	// shut it down first and wait for it to die.
+	pg.Remove(serviceName)
 
 	pg.mu.Lock()
 	defer pg.mu.Unlock()
-	if puller, ok := pg.pullers[serviceName]; ok {
-		// This should never happen, but just in case, shut down the old puller.
-		logger.Debugf("puller for service %q already exists, shutting down old puller", serviceName)
-		puller.cancel()
-	}
 	pg.pullers[serviceName] = lp
 }
 
@@ -130,27 +111,51 @@ func (pg *pullerGroup) List() []string {
 }
 
 func (pg *pullerGroup) Remove(serviceName string) {
-	pg.mu.Lock()
-	defer pg.mu.Unlock()
+	pg.mu.RLock()
+	puller, pullerExists := pg.pullers[serviceName]
+	pg.mu.RUnlock()
+
+	if !pullerExists {
+		return
+	}
 
-	if puller, ok := pg.pullers[serviceName]; ok {
-		puller.cancel()
-		delete(pg.pullers, serviceName)
+	puller.tomb.Kill(nil)
+	err := puller.tomb.Wait()
+	if err != nil {
+		logger.Noticef("Error from log puller: %v", err)
 	}
+	pg.mu.Lock()
+	defer pg.mu.Unlock()
+	delete(pg.pullers, serviceName)
 }
 
 func (pg *pullerGroup) KillAll() {
-	pg.kill()
+	pg.mu.RLock()
+	defer pg.mu.RUnlock()
+
+	for _, puller := range pg.pullers {
+		puller.tomb.Kill(nil)
+	}
 }
 
 // Done returns a channel which can be waited on until all pullers have finished.
 func (pg *pullerGroup) Done() <-chan struct{} {
+	pg.mu.RLock()
+	pullers := pg.pullers
+	pg.mu.RUnlock()
+
 	done := make(chan struct{})
 	go func() {
-		pg.wg.Wait()
+		for _, puller := range pullers {
+			err := puller.tomb.Wait()
+			if err != nil {
+				logger.Noticef("Error from log puller: %v", err)
+			}
+		}
 		close(done)
 	}()
+
 	return done
 }
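
A side note on the gopkg.in/tomb.v2 lifecycle this rewrite relies on: Go starts a tracked goroutine, Dying exposes a channel that closes when shutdown is requested, Kill(nil) asks for a clean stop, and Wait blocks until the tracked goroutine has returned and reports its error. The sketch below is a minimal, self-contained illustration of that pattern; the worker type, its timing values, and its print statements are invented for the example and are not part of Pebble.

package main

import (
	"fmt"
	"time"

	"gopkg.in/tomb.v2"
)

// worker is a stand-in for logPuller: it owns a tomb and runs a single loop.
type worker struct {
	tomb tomb.Tomb
}

// loop runs until the tomb starts dying, mirroring how logPuller.loop
// selects on p.tomb.Dying().
func (w *worker) loop() error {
	for {
		select {
		case <-w.tomb.Dying():
			// A clean shutdown was requested via Kill(nil).
			return nil
		case <-time.After(100 * time.Millisecond):
			fmt.Println("pulling...")
		}
	}
}

func main() {
	w := &worker{}
	// Track the loop in the tomb, as pullerGroup.Add does with lp.tomb.Go.
	w.tomb.Go(w.loop)

	time.Sleep(300 * time.Millisecond)
	// Ask the loop to stop and collect its error, as Remove/KillAll/Done do.
	w.tomb.Kill(nil)
	if err := w.tomb.Wait(); err != nil {
		fmt.Println("worker error:", err)
	}
}

Because Wait only returns once the tracked goroutine has returned, calling pullerGroup.Remove (which calls Wait) from inside the puller's own tomb goroutine cannot complete, which is the deadlock noted in the Add comment above.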