rewrite pullerGroup using tombs
barrettj12 committed Aug 11, 2023
1 parent 6bae051 commit be25405
Showing 1 changed file with 48 additions and 43 deletions.
91 changes: 48 additions & 43 deletions internals/overlord/logstate/puller.go
@@ -15,11 +15,11 @@
package logstate

import (
"context"
"sync"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/servicelog"
"gopkg.in/tomb.v2"
)

// logPuller handles pulling logs from a single iterator and sending to the
@@ -28,39 +28,32 @@ type logPuller struct {
iterator servicelog.Iterator
entryCh chan<- servicelog.Entry

- ctx context.Context
- cancel context.CancelFunc
+ tomb tomb.Tomb
}

// loop pulls logs off the iterator and sends them on the entryCh.
// The loop will terminate:
// - if the puller's context is cancelled
// - once the ringbuffer is closed and the iterator finishes reading all
// remaining logs.
- func (p *logPuller) loop() {
+ func (p *logPuller) loop() error {
defer func() { _ = p.iterator.Close() }()

parser := servicelog.NewParser(p.iterator, parserSize)
- for p.iterator.Next(p.ctx.Done()) {
+ for p.iterator.Next(p.tomb.Dying()) {
for parser.Next() {
if err := parser.Err(); err != nil {
- return
- }
-
- // Check if our context has been cancelled
- select {
- case <-p.ctx.Done():
- return
- default:
+ return err
}

select {
case p.entryCh <- parser.Entry():
- case <-p.ctx.Done():
- return
+ case <-p.tomb.Dying():
+ return nil
}
}
}
+ return nil
}
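
For readers unfamiliar with gopkg.in/tomb.v2: a tomb.Tomb tracks a goroutine's lifecycle. Go starts the tracked function, Dying returns a channel that is closed once the tomb starts dying (for example after Kill is called), and Wait blocks until the tracked goroutines have finished, returning the tomb's death reason. A minimal standalone sketch of the same pattern loop now uses; the worker and the entries channel here are illustrative, not taken from Pebble:

	package main

	import (
		"fmt"

		"gopkg.in/tomb.v2"
	)

	func main() {
		var t tomb.Tomb
		entries := make(chan string)

		// Tracked goroutine: keep sending entries until the tomb starts
		// dying, then return nil so Wait reports a clean shutdown.
		t.Go(func() error {
			for i := 0; ; i++ {
				select {
				case entries <- fmt.Sprintf("entry %d", i):
				case <-t.Dying():
					return nil
				}
			}
		})

		fmt.Println(<-entries) // consume one entry
		t.Kill(nil)            // ask the goroutine to stop
		fmt.Println("stopped, err:", t.Wait())
	}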

// pullerGroup represents a group of logPullers, and provides methods for a
@@ -73,22 +66,13 @@ type pullerGroup struct {
pullers map[string]*logPuller
// Mutex for pullers map
mu sync.RWMutex
- // Common context for all pullers. Each puller uses a derived context so we
- // can easily kill all pullers (if required) during teardown.
- ctx context.Context
- // Cancel func for ctx
- kill context.CancelFunc
- // WaitGroup for pullers - we use this during teardown to know when all the
- // pullers are finished.
- wg sync.WaitGroup
}

func newPullerGroup(targetName string) *pullerGroup {
pg := &pullerGroup{
targetName: targetName,
pullers: map[string]*logPuller{},
}
- pg.ctx, pg.kill = context.WithCancel(context.Background())
return pg
}

@@ -97,22 +81,19 @@ func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, en
iterator: buffer.TailIterator(),
entryCh: entryCh,
}
- lp.ctx, lp.cancel = context.WithCancel(pg.ctx)
+ lp.tomb.Go(func() error {
+ err := lp.loop()
+ // Once the puller exits, we should remove it from the group.
+ //pg.Remove(serviceName) - this seems to cause deadlock though
+ return err
+ })

- pg.wg.Add(1) // this will be marked as done once loop finishes
- go func() {
- lp.loop()
- pg.wg.Done()
- // TODO: remove puller from map ?
- }()
+ // There shouldn't already be a puller for this service, but if there is,
+ // shut it down first and wait for it to die.
+ pg.Remove(serviceName)

pg.mu.Lock()
defer pg.mu.Unlock()
- if puller, ok := pg.pullers[serviceName]; ok {
- // This should never happen, but just in case, shut down the old puller.
- logger.Debugf("puller for service %q already exists, shutting down old puller", serviceName)
- puller.cancel()
- }
pg.pullers[serviceName] = lp
}
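
The commented-out pg.Remove(serviceName) above would run inside the puller's own tomb goroutine, and Remove (below) calls puller.tomb.Wait() — so the goroutine would end up waiting for itself to return, which is the likely deadlock the comment refers to. A minimal standalone illustration of that self-wait, not taken from Pebble:

	package main

	import (
		"fmt"

		"gopkg.in/tomb.v2"
	)

	func main() {
		var t tomb.Tomb
		t.Go(func() error {
			// Waiting on our own tomb from inside the tracked goroutine:
			// Wait only returns once this function returns, so it blocks forever.
			return t.Wait()
		})
		t.Kill(nil)
		fmt.Println(t.Wait()) // never reached: both goroutines wait forever
	}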

@@ -130,27 +111,51 @@ func (pg *pullerGroup) List() []string {
}

func (pg *pullerGroup) Remove(serviceName string) {
- pg.mu.Lock()
- defer pg.mu.Unlock()
+ pg.mu.RLock()
+ puller, pullerExists := pg.pullers[serviceName]
+ pg.mu.RUnlock()

+ if !pullerExists {
+ return
+ }

- if puller, ok := pg.pullers[serviceName]; ok {
- puller.cancel()
- delete(pg.pullers, serviceName)
+ puller.tomb.Kill(nil)
+ err := puller.tomb.Wait()
+ if err != nil {
+ logger.Noticef("Error from log puller: %v", err)
}

+ pg.mu.Lock()
+ defer pg.mu.Unlock()
+ delete(pg.pullers, serviceName)
}
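
Note the locking in Remove: pg.mu is released before the potentially slow tomb.Wait and re-taken only for the map delete, so other group operations (Add, List, KillAll) are not blocked while the puller drains. A generic sketch of that look-up / wait / delete pattern, assuming "sync" and "gopkg.in/tomb.v2" are imported; the registry type is illustrative, not from Pebble:

	type registry struct {
		mu      sync.Mutex
		workers map[string]*tomb.Tomb
	}

	func (r *registry) stop(name string) {
		r.mu.Lock()
		t, ok := r.workers[name]
		r.mu.Unlock()
		if !ok {
			return
		}

		t.Kill(nil)
		_ = t.Wait() // may block for a while; the lock is not held here

		r.mu.Lock()
		delete(r.workers, name)
		r.mu.Unlock()
	}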

func (pg *pullerGroup) KillAll() {
- pg.kill()
+ pg.mu.RLock()
+ defer pg.mu.RUnlock()
+
+ for _, puller := range pg.pullers {
+ puller.tomb.Kill(nil)
+ }
}

// Done returns a channel which can be waited on until all pullers have finished.
func (pg *pullerGroup) Done() <-chan struct{} {
+ pg.mu.RLock()
+ pullers := pg.pullers
+ pg.mu.RUnlock()

done := make(chan struct{})
go func() {
- pg.wg.Wait()
+ for _, puller := range pullers {
+ err := puller.tomb.Wait()
+ if err != nil {
+ logger.Noticef("Error from log puller: %v", err)
+ }
+ }
close(done)
}()

return done
}
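
KillAll only signals the pullers to stop, while Done reports when they have actually exited, so a caller is expected to combine the two during teardown. An illustrative sketch of such a caller; the function name and the one-second timeout are assumptions rather than part of this commit, and "time" is assumed to be imported in this package:

	func shutDownPullers(pg *pullerGroup) {
		pg.KillAll() // ask every puller to stop
		select {
		case <-pg.Done():
			// all pullers have exited
		case <-time.After(time.Second):
			// give up waiting and carry on with teardown
		}
	}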
