Skip to content

Commit

Permalink
Redesign with logPullers
Browse files Browse the repository at this point in the history
- one goroutine/tomb per iterator
- move all iterator logic from manager to gatherer methods
  • Loading branch information
barrettj12 committed Jul 11, 2023
1 parent f832bc1 commit f8d8bdb
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 118 deletions.
143 changes: 77 additions & 66 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ package logstate

import (
"fmt"
"sync"
"time"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
"gopkg.in/tomb.v2"
)

const parserSize = 4 * 1024
Expand All @@ -43,10 +43,8 @@ type logGatherer struct {
cancel chan struct{}
client logClient

iterators map[string]servicelog.Iterator
parsers map[string]*servicelog.Parser
itLock sync.Mutex
notifyChan chan bool
entryCh chan servicelog.Entry
pullers map[string]*logPuller
}

func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) {
Expand All @@ -60,41 +58,59 @@ func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) {
tickPeriod: 1 * time.Second,
cancel: make(chan struct{}),
client: client,
notifyChan: make(chan bool, 1),
entryCh: make(chan servicelog.Entry),
pullers: map[string]*logPuller{},
}, nil
}

func (g *logGatherer) addIterator(serviceName string, iterator servicelog.Iterator, parser *servicelog.Parser) {
g.itLock.Lock()
defer g.itLock.Unlock()
func (g *logGatherer) planChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) {
g.target = pl.LogTargets[g.target.Name]
newPullers := map[string]*logPuller{}

iterator.Notify(g.notifyChan)
g.iterators[serviceName] = iterator
for _, service := range pl.Services {
if !service.LogsTo(g.target) {
continue
}

if parser == nil {
parser = servicelog.NewParser(iterator, parserSize)
}
g.parsers[serviceName] = parser
}
if puller, ok := g.pullers[service.Name]; ok {
// Move over existing puller
newPullers[service.Name] = puller
delete(g.pullers, service.Name)
continue
}

// Clear all iterators/parsers and return the old ones.
func (g *logGatherer) clearIterators() (map[string]servicelog.Iterator, map[string]*servicelog.Parser) {
g.itLock.Lock()
defer g.itLock.Unlock()
buffer, bufferExists := buffers[service.Name]
if !bufferExists {
// We don't yet have a reference to the service's ring buffer
// Need to wait until serviceStarted
continue
}

oldIterators := make(map[string]servicelog.Iterator, len(g.iterators))
for svc, it := range g.iterators {
oldIterators[svc] = it
// Create new puller
newPullers[service.Name] = &logPuller{
iterator: buffer.TailIterator(),
tomb: &tomb.Tomb{},
entryCh: g.entryCh,
}
}
g.iterators = map[string]servicelog.Iterator{}

oldParsers := make(map[string]*servicelog.Parser, len(g.parsers))
for svc, p := range g.parsers {
oldParsers[svc] = p
// Old pullers for now-removed services need to be shut down.
for svc, puller := range g.pullers {
err := puller.tomb.Killf("gatherer closed")
if err != nil {
logger.Noticef("Error shutting down puller for target %q, service %q: %v",
g.target.Name, svc, err)
}
}
g.parsers = map[string]*servicelog.Parser{}
g.pullers = newPullers
}

return oldIterators, oldParsers
func (g *logGatherer) serviceStarted(service *plan.Service, buffer *servicelog.RingBuffer) {
g.pullers[service.Name] = &logPuller{
iterator: buffer.TailIterator(),
tomb: &tomb.Tomb{},
entryCh: g.entryCh,
}
}

func (g *logGatherer) loop() {
Expand All @@ -118,62 +134,57 @@ func (g *logGatherer) loop() {
}
return

case <-g.notifyChan:
// (at least) one of the iterators has data to read
g.pullLogs()
case entry := <-g.entryCh:
err := g.client.Write(entry)
if err != nil {
logger.Noticef("Cannot write logs to target %q: %v", g.target.Name, err)
}
}
}
}

// stop closes the cancel channel, thereby terminating the main loop.
func (g *logGatherer) stop() {
close(g.cancel)
}

