diff --git a/internals/overlord/logstate/gatherer.go b/internals/overlord/logstate/gatherer.go index 20e9a946c..1d8c26733 100644 --- a/internals/overlord/logstate/gatherer.go +++ b/internals/overlord/logstate/gatherer.go @@ -16,12 +16,12 @@ package logstate import ( "fmt" - "sync" "time" "github.com/canonical/pebble/internals/logger" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" + "gopkg.in/tomb.v2" ) const parserSize = 4 * 1024 @@ -43,10 +43,8 @@ type logGatherer struct { cancel chan struct{} client logClient - iterators map[string]servicelog.Iterator - parsers map[string]*servicelog.Parser - itLock sync.Mutex - notifyChan chan bool + entryCh chan servicelog.Entry + pullers map[string]*logPuller } func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) { @@ -60,41 +58,59 @@ func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) { tickPeriod: 1 * time.Second, cancel: make(chan struct{}), client: client, - notifyChan: make(chan bool, 1), + entryCh: make(chan servicelog.Entry), + pullers: map[string]*logPuller{}, }, nil } -func (g *logGatherer) addIterator(serviceName string, iterator servicelog.Iterator, parser *servicelog.Parser) { - g.itLock.Lock() - defer g.itLock.Unlock() +func (g *logGatherer) planChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) { + g.target = pl.LogTargets[g.target.Name] + newPullers := map[string]*logPuller{} - iterator.Notify(g.notifyChan) - g.iterators[serviceName] = iterator + for _, service := range pl.Services { + if !service.LogsTo(g.target) { + continue + } - if parser == nil { - parser = servicelog.NewParser(iterator, parserSize) - } - g.parsers[serviceName] = parser -} + if puller, ok := g.pullers[service.Name]; ok { + // Move over existing puller + newPullers[service.Name] = puller + delete(g.pullers, service.Name) + continue + } -// Clear all iterators/parsers and return the old ones. -func (g *logGatherer) clearIterators() (map[string]servicelog.Iterator, map[string]*servicelog.Parser) { - g.itLock.Lock() - defer g.itLock.Unlock() + buffer, bufferExists := buffers[service.Name] + if !bufferExists { + // We don't yet have a reference to the service's ring buffer + // Need to wait until serviceStarted + continue + } - oldIterators := make(map[string]servicelog.Iterator, len(g.iterators)) - for svc, it := range g.iterators { - oldIterators[svc] = it + // Create new puller + newPullers[service.Name] = &logPuller{ + iterator: buffer.TailIterator(), + tomb: &tomb.Tomb{}, + entryCh: g.entryCh, + } } - g.iterators = map[string]servicelog.Iterator{} - oldParsers := make(map[string]*servicelog.Parser, len(g.parsers)) - for svc, p := range g.parsers { - oldParsers[svc] = p + // Old pullers for now-removed services need to be shut down. + for svc, puller := range g.pullers { + err := puller.tomb.Killf("gatherer closed") + if err != nil { + logger.Noticef("Error shutting down puller for target %q, service %q: %v", + g.target.Name, svc, err) + } } - g.parsers = map[string]*servicelog.Parser{} + g.pullers = newPullers +} - return oldIterators, oldParsers +func (g *logGatherer) serviceStarted(service *plan.Service, buffer *servicelog.RingBuffer) { + g.pullers[service.Name] = &logPuller{ + iterator: buffer.TailIterator(), + tomb: &tomb.Tomb{}, + entryCh: g.entryCh, + } } func (g *logGatherer) loop() { @@ -118,9 +134,11 @@ func (g *logGatherer) loop() { } return - case <-g.notifyChan: - // (at least) one of the iterators has data to read - g.pullLogs() + case entry := <-g.entryCh: + err := g.client.Write(entry) + if err != nil { + logger.Noticef("Cannot write logs to target %q: %v", g.target.Name, err) + } } } } @@ -128,52 +146,45 @@ func (g *logGatherer) loop() { // stop closes the cancel channel, thereby terminating the main loop. func (g *logGatherer) stop() { close(g.cancel) -} - -func (g *logGatherer) pullLogs() { - g.itLock.Lock() - defer g.itLock.Unlock() - moreData := false - for svc, it := range g.iterators { - if !it.Next(nil) { - continue - } - parser := g.parsers[svc] - if !parser.Next() { - continue - } - if err := parser.Err(); err != nil { - // TODO: handle this properly ? - logger.Noticef("Cannot read logs from service %q: %v", svc, err) - continue - } - - entry := parser.Entry() - err := g.client.Write(entry) + for svc, puller := range g.pullers { + err := puller.tomb.Killf("gatherer closed") if err != nil { - logger.Noticef("Cannot write logs to target %q: %v", g.target.Name, err) - } - - if it.Next(nil) { - moreData = true + logger.Noticef("Error shutting down puller for target %q, service %q: %v", + g.target.Name, svc, err) } } +} - // If there is more data to read - put a token back in the notifyChan, so - // that we will continue to pull logs on the next loop iteration. - if moreData { - select { - case g.notifyChan <- true: - default: +// logPuller handles pulling logs from a single iterator and sending to the +// main control loop. +type logPuller struct { + iterator servicelog.Iterator + //parser *servicelog.Parser + tomb *tomb.Tomb + entryCh chan servicelog.Entry +} + +func (p *logPuller) loop() error { + parser := servicelog.NewParser(p.iterator, parserSize) + + for p.iterator.Next(p.tomb.Dying()) { + for parser.Next() { + if err := parser.Err(); err != nil { + return err + } + p.entryCh <- parser.Entry() } } + + // We've been killed - close the iterator + return p.iterator.Close() } // logClient handles requests to a specific type of log target. It encodes // log messages in the required format, and sends the messages using the // protocol required by that log target. -// For example, a logBuffer for Loki would encode the log messages in the +// For example, a logClient for Loki would encode the log messages in the // JSON format expected by Loki, and send them over HTTP(S). type logClient interface { Write(servicelog.Entry) error diff --git a/internals/overlord/logstate/manager.go b/internals/overlord/logstate/manager.go index c78855773..aae92351b 100644 --- a/internals/overlord/logstate/manager.go +++ b/internals/overlord/logstate/manager.go @@ -15,34 +15,25 @@ package logstate import ( - "sync" - "github.com/canonical/pebble/internals/logger" "github.com/canonical/pebble/internals/plan" "github.com/canonical/pebble/internals/servicelog" ) type LogManager struct { - gatherers map[string]*logGatherer - newGatherer func(*plan.LogTarget) (*logGatherer, error) - buffers map[string]*servicelog.RingBuffer - plan *plan.Plan - mu sync.Mutex + gatherers map[string]*logGatherer + buffers map[string]*servicelog.RingBuffer } func NewLogManager() *LogManager { return &LogManager{ - gatherers: map[string]*logGatherer{}, - newGatherer: newLogGatherer, - buffers: map[string]*servicelog.RingBuffer{}, + gatherers: map[string]*logGatherer{}, + buffers: map[string]*servicelog.RingBuffer{}, } } // PlanChanged is called by the service manager when the plan changes. We update the list of gatherers for each forwarder based on the new plan. func (m *LogManager) PlanChanged(pl *plan.Plan) { - m.mu.Lock() - defer m.mu.Unlock() - // Create a map to hold gatherers for the new plan. // Old gatherers will be moved over or deleted. newGatherers := make(map[string]*logGatherer, len(pl.LogTargets)) @@ -52,7 +43,7 @@ func (m *LogManager) PlanChanged(pl *plan.Plan) { if gatherer == nil { // Create new gatherer var err error - gatherer, err = m.newGatherer(target) + gatherer, err = newLogGatherer(target) if err != nil { logger.Noticef("Internal error: couldn't create gatherer: %v", err) break @@ -67,55 +58,26 @@ func (m *LogManager) PlanChanged(pl *plan.Plan) { } // Update iterators for gatherer - oldIterators, oldParsers := gatherer.clearIterators() - for _, service := range pl.Services { - if !service.LogsTo(target) { - continue - } - - iterator := oldIterators[service.Name] - parser := oldParsers[service.Name] - - if iterator == nil { - buffer := m.buffers[service.Name] - if buffer == nil { - // Don't yet have buffer - need to wait until ServiceStarted is called - continue - } - - iterator = buffer.TailIterator() - } - - gatherer.addIterator(service.Name, iterator, parser) - } + gatherer.planChanged(pl, m.buffers) } - // Old gatherers for now-removed services need to be shut down. + // Old gatherers for now-removed targets need to be shut down. for _, gatherer := range m.gatherers { gatherer.stop() } m.gatherers = newGatherers - m.plan = pl } // ServiceStarted notifies the log manager that the named service has started, // and provides a reference to the service's log buffer. func (m *LogManager) ServiceStarted(service *plan.Service, buffer *servicelog.RingBuffer) { - m.mu.Lock() - defer m.mu.Unlock() - m.buffers[service.Name] = buffer - for _, target := range m.plan.LogTargets { - if service.LogsTo(target) { - gatherer := m.gatherers[target.Name] - if gatherer == nil { - logger.Noticef("Internal error: couldn't find gatherer for target %q", target.Name) - continue - } - iterator := buffer.TailIterator() - gatherer.addIterator(service.Name, iterator, nil) + for _, gatherer := range m.gatherers { + if !service.LogsTo(gatherer.target) { + continue } + gatherer.serviceStarted(service, buffer) } } @@ -126,9 +88,6 @@ func (m *LogManager) Ensure() error { // Stop implements overlord.StateStopper and stops all log forwarding. func (m *LogManager) Stop() { - m.mu.Lock() - defer m.mu.Unlock() - for _, gatherer := range m.gatherers { gatherer.stop() }