-
Notifications
You must be signed in to change notification settings - Fork 237
Address goroutine leak with dynamically determined log destinations #1848
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a312b0f
128847f
a7510db
8778ad9
a5424dc
90e380c
2b15fa5
4233347
4082795
50d569c
17ca8f5
012ea3f
45e6bd1
4e7e9dd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,13 +65,13 @@ type LogBackend interface { | |
| // e.g. a particular log stream in cloudwatchlogs. | ||
| type LogDest interface { | ||
| Publish(events []LogEvent) error | ||
| NotifySourceStopped() | ||
| } | ||
|
|
||
| // LogAgent is the agent handles pure log pipelines | ||
| type LogAgent struct { | ||
| Config *config.Config | ||
| backends map[string]LogBackend | ||
| destNames map[LogDest]string | ||
| collections []LogCollection | ||
| retentionAlreadyAttempted map[string]bool | ||
| } | ||
|
|
@@ -80,7 +80,6 @@ func NewLogAgent(c *config.Config) *LogAgent { | |
| return &LogAgent{ | ||
| Config: c, | ||
| backends: make(map[string]LogBackend), | ||
| destNames: make(map[LogDest]string), | ||
| retentionAlreadyAttempted: make(map[string]bool), | ||
| } | ||
| } | ||
|
|
@@ -136,7 +135,6 @@ func (l *LogAgent) Run(ctx context.Context) { | |
| } | ||
| retention = l.checkRetentionAlreadyAttempted(retention, logGroup) | ||
| dest := backend.CreateDest(logGroup, logStream, retention, logGroupClass, src) | ||
| l.destNames[dest] = dname | ||
| log.Printf("I! [logagent] piping log from %s/%s(%s) to %s with retention %d", logGroup, logStream, description, dname, retention) | ||
| go l.runSrcToDest(src, dest) | ||
| } | ||
|
|
@@ -148,8 +146,10 @@ func (l *LogAgent) Run(ctx context.Context) { | |
| } | ||
|
|
||
| func (l *LogAgent) runSrcToDest(src LogSrc, dest LogDest) { | ||
|
|
||
| eventsCh := make(chan LogEvent) | ||
| defer src.Stop() | ||
| defer dest.NotifySourceStopped() | ||
|
|
||
| closed := false | ||
| src.SetOutput(func(e LogEvent) { | ||
|
|
@@ -168,11 +168,11 @@ func (l *LogAgent) runSrcToDest(src LogSrc, dest LogDest) { | |
| for e := range eventsCh { | ||
| err := dest.Publish([]LogEvent{e}) | ||
| if err == ErrOutputStopped { | ||
| log.Printf("I! [logagent] Log destination %v has stopped, finalizing %v/%v", l.destNames[dest], src.Group(), src.Stream()) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was actually a concurrent map write condition here. This non-sync map is written to in the |
||
| log.Printf("I! [logagent] Log destination %v has stopped, finalizing %v/%v", src.Destination(), src.Group(), src.Stream()) | ||
| return | ||
| } | ||
| if err != nil { | ||
| log.Printf("E! [logagent] Failed to publish log to %v, error: %v", l.destNames[dest], err) | ||
| log.Printf("E! [logagent] Failed to publish log to %v, error: %v", src.Destination(), err) | ||
| return | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,8 +41,9 @@ type LogFile struct { | |
| started bool | ||
| } | ||
|
|
||
| func NewLogFile() *LogFile { | ||
| var _ logs.LogCollection = (*LogFile)(nil) | ||
|
|
||
| func NewLogFile() *LogFile { | ||
| return &LogFile{ | ||
| configs: make(map[*FileConfig]map[string]*tailerSrc), | ||
| done: make(chan struct{}), | ||
|
|
@@ -251,11 +252,6 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc { | |
| } | ||
| } | ||
|
|
||
| destination := fileconfig.Destination | ||
| if destination == "" { | ||
| destination = t.Destination | ||
| } | ||
|
|
||
|
Comment on lines
-254
to
-258
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This variable is unused |
||
| src := NewTailerSrc( | ||
| groupName, streamName, | ||
| t.Destination, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it matter which order this is in? If the source is stopped after the destination reports it, would that be an issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The order doesn't matter. The two objects are only linked through this function. Maybe the naming isn't optimal here which might cause some confusion. The
NotifySourceStoppedmethod ondestdoesn't notify the source, it notifiesdestthat the source is stopped.