Skip to content
10 changes: 5 additions & 5 deletions logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ type LogBackend interface {
// e.g. a particular log stream in cloudwatchlogs.
type LogDest interface {
Publish(events []LogEvent) error
NotifySourceStopped()
}

// LogAgent is the agent handles pure log pipelines
type LogAgent struct {
Config *config.Config
backends map[string]LogBackend
destNames map[LogDest]string
collections []LogCollection
retentionAlreadyAttempted map[string]bool
}
Expand All @@ -80,7 +80,6 @@ func NewLogAgent(c *config.Config) *LogAgent {
return &LogAgent{
Config: c,
backends: make(map[string]LogBackend),
destNames: make(map[LogDest]string),
retentionAlreadyAttempted: make(map[string]bool),
}
}
Expand Down Expand Up @@ -136,7 +135,6 @@ func (l *LogAgent) Run(ctx context.Context) {
}
retention = l.checkRetentionAlreadyAttempted(retention, logGroup)
dest := backend.CreateDest(logGroup, logStream, retention, logGroupClass, src)
l.destNames[dest] = dname
log.Printf("I! [logagent] piping log from %s/%s(%s) to %s with retention %d", logGroup, logStream, description, dname, retention)
go l.runSrcToDest(src, dest)
}
Expand All @@ -148,8 +146,10 @@ func (l *LogAgent) Run(ctx context.Context) {
}

func (l *LogAgent) runSrcToDest(src LogSrc, dest LogDest) {

eventsCh := make(chan LogEvent)
defer src.Stop()
defer dest.NotifySourceStopped()

closed := false
src.SetOutput(func(e LogEvent) {
Expand All @@ -168,11 +168,11 @@ func (l *LogAgent) runSrcToDest(src LogSrc, dest LogDest) {
for e := range eventsCh {
err := dest.Publish([]LogEvent{e})
if err == ErrOutputStopped {
log.Printf("I! [logagent] Log destination %v has stopped, finalizing %v/%v", l.destNames[dest], src.Group(), src.Stream())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was actually a concurrent map write condition here. This non-sync map is written to in the Run routine and is potentially read from the runSrcToDest goroutine.

log.Printf("I! [logagent] Log destination %v has stopped, finalizing %v/%v", src.Destination(), src.Group(), src.Stream())
return
}
if err != nil {
log.Printf("E! [logagent] Failed to publish log to %v, error: %v", l.destNames[dest], err)
log.Printf("E! [logagent] Failed to publish log to %v, error: %v", src.Destination(), err)
return
}
}
Expand Down
8 changes: 2 additions & 6 deletions plugins/inputs/logfile/logfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ type LogFile struct {
started bool
}

func NewLogFile() *LogFile {
var _ logs.LogCollection = (*LogFile)(nil)

func NewLogFile() *LogFile {
return &LogFile{
configs: make(map[*FileConfig]map[string]*tailerSrc),
done: make(chan struct{}),
Expand Down Expand Up @@ -251,11 +252,6 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc {
}
}

destination := fileconfig.Destination
if destination == "" {
destination = t.Destination
}

Comment on lines -254 to -258
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable is unused

src := NewTailerSrc(
groupName, streamName,
t.Destination,
Expand Down
Loading
Loading