Skip to content

Commit f065b10

Browse files
committed
Revert "Address goroutine leak with dynamically determined log destinations (#1848)"
This reverts commit 637837b.
1 parent 90c0151 commit f065b10

File tree

12 files changed

+303
-299
lines changed

12 files changed

+303
-299
lines changed

logs/logs.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ type LogBackend interface {
6565
// e.g. a particular log stream in cloudwatchlogs.
6666
type LogDest interface {
6767
Publish(events []LogEvent) error
68-
NotifySourceStopped()
6968
}
7069

7170
// LogAgent is the agent handles pure log pipelines
7271
type LogAgent struct {
7372
Config *config.Config
7473
backends map[string]LogBackend
74+
destNames map[LogDest]string
7575
collections []LogCollection
7676
retentionAlreadyAttempted map[string]bool
7777
}
@@ -80,6 +80,7 @@ func NewLogAgent(c *config.Config) *LogAgent {
8080
return &LogAgent{
8181
Config: c,
8282
backends: make(map[string]LogBackend),
83+
destNames: make(map[LogDest]string),
8384
retentionAlreadyAttempted: make(map[string]bool),
8485
}
8586
}
@@ -135,6 +136,7 @@ func (l *LogAgent) Run(ctx context.Context) {
135136
}
136137
retention = l.checkRetentionAlreadyAttempted(retention, logGroup)
137138
dest := backend.CreateDest(logGroup, logStream, retention, logGroupClass, src)
139+
l.destNames[dest] = dname
138140
log.Printf("I! [logagent] piping log from %s/%s(%s) to %s with retention %d", logGroup, logStream, description, dname, retention)
139141
go l.runSrcToDest(src, dest)
140142
}
@@ -146,10 +148,8 @@ func (l *LogAgent) Run(ctx context.Context) {
146148
}
147149

148150
func (l *LogAgent) runSrcToDest(src LogSrc, dest LogDest) {
149-
150151
eventsCh := make(chan LogEvent)
151152
defer src.Stop()
152-
defer dest.NotifySourceStopped()
153153

154154
closed := false
155155
src.SetOutput(func(e LogEvent) {
@@ -168,11 +168,11 @@ func (l *LogAgent) runSrcToDest(src LogSrc, dest LogDest) {
168168
for e := range eventsCh {
169169
err := dest.Publish([]LogEvent{e})
170170
if err == ErrOutputStopped {
171-
log.Printf("I! [logagent] Log destination %v has stopped, finalizing %v/%v", src.Destination(), src.Group(), src.Stream())
171+
log.Printf("I! [logagent] Log destination %v has stopped, finalizing %v/%v", l.destNames[dest], src.Group(), src.Stream())
172172
return
173173
}
174174
if err != nil {
175-
log.Printf("E! [logagent] Failed to publish log to %v, error: %v", src.Destination(), err)
175+
log.Printf("E! [logagent] Failed to publish log to %v, error: %v", l.destNames[dest], err)
176176
return
177177
}
178178
}

plugins/inputs/logfile/logfile.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,8 @@ type LogFile struct {
4141
started bool
4242
}
4343

44-
var _ logs.LogCollection = (*LogFile)(nil)
45-
4644
func NewLogFile() *LogFile {
45+
4746
return &LogFile{
4847
configs: make(map[*FileConfig]map[string]*tailerSrc),
4948
done: make(chan struct{}),
@@ -252,6 +251,11 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc {
252251
}
253252
}
254253

254+
destination := fileconfig.Destination
255+
if destination == "" {
256+
destination = t.Destination
257+
}
258+
255259
src := NewTailerSrc(
256260
groupName, streamName,
257261
t.Destination,

0 commit comments

Comments
 (0)