Skip to content

Commit

Permalink
address Ben's review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
barrettj12 committed Aug 15, 2023
1 parent 9465a3a commit 5e77fb3
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
16 changes: 8 additions & 8 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
// logs from. Each logPuller will run in a separate goroutine, and send logs to
// the logGatherer via a shared channel.
// The logGatherer will "flush" the client:
// - on a regular cadence (e.g. every 1 second)
// - after a timeout (1s) has passed since the first log was written;
// - when it is told to shut down.
//
// The client may also flush itself when its internal buffer reaches a certain
Expand Down Expand Up @@ -128,7 +128,7 @@ func fillDefaultArgs(args logGathererArgs) logGathererArgs {
// gatherer's target exists in the new plan.
func (g *logGatherer) PlanChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) {
// Remove old pullers
for _, svcName := range g.pullers.List() {
for _, svcName := range g.pullers.Services() {
svc, svcExists := pl.Services[svcName]
if !svcExists {
g.pullers.Remove(svcName)
Expand Down Expand Up @@ -170,27 +170,27 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R
// client periodically, and exits when the gatherer's tomb is killed.
func (g *logGatherer) loop() error {
timer := newTimer()
defer timer.Stop()

mainLoop:
for {
select {
case <-g.tomb.Dying():
break mainLoop

case <-timer.Finished():
case <-timer.Expired():
// Mark timer as unset
timer.Stop()
err := g.client.Flush(g.clientCtx)
if err != nil {
logger.Noticef("Error sending logs to target %q: %v", g.targetName, err)
logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
}

case entry := <-g.entryCh:
err := g.client.Write(g.clientCtx, entry)
if err != nil {
logger.Noticef("Error writing logs to target %q: %v", g.targetName, err)
logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err)
}
// Set timer if not already set
timer.EnsureSet(g.bufferTimeout)
}
}
Expand All @@ -201,7 +201,7 @@ mainLoop:
defer cancel()
err := g.client.Flush(ctx)
if err != nil {
logger.Noticef("Error sending logs to target %q: %v", g.targetName, err)
logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
}
return nil
}
Expand Down Expand Up @@ -260,7 +260,7 @@ func newTimer() timer {
return t
}

func (t *timer) Finished() <-chan time.Time {
func (t *timer) Expired() <-chan time.Time {
return t.timer.C
}

Expand Down
8 changes: 4 additions & 4 deletions internals/overlord/logstate/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ func (pg *pullerGroup) Add(serviceName string, buffer *servicelog.RingBuffer, en
pg.pullers[serviceName] = lp
}

// List returns a list of all service names for which we have a currently
// active puller.
func (pg *pullerGroup) List() []string {
// Services returns a list containing the name of each service for which we
// have a puller in this group.
func (pg *pullerGroup) Services() []string {
pg.mu.RLock()
defer pg.mu.RUnlock()

var svcs []string
svcs := make([]string, 0, len(pg.pullers))
for svc := range pg.pullers {
svcs = append(svcs, svc)
}
Expand Down

0 comments on commit 5e77fb3

Please sign in to comment.