Skip to content

Commit

Permalink
Improve handling of log gatherer tear down.
Browse files Browse the repository at this point in the history
  • Loading branch information
hpidcock committed Aug 14, 2023
1 parent 5899532 commit 2b4f8eb
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 36 deletions.
4 changes: 3 additions & 1 deletion internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG
}
g.clientCtx, g.clientCancel = context.WithCancel(context.Background())
g.tomb.Go(g.loop)
g.tomb.Go(g.pullers.tomb.Wait)

return g, nil
}

Expand Down Expand Up @@ -239,7 +241,7 @@ func (g *logGatherer) Stop() {
// Wait for final flush in the main loop
err := g.tomb.Wait()
if err != nil {
logger.Noticef("Error shutting down gatherer: %v", err)
logger.Noticef("Internal error: cannot shut down gatherer: %v", err)
}
}

Expand Down
56 changes: 21 additions & 35 deletions internals/overlord/logstate/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type pullerGroup struct {
pullers map[string]*logPuller
// Mutex for pullers map
mu sync.RWMutex

tomb tomb.Tomb
}

func newPullerGroup(targetName string) *pullerGroup {
Expand All @@ -78,23 +80,20 @@ func newPullerGroup(targetName string) *pullerGroup {
}

func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, entryCh chan<- servicelog.Entry) {
pg.mu.Lock()
defer pg.mu.Unlock()

// There shouldn't already be a puller for this service, but if there is,
// shut it down first and wait for it to die.
pg.remove(serviceName)

lp := &logPuller{
iterator: buffer.TailIterator(),
entryCh: entryCh,
}
lp.tomb.Go(func() error {
err := lp.loop()
// Once the puller exits, we should remove it from the group.
//pg.Remove(serviceName) - this seems to cause deadlock though
return err
})
lp.tomb.Go(lp.loop)
pg.tomb.Go(lp.tomb.Wait)

// There shouldn't already be a puller for this service, but if there is,
// shut it down first and wait for it to die.
pg.Remove(serviceName)

pg.mu.Lock()
defer pg.mu.Unlock()
pg.pullers[serviceName] = lp
}

Expand All @@ -112,23 +111,24 @@ func (pg *pullerGroup) List() []string {
}

func (pg *pullerGroup) Remove(serviceName string) {
pg.mu.RLock()
puller, pullerExists := pg.pullers[serviceName]
pg.mu.RUnlock()
pg.mu.Lock()
defer pg.mu.Unlock()
pg.remove(serviceName)
}

func (pg *pullerGroup) remove(serviceName string) {
puller, pullerExists := pg.pullers[serviceName]
if !pullerExists {
return
}

puller.tomb.Kill(nil)
delete(pg.pullers, serviceName)

err := puller.tomb.Wait()
if err != nil {
logger.Noticef("Error from log puller: %v", err)
}

pg.mu.Lock()
defer pg.mu.Unlock()
delete(pg.pullers, serviceName)
}

func (pg *pullerGroup) KillAll() {
Expand All @@ -138,26 +138,12 @@ func (pg *pullerGroup) KillAll() {
for _, puller := range pg.pullers {
puller.tomb.Kill(nil)
}
pg.tomb.Kill(nil)
}

// Done returns a channel which can be waited on until all pullers have finished.
func (pg *pullerGroup) Done() <-chan struct{} {
pg.mu.RLock()
pullers := pg.pullers
pg.mu.RUnlock()

done := make(chan struct{})
go func() {
for _, puller := range pullers {
err := puller.tomb.Wait()
if err != nil {
logger.Noticef("Error from log puller: %v", err)
}
}
close(done)
}()

return done
return pg.tomb.Dead()
}

func (pg *pullerGroup) Contains(serviceName string) bool {
Expand Down

0 comments on commit 2b4f8eb

Please sign in to comment.