Skip to content

Commit

Permalink
move iterator/parser update logic into logGatherer methods
Browse files Browse the repository at this point in the history
  • Loading branch information
barrettj12 committed Jul 7, 2023
1 parent aa0bb5a commit ce450db
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 35 deletions.
33 changes: 33 additions & 0 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,39 @@ func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) {
}, nil
}

func (g *logGatherer) addIterator(serviceName string, iterator servicelog.Iterator, parser *servicelog.Parser) {
g.itLock.Lock()
defer g.itLock.Unlock()

iterator.Notify(g.notifyChan)
g.iterators[serviceName] = iterator

if parser == nil {
parser = servicelog.NewParser(iterator, parserSize)
}
g.parsers[serviceName] = parser
}

// Clear all iterators/parsers and return the old ones.
func (g *logGatherer) clearIterators() (map[string]servicelog.Iterator, map[string]*servicelog.Parser) {
g.itLock.Lock()
defer g.itLock.Unlock()

oldIterators := make(map[string]servicelog.Iterator, len(g.iterators))
for svc, it := range g.iterators {
oldIterators[svc] = it
}
g.iterators = map[string]servicelog.Iterator{}

oldParsers := make(map[string]*servicelog.Parser, len(g.parsers))
for svc, p := range g.parsers {
oldParsers[svc] = p
}
g.parsers = map[string]*servicelog.Parser{}

return oldIterators, oldParsers
}

func (g *logGatherer) loop() {
ticker := time.NewTicker(g.tickPeriod)
defer ticker.Stop()
Expand Down
53 changes: 18 additions & 35 deletions internals/overlord/logstate/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,38 +65,27 @@ func (m *LogManager) PlanChanged(pl *plan.Plan) {
}

// Update iterators for gatherer
newIterators := map[string]servicelog.Iterator{}
newParsers := map[string]*servicelog.Parser{}

gatherer.itLock.Lock()
oldIterators, oldParsers := gatherer.clearIterators()
for _, service := range pl.Services {
if service.LogsTo(target) {
iterator := gatherer.iterators[service.Name]
if iterator == nil {
// Create new iterator/parser
// If we don't yet have the buffer, we'll have to wait until
// ServiceStarted to create the iterator
buffer, ok := m.buffers[service.Name]
if ok {
iterator = buffer.TailIterator()
iterator.Notify(gatherer.notifyChan)
newIterators[service.Name] = iterator

parser := servicelog.NewParser(iterator, parserSize)
newParsers[service.Name] = parser
}
} else {
// Copy over old iterator/parser
newIterators[service.Name] = iterator
delete(gatherer.iterators, service.Name)
newParsers[service.Name] = gatherer.parsers[service.Name]
delete(gatherer.parsers, service.Name)
if !service.LogsTo(target) {
continue
}

iterator := oldIterators[service.Name]
parser := oldParsers[service.Name]

if iterator == nil {
buffer := m.buffers[service.Name]
if buffer == nil {
// Don't yet have buffer - need to wait until ServiceStarted is called
continue
}

iterator = buffer.TailIterator()
}

gatherer.addIterator(service.Name, iterator, parser)
}
gatherer.iterators = newIterators
gatherer.parsers = newParsers
gatherer.itLock.Unlock()
}

// Old gatherers for now-removed services need to be shut down.
Expand All @@ -119,14 +108,8 @@ func (m *LogManager) ServiceStarted(service *plan.Service, buffer *servicelog.Ri
continue
}

gatherer.itLock.Lock()
iterator := buffer.TailIterator()
iterator.Notify(gatherer.notifyChan)
gatherer.iterators[service.Name] = iterator

parser := servicelog.NewParser(iterator, parserSize)
gatherer.parsers[service.Name] = parser
gatherer.itLock.Unlock()
gatherer.addIterator(service.Name, iterator, nil)
}
}
}
Expand Down

0 comments on commit ce450db

Please sign in to comment.