func (g *logGatherer) pullLogs() {
g.itLock.Lock()
defer g.itLock.Unlock()

moreData := false
for svc, it := range g.iterators {
if !it.Next(nil) {
continue
}
parser := g.parsers[svc]
if !parser.Next() {
continue
}
if err := parser.Err(); err != nil {
// TODO: handle this properly ?
logger.Noticef("Cannot read logs from service %q: %v", svc, err)
continue
}

entry := parser.Entry()
err := g.client.Write(entry)
for svc, puller := range g.pullers {
err := puller.tomb.Killf("gatherer closed")
if err != nil {
logger.Noticef("Cannot write logs to target %q: %v", g.target.Name, err)
}

if it.Next(nil) {
moreData = true
logger.Noticef("Error shutting down puller for target %q, service %q: %v",
g.target.Name, svc, err)
}
}
}

// If there is more data to read - put a token back in the notifyChan, so
// that we will continue to pull logs on the next loop iteration.
if moreData {
select {
case g.notifyChan <- true:
default:
// logPuller handles pulling logs from a single iterator and sending to the
// main control loop.
type logPuller struct {
iterator servicelog.Iterator
//parser *servicelog.Parser
tomb *tomb.Tomb
entryCh chan servicelog.Entry
}

func (p *logPuller) loop() error {
parser := servicelog.NewParser(p.iterator, parserSize)

for p.iterator.Next(p.tomb.Dying()) {
for parser.Next() {
if err := parser.Err(); err != nil {
return err
}
p.entryCh <- parser.Entry()
}
}

// We've been killed - close the iterator
return p.iterator.Close()
}

// logClient handles requests to a specific type of log target. It encodes
// log messages in the required format, and sends the messages using the
// protocol required by that log target.
// For example, a logBuffer for Loki would encode the log messages in the
// For example, a logClient for Loki would encode the log messages in the
// JSON format expected by Loki, and send them over HTTP(S).
type logClient interface {
Write(servicelog.Entry) error
Expand Down
63 changes: 11 additions & 52 deletions internals/overlord/logstate/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,25 @@
package logstate

import (
"sync"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)

type LogManager struct {
gatherers map[string]*logGatherer
newGatherer func(*plan.LogTarget) (*logGatherer, error)
buffers map[string]*servicelog.RingBuffer
plan *plan.Plan
mu sync.Mutex
gatherers map[string]*logGatherer
buffers map[string]*servicelog.RingBuffer
}

func NewLogManager() *LogManager {
return &LogManager{
gatherers: map[string]*logGatherer{},
newGatherer: newLogGatherer,
buffers: map[string]*servicelog.RingBuffer{},
gatherers: map[string]*logGatherer{},
buffers: map[string]*servicelog.RingBuffer{},
}
}

// PlanChanged is called by the service manager when the plan changes. We update the list of gatherers for each forwarder based on the new plan.
func (m *LogManager) PlanChanged(pl *plan.Plan) {
m.mu.Lock()
defer m.mu.Unlock()

// Create a map to hold gatherers for the new plan.
// Old gatherers will be moved over or deleted.
newGatherers := make(map[string]*logGatherer, len(pl.LogTargets))
Expand All @@ -52,7 +43,7 @@ func (m *LogManager) PlanChanged(pl *plan.Plan) {
if gatherer == nil {
// Create new gatherer
var err error
gatherer, err = m.newGatherer(target)
gatherer, err = newLogGatherer(target)
if err != nil {
logger.Noticef("Internal error: couldn't create gatherer: %v", err)
break
Expand All @@ -67,55 +58,26 @@ func (m *LogManager) PlanChanged(pl *plan.Plan) {
}

// Update iterators for gatherer
oldIterators, oldParsers := gatherer.clearIterators()
for _, service := range pl.Services {
if !service.LogsTo(target) {
continue
}

iterator := oldIterators[service.Name]
parser := oldParsers[service.Name]

if iterator == nil {
buffer := m.buffers[service.Name]
if buffer == nil {
// Don't yet have buffer - need to wait until ServiceStarted is called
continue
}

iterator = buffer.TailIterator()
}

gatherer.addIterator(service.Name, iterator, parser)
}
gatherer.planChanged(pl, m.buffers)
}

// Old gatherers for now-removed services need to be shut down.
// Old gatherers for now-removed targets need to be shut down.
for _, gatherer := range m.gatherers {
gatherer.stop()
}
m.gatherers = newGatherers
m.plan = pl
}

// ServiceStarted notifies the log manager that the named service has started,
// and provides a reference to the service's log buffer.
func (m *LogManager) ServiceStarted(service *plan.Service, buffer *servicelog.RingBuffer) {
m.mu.Lock()
defer m.mu.Unlock()

m.buffers[service.Name] = buffer
for _, target := range m.plan.LogTargets {
if service.LogsTo(target) {
gatherer := m.gatherers[target.Name]
if gatherer == nil {
logger.Noticef("Internal error: couldn't find gatherer for target %q", target.Name)
continue
}

iterator := buffer.TailIterator()
gatherer.addIterator(service.Name, iterator, nil)
for _, gatherer := range m.gatherers {
if !service.LogsTo(gatherer.target) {
continue
}
gatherer.serviceStarted(service, buffer)
}
}

Expand All @@ -126,9 +88,6 @@ func (m *LogManager) Ensure() error {

// Stop implements overlord.StateStopper and stops all log forwarding.
func (m *LogManager) Stop() {
m.mu.Lock()
defer m.mu.Unlock()

for _, gatherer := range m.gatherers {
gatherer.stop()
}
Expand Down

0 comments on commit f8d8bdb

Please sign in to comment.