diff --git a/logs/logs.go b/logs/logs.go index 29976d3416..07839c7753 100644 --- a/logs/logs.go +++ b/logs/logs.go @@ -65,13 +65,13 @@ type LogBackend interface { // e.g. a particular log stream in cloudwatchlogs. type LogDest interface { Publish(events []LogEvent) error + NotifySourceStopped() } // LogAgent is the agent handles pure log pipelines type LogAgent struct { Config *config.Config backends map[string]LogBackend - destNames map[LogDest]string collections []LogCollection retentionAlreadyAttempted map[string]bool } @@ -80,7 +80,6 @@ func NewLogAgent(c *config.Config) *LogAgent { return &LogAgent{ Config: c, backends: make(map[string]LogBackend), - destNames: make(map[LogDest]string), retentionAlreadyAttempted: make(map[string]bool), } } @@ -136,7 +135,6 @@ func (l *LogAgent) Run(ctx context.Context) { } retention = l.checkRetentionAlreadyAttempted(retention, logGroup) dest := backend.CreateDest(logGroup, logStream, retention, logGroupClass, src) - l.destNames[dest] = dname log.Printf("I! [logagent] piping log from %s/%s(%s) to %s with retention %d", logGroup, logStream, description, dname, retention) go l.runSrcToDest(src, dest) } @@ -148,8 +146,10 @@ func (l *LogAgent) Run(ctx context.Context) { } func (l *LogAgent) runSrcToDest(src LogSrc, dest LogDest) { + eventsCh := make(chan LogEvent) defer src.Stop() + defer dest.NotifySourceStopped() closed := false src.SetOutput(func(e LogEvent) { @@ -168,11 +168,11 @@ func (l *LogAgent) runSrcToDest(src LogSrc, dest LogDest) { for e := range eventsCh { err := dest.Publish([]LogEvent{e}) if err == ErrOutputStopped { - log.Printf("I! [logagent] Log destination %v has stopped, finalizing %v/%v", l.destNames[dest], src.Group(), src.Stream()) + log.Printf("I! [logagent] Log destination %v has stopped, finalizing %v/%v", src.Destination(), src.Group(), src.Stream()) return } if err != nil { - log.Printf("E! [logagent] Failed to publish log to %v, error: %v", l.destNames[dest], err) + log.Printf("E! [logagent] Failed to publish log to %v, error: %v", src.Destination(), err) return } } diff --git a/plugins/inputs/logfile/logfile.go b/plugins/inputs/logfile/logfile.go index efd4a782b8..43bef59896 100644 --- a/plugins/inputs/logfile/logfile.go +++ b/plugins/inputs/logfile/logfile.go @@ -41,8 +41,9 @@ type LogFile struct { started bool } -func NewLogFile() *LogFile { +var _ logs.LogCollection = (*LogFile)(nil) +func NewLogFile() *LogFile { return &LogFile{ configs: make(map[*FileConfig]map[string]*tailerSrc), done: make(chan struct{}), @@ -251,11 +252,6 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc { } } - destination := fileconfig.Destination - if destination == "" { - destination = t.Destination - } - src := NewTailerSrc( groupName, streamName, t.Destination, diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go index 09641d651d..770ef5e3f9 100644 --- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go +++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go @@ -4,7 +4,6 @@ package cloudwatchlogs import ( - "encoding/json" "fmt" "regexp" "strings" @@ -28,7 +27,6 @@ import ( "github.com/aws/amazon-cloudwatch-agent/logs" "github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher" "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" - "github.com/aws/amazon-cloudwatch-agent/tool/util" ) const ( @@ -40,10 +38,7 @@ const ( defaultFlushTimeout = 5 * time.Second - maxRetryTimeout = 14*24*time.Hour + 10*time.Minute - metricRetryTimeout = 2 * time.Minute - - attributesInFields = "attributesInFields" + maxRetryTimeout = 14*24*time.Hour + 10*time.Minute ) var ( @@ -74,22 +69,24 @@ type CloudWatchLogs struct { Log telegraf.Logger `toml:"-"` - pusherStopChan chan struct{} pusherWaitGroup sync.WaitGroup cwDests sync.Map workerPool pusher.WorkerPool targetManager pusher.TargetManager once sync.Once middleware awsmiddleware.Middleware + configurer *awsmiddleware.Configurer + configurerOnce sync.Once } +var _ logs.LogBackend = (*CloudWatchLogs)(nil) +var _ telegraf.Output = (*CloudWatchLogs)(nil) + func (c *CloudWatchLogs) Connect() error { return nil } func (c *CloudWatchLogs) Close() error { - close(c.pusherStopChan) - c.pusherWaitGroup.Wait() c.cwDests.Range(func(_, value interface{}) bool { if d, ok := value.(*cwDest); ok { @@ -98,6 +95,8 @@ func (c *CloudWatchLogs) Close() error { return true }) + c.pusherWaitGroup.Wait() + if c.workerPool != nil { c.workerPool.Stop() } @@ -106,10 +105,8 @@ func (c *CloudWatchLogs) Close() error { } func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error { - for _, m := range metrics { - c.writeMetricAsStructuredLog(m) - } - return nil + // we no longer expect this to be used. We now use the OTel awsemfexporter for sending EMF metrics to CloudWatch Logs + return fmt.Errorf("unexpected call to Write") } func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGroupClass string, logSrc logs.LogSrc) logs.LogDest { @@ -134,7 +131,13 @@ func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGrou func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest { if cwd, ok := c.cwDests.Load(t); ok { - return cwd.(*cwDest) + d := cwd.(*cwDest) + d.Lock() + defer d.Unlock() + if !d.stopped { + d.refCount++ + return d + } } logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log) @@ -150,8 +153,15 @@ func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest { } c.targetManager = pusher.NewTargetManager(c.Log, client) }) - p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, c.pusherStopChan, &c.pusherWaitGroup) - cwd := &cwDest{pusher: p, retryer: logThrottleRetryer} + p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, &c.pusherWaitGroup) + cwd := &cwDest{ + pusher: p, + retryer: logThrottleRetryer, + refCount: 1, + onStopFunc: func() { + c.cwDests.Delete(t) + }, + } c.cwDests.Store(t, cwd) return cwd } @@ -177,7 +187,10 @@ func (c *CloudWatchLogs) createClient(retryer aws.RequestRetryer) *cloudwatchlog ) client.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{"PutLogEvents"})) if c.middleware != nil { - if err := awsmiddleware.NewConfigurer(c.middleware.Handlers()).Configure(awsmiddleware.SDKv1(&client.Handlers)); err != nil { + c.configurerOnce.Do(func() { + c.configurer = awsmiddleware.NewConfigurer(c.middleware.Handlers()) + }) + if err := c.configurer.Configure(awsmiddleware.SDKv1(&client.Handlers)); err != nil { c.Log.Errorf("Unable to configure middleware on cloudwatch logs client: %v", err) } else { c.Log.Debug("Configured middleware on AWS client") @@ -186,145 +199,67 @@ func (c *CloudWatchLogs) createClient(retryer aws.RequestRetryer) *cloudwatchlog return client } -func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) { - t, err := c.getTargetFromMetric(m) - if err != nil { - c.Log.Errorf("Failed to find target: %v", err) - } - cwd := c.getDest(t, nil) - if cwd == nil { - c.Log.Warnf("unable to find log destination, group: %v, stream: %v", t.Group, t.Stream) - return - } - cwd.switchToEMF() - cwd.pusher.Sender.SetRetryDuration(metricRetryTimeout) - - e := c.getLogEventFromMetric(m) - if e == nil { - return - } - - cwd.AddEvent(e) -} - -func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (pusher.Target, error) { - tags := m.Tags() - logGroup, ok := tags[LogGroupNameTag] - if !ok { - return pusher.Target{}, fmt.Errorf("structuredlog receive a metric with name '%v' without log group name", m.Name()) - } else { - m.RemoveTag(LogGroupNameTag) - } - - logStream, ok := tags[LogStreamNameTag] - if ok { - m.RemoveTag(LogStreamNameTag) - } else if logStream == "" { - logStream = c.LogStreamName - } - - return pusher.Target{Group: logGroup, Stream: logStream, Class: util.StandardLogGroupClass, Retention: -1}, nil +// Description returns a one-sentence description on the Output +func (c *CloudWatchLogs) Description() string { + return "Configuration for AWS CloudWatchLogs output." } -func (c *CloudWatchLogs) getLogEventFromMetric(metric telegraf.Metric) *structuredLogEvent { - var message string - if metric.HasField(LogEntryField) { - var ok bool - if message, ok = metric.Fields()[LogEntryField].(string); !ok { - c.Log.Warnf("The log entry value field is not string type: %v", metric.Fields()) - return nil - } - } else { - content := map[string]interface{}{} - tags := metric.Tags() - // build all the attributesInFields - if val, ok := tags[attributesInFields]; ok { - attributes := strings.Split(val, ",") - mFields := metric.Fields() - for _, attr := range attributes { - if fieldVal, ok := mFields[attr]; ok { - content[attr] = fieldVal - metric.RemoveField(attr) - } - } - metric.RemoveTag(attributesInFields) - delete(tags, attributesInFields) - } - - // build remaining attributes - for k := range tags { - content[k] = tags[k] - } - - for k, v := range metric.Fields() { - var value interface{} - - switch t := v.(type) { - case int: - value = float64(t) - case int32: - value = float64(t) - case int64: - value = float64(t) - case uint: - value = float64(t) - case uint32: - value = float64(t) - case uint64: - value = float64(t) - case float64: - value = t - case bool: - value = t - case string: - value = t - case time.Time: - value = float64(t.Unix()) - - default: - c.Log.Errorf("Detected unexpected fields (%s,%v) when encoding structured log event, value type %T is not supported", k, v, v) - return nil - } - content[k] = value - } - - jsonMap, err := json.Marshal(content) - if err != nil { - c.Log.Errorf("Unalbe to marshal structured log content: %v", err) - } - message = string(jsonMap) - } - - return &structuredLogEvent{ - msg: message, - t: metric.Time(), - } -} +var sampleConfig = ` + ## Amazon REGION + region = "us-east-1" -type structuredLogEvent struct { - msg string - t time.Time -} + ## Amazon Credentials + ## Credentials are loaded in the following order + ## 1) Assumed credentials via STS if role_arn is specified + ## 2) explicit credentials from 'access_key' and 'secret_key' + ## 3) shared profile from 'profile' + ## 4) environment variables + ## 5) shared credentials file + ## 6) EC2 Instance Profile + #access_key = "" + #secret_key = "" + #token = "" + #role_arn = "" + #profile = "" + #shared_credential_file = "" -func (e *structuredLogEvent) Message() string { - return e.msg -} + # The log stream name. + log_stream_name = "" +` -func (e *structuredLogEvent) Time() time.Time { - return e.t +// SampleConfig returns the default configuration of the Output +func (c *CloudWatchLogs) SampleConfig() string { + return sampleConfig } -func (e *structuredLogEvent) Done() {} - +// cwDest is responsible for publishing logs from log files to a log group + log stream. +// Logs from more than one log file may be published to the same destination. cwDest closes +// itself when all log file tailers which referenced this cwDest are closed. +// All exported functions should practice thread-safety by acquiring lock the cwDest +// and not calling any other function which requires the lock. type cwDest struct { pusher *pusher.Pusher sync.Mutex isEMF bool - stopped bool retryer *retryer.LogThrottleRetryer + + // refCount keeps track of how many LogSrc objects are referencing + // this cwDest object at any given time. Once there are no more + // references, the cwDest object stops itself, closing all goroutines, + // and it can no longer be used + refCount int + stopped bool + onStopFunc func() } +var _ logs.LogDest = (*cwDest)(nil) + func (cd *cwDest) Publish(events []logs.LogEvent) error { + cd.Lock() + defer cd.Unlock() + if cd.stopped { + return logs.ErrOutputStopped + } for _, e := range events { if !cd.isEMF { msg := e.Message() @@ -332,20 +267,43 @@ func (cd *cwDest) Publish(events []logs.LogEvent) error { cd.switchToEMF() } } - cd.AddEvent(e) - } - if cd.stopped { - return logs.ErrOutputStopped + cd.addEvent(e) } return nil } +func (cd *cwDest) NotifySourceStopped() { + cd.Lock() + defer cd.Unlock() + cd.refCount-- + if cd.refCount <= 0 { + cd.stop() + } + + if cd.refCount < 0 { + fmt.Printf("E! Negative refCount on cwDest detected. refCount: %d, logGroup: %s, logStream: %s", cd.refCount, cd.pusher.Group, cd.pusher.Stream) + } +} + func (cd *cwDest) Stop() { + cd.Lock() + defer cd.Unlock() + cd.stop() +} + +func (cd *cwDest) stop() { + if cd.stopped { + return + } cd.retryer.Stop() + cd.pusher.Stop() cd.stopped = true + if cd.onStopFunc != nil { + cd.onStopFunc() + } } -func (cd *cwDest) AddEvent(e logs.LogEvent) { +func (cd *cwDest) addEvent(e logs.LogEvent) { // Drop events for metric path logs when queue is full if cd.isEMF { cd.pusher.AddEventNonBlocking(e) @@ -355,8 +313,6 @@ func (cd *cwDest) AddEvent(e logs.LogEvent) { } func (cd *cwDest) switchToEMF() { - cd.Lock() - defer cd.Unlock() if !cd.isEMF { cd.isEMF = true cwl, ok := cd.pusher.Service.(*cloudwatchlogs.CloudWatchLogs) @@ -366,44 +322,10 @@ func (cd *cwDest) switchToEMF() { } } -// Description returns a one-sentence description on the Output -func (c *CloudWatchLogs) Description() string { - return "Configuration for AWS CloudWatchLogs output." -} - -var sampleConfig = ` - ## Amazon REGION - region = "us-east-1" - - ## Amazon Credentials - ## Credentials are loaded in the following order - ## 1) Assumed credentials via STS if role_arn is specified - ## 2) explicit credentials from 'access_key' and 'secret_key' - ## 3) shared profile from 'profile' - ## 4) environment variables - ## 5) shared credentials file - ## 6) EC2 Instance Profile - #access_key = "" - #secret_key = "" - #token = "" - #role_arn = "" - #profile = "" - #shared_credential_file = "" - - # The log stream name. - log_stream_name = "" -` - -// SampleConfig returns the default configuration of the Output -func (c *CloudWatchLogs) SampleConfig() string { - return sampleConfig -} - func init() { outputs.Add("cloudwatchlogs", func() telegraf.Output { return &CloudWatchLogs{ ForceFlushInterval: internal.Duration{Duration: defaultFlushTimeout}, - pusherStopChan: make(chan struct{}), cwDests: sync.Map{}, middleware: agenthealth.NewAgentHealth( zap.NewNop(), diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go index 63b6dfd0d0..66f1643fd0 100644 --- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go +++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go @@ -69,13 +69,12 @@ func TestCreateDestination(t *testing.T) { for name, testCase := range testCases { t.Run(name, func(t *testing.T) { c := &CloudWatchLogs{ - Log: testutil.Logger{Name: "test"}, - LogGroupName: "G1", - LogStreamName: "S1", - AccessKey: "access_key", - SecretKey: "secret_key", - pusherStopChan: make(chan struct{}), - cwDests: sync.Map{}, + Log: testutil.Logger{Name: "test"}, + LogGroupName: "G1", + LogStreamName: "S1", + AccessKey: "access_key", + SecretKey: "secret_key", + cwDests: sync.Map{}, } dest := c.CreateDest(testCase.cfgLogGroup, testCase.cfgLogStream, testCase.cfgLogRetention, testCase.cfgLogClass, testCase.cfgTailerSrc).(*cwDest) require.Equal(t, testCase.expectedLogGroup, dest.pusher.Group) @@ -89,11 +88,10 @@ func TestCreateDestination(t *testing.T) { func TestDuplicateDestination(t *testing.T) { c := &CloudWatchLogs{ - Log: testutil.Logger{Name: "test"}, - AccessKey: "access_key", - SecretKey: "secret_key", - cwDests: sync.Map{}, - pusherStopChan: make(chan struct{}), + Log: testutil.Logger{Name: "test"}, + AccessKey: "access_key", + SecretKey: "secret_key", + cwDests: sync.Map{}, } // Given the same log group, log stream, same retention, and logClass d1 := c.CreateDest("FILENAME", "", -1, util.InfrequentAccessLogGroupClass, nil) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go index f5afeb309a..1d6edf57e9 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go @@ -109,6 +109,11 @@ func (s *senderPool) Send(batch *logEventBatch) { }) } +func (s *senderPool) Stop() { + // workerpool is stopped by the plugin + s.sender.Stop() +} + // SetRetryDuration sets the retry duration on the wrapped Sender. func (s *senderPool) SetRetryDuration(duration time.Duration) { s.sender.SetRetryDuration(duration) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go index b706688034..d9f3860967 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go @@ -105,10 +105,9 @@ func TestWorkerPool(t *testing.T) { func TestSenderPool(t *testing.T) { logger := testutil.NewNopLogger() - stop := make(chan struct{}) mockService := new(mockLogsService) mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil) - s := newSender(logger, mockService, nil, time.Second, stop) + s := newSender(logger, mockService, nil, time.Second) p := NewWorkerPool(12) sp := newSenderPool(p, s) @@ -132,5 +131,6 @@ func TestSenderPool(t *testing.T) { } p.Stop() + s.Stop() assert.Equal(t, int32(200), completed.Load()) } diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go index 33656c3bb2..57256ae033 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go @@ -33,11 +33,10 @@ func NewPusher( workerPool WorkerPool, flushTimeout time.Duration, retryDuration time.Duration, - stop <-chan struct{}, wg *sync.WaitGroup, ) *Pusher { - s := createSender(logger, service, targetManager, workerPool, retryDuration, stop) - q := newQueue(logger, target, flushTimeout, entityProvider, s, stop, wg) + s := createSender(logger, service, targetManager, workerPool, retryDuration) + q := newQueue(logger, target, flushTimeout, entityProvider, s, wg) targetManager.PutRetentionPolicy(target) return &Pusher{ Target: target, @@ -49,6 +48,11 @@ func NewPusher( } } +func (p *Pusher) Stop() { + p.Queue.Stop() + p.Sender.Stop() +} + // createSender initializes a Sender. Wraps it in a senderPool if a WorkerPool is provided. func createSender( logger telegraf.Logger, @@ -56,9 +60,8 @@ func createSender( targetManager TargetManager, workerPool WorkerPool, retryDuration time.Duration, - stop <-chan struct{}, ) Sender { - s := newSender(logger, service, targetManager, retryDuration, stop) + s := newSender(logger, service, targetManager, retryDuration) if workerPool == nil { return s } diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go index 54f68621f9..6d63e3c4ff 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go @@ -20,23 +20,21 @@ const eventCount = 100000 func TestPusher(t *testing.T) { t.Run("WithSender", func(t *testing.T) { t.Parallel() - stop := make(chan struct{}) var wg sync.WaitGroup - pusher := setupPusher(t, nil, stop, &wg) + pusher := setupPusher(t, nil, &wg) var completed atomic.Int32 generateEvents(t, pusher, &completed) - close(stop) + pusher.Stop() wg.Wait() }) t.Run("WithSenderPool", func(t *testing.T) { t.Parallel() - stop := make(chan struct{}) var wg sync.WaitGroup wp := NewWorkerPool(5) - pusher := setupPusher(t, wp, stop, &wg) + pusher := setupPusher(t, wp, &wg) _, isSenderPool := pusher.Sender.(*senderPool) assert.True(t, isSenderPool) @@ -44,12 +42,41 @@ func TestPusher(t *testing.T) { var completed atomic.Int32 generateEvents(t, pusher, &completed) - close(stop) + pusher.Stop() wg.Wait() wp.Stop() }) } +func TestPusherStop(t *testing.T) { + var wg sync.WaitGroup + + s := &mockSender{} + s.On("Stop").Return() + + logger := testutil.NewNopLogger() + target := Target{} + service := new(stubLogsService) + service.ple = func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + return &cloudwatchlogs.PutLogEventsOutput{}, nil + } + mockManager := new(mockTargetManager) + q := newQueue(logger, target, time.Second, nil, s, &wg) + pusher := &Pusher{ + Target: target, + Queue: q, + Service: service, + TargetManager: mockManager, + EntityProvider: nil, + Sender: s, + } + + pusher.Stop() + + s.AssertCalled(t, "Stop") + +} + func generateEvents(t *testing.T, pusher *Pusher, completed *atomic.Int32) { t.Helper() for i := 0; i < eventCount; i++ { @@ -63,7 +90,7 @@ func generateEvents(t *testing.T, pusher *Pusher, completed *atomic.Int32) { } } -func setupPusher(t *testing.T, workerPool WorkerPool, stop chan struct{}, wg *sync.WaitGroup) *Pusher { +func setupPusher(t *testing.T, workerPool WorkerPool, wg *sync.WaitGroup) *Pusher { t.Helper() logger := testutil.NewNopLogger() target := Target{Group: "G", Stream: "S", Retention: 7} @@ -85,7 +112,6 @@ func setupPusher(t *testing.T, workerPool WorkerPool, stop chan struct{}, wg *sy workerPool, time.Second, time.Minute, - stop, wg, ) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go b/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go index da3a28a25a..e8ad65ffdc 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go @@ -17,6 +17,7 @@ import ( type Queue interface { AddEvent(e logs.LogEvent) AddEventNonBlocking(e logs.LogEvent) + Stop() } type queue struct { @@ -34,7 +35,8 @@ type queue struct { resetTimerCh chan struct{} flushTimer *time.Timer flushTimeout atomic.Value - stop <-chan struct{} + stopCh chan struct{} + stopped bool lastSentTime atomic.Value initNonBlockingChOnce sync.Once @@ -42,13 +44,14 @@ type queue struct { wg *sync.WaitGroup } +var _ (Queue) = (*queue)(nil) + func newQueue( logger telegraf.Logger, target Target, flushTimeout time.Duration, entityProvider logs.LogEntityProvider, sender Sender, - stop <-chan struct{}, wg *sync.WaitGroup, ) Queue { q := &queue{ @@ -61,7 +64,7 @@ func newQueue( flushCh: make(chan struct{}), resetTimerCh: make(chan struct{}), flushTimer: time.NewTimer(flushTimeout), - stop: stop, + stopCh: make(chan struct{}), startNonBlockCh: make(chan struct{}), wg: wg, } @@ -105,33 +108,30 @@ func (q *queue) AddEventNonBlocking(e logs.LogEvent) { } } +// Stop stops all goroutines associated with this queue instance. +func (q *queue) Stop() { + if q.stopped { + return + } + close(q.stopCh) + q.stopped = true +} + // start is the main loop for processing events and managing the queue. func (q *queue) start() { defer q.wg.Done() mergeChan := make(chan logs.LogEvent) - // Merge events from both blocking and non-blocking channel - go func() { - var nonBlockingEventsCh <-chan logs.LogEvent - for { - select { - case e := <-q.eventsCh: - mergeChan <- e - case e := <-nonBlockingEventsCh: - mergeChan <- e - case <-q.startNonBlockCh: - nonBlockingEventsCh = q.nonBlockingEventsCh - case <-q.stop: - return - } - } - }() - + go q.merge(mergeChan) go q.manageFlushTimer() for { select { - case e := <-mergeChan: + case e, ok := <-mergeChan: + if !ok { + q.send() + return + } // Start timer when first event of the batch is added (happens after a flush timer timeout) if len(q.batch.events) == 0 { q.resetFlushTimer() @@ -149,10 +149,23 @@ func (q *queue) start() { } else { q.resetFlushTimer() } - case <-q.stop: - if len(q.batch.events) > 0 { - q.send() - } + } + } +} + +// merge merges events from both blocking and non-blocking channel +func (q *queue) merge(mergeChan chan logs.LogEvent) { + defer close(mergeChan) + var nonBlockingEventsCh <-chan logs.LogEvent + for { + select { + case e := <-q.eventsCh: + mergeChan <- e + case e := <-nonBlockingEventsCh: + mergeChan <- e + case <-q.startNonBlockCh: + nonBlockingEventsCh = q.nonBlockingEventsCh + case <-q.stopCh: return } } @@ -194,7 +207,7 @@ func (q *queue) manageFlushTimer() { if flushTimeout, ok := q.flushTimeout.Load().(time.Duration); ok { q.flushTimer.Reset(flushTimeout) } - case <-q.stop: + case <-q.stopCh: q.stopFlushTimer() return } diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go index 8b9b4132e1..b5fc04d02e 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go @@ -86,6 +86,10 @@ func (m *mockSender) RetryDuration() time.Duration { return args.Get(0).(time.Duration) } +func (m *mockSender) Stop() { + m.Called() +} + func TestAddSingleEvent_WithAccountId(t *testing.T) { t.Parallel() var wg sync.WaitGroup @@ -119,7 +123,7 @@ func TestAddSingleEvent_WithAccountId(t *testing.T) { } ep := newMockEntityProvider(expectedEntity) - stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, ep, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, ep, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now())) require.False(t, called.Load(), "PutLogEvents has been called too fast, it should wait until FlushTimeout.") @@ -130,7 +134,8 @@ func TestAddSingleEvent_WithAccountId(t *testing.T) { time.Sleep(time.Second) require.True(t, called.Load(), "PutLogEvents has not been called after FlushTimeout has been reached.") - close(stop) + q.Stop() + sender.Stop() wg.Wait() } @@ -155,7 +160,7 @@ func TestAddSingleEvent_WithoutAccountId(t *testing.T) { } ep := newMockEntityProvider(nil) - stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, ep, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, ep, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now())) require.False(t, called.Load(), "PutLogEvents has been called too fast, it should wait until FlushTimeout.") @@ -166,7 +171,8 @@ func TestAddSingleEvent_WithoutAccountId(t *testing.T) { time.Sleep(2 * time.Second) require.True(t, called.Load(), "PutLogEvents has not been called after FlushTimeout has been reached.") - close(stop) + q.Stop() + sender.Stop() wg.Wait() } @@ -184,14 +190,15 @@ func TestStopQueueWouldDoFinalSend(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now())) time.Sleep(10 * time.Millisecond) require.False(t, called.Load(), "PutLogEvents has been called too fast, it should wait until FlushTimeout.") - close(stop) + q.Stop() + sender.Stop() wg.Wait() require.True(t, called.Load(), "PutLogEvents has not been called after FlushTimeout has been reached.") @@ -207,13 +214,14 @@ func TestStopPusherWouldStopRetries(t *testing.T) { } logSink := testutil.NewLogSink() - stop, q := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now())) time.Sleep(10 * time.Millisecond) triggerSend(t, q) // stop should try flushing the remaining events with retry disabled - close(stop) + q.Stop() + sender.Stop() time.Sleep(50 * time.Millisecond) wg.Wait() @@ -248,11 +256,12 @@ func TestLongMessageHandling(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent(longMsg, time.Now())) triggerSend(t, q) - close(stop) + q.Stop() + sender.Stop() wg.Wait() } @@ -276,14 +285,15 @@ func TestRequestIsLessThan1MB(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) for i := 0; i < 8; i++ { q.AddEvent(newStubLogEvent(longMsg, time.Now())) } time.Sleep(10 * time.Millisecond) triggerSend(t, q) triggerSend(t, q) - close(stop) + q.Stop() + sender.Stop() wg.Wait() } @@ -301,7 +311,7 @@ func TestRequestIsLessThan10kEvents(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) for i := 0; i < 30000; i++ { q.AddEvent(newStubLogEvent(msg, time.Now())) } @@ -309,7 +319,8 @@ func TestRequestIsLessThan10kEvents(t *testing.T) { for i := 0; i < 5; i++ { triggerSend(t, q) } - close(stop) + q.Stop() + sender.Stop() wg.Wait() } @@ -326,7 +337,7 @@ func TestTimestampPopulation(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) for i := 0; i < 3; i++ { q.AddEvent(newStubLogEvent("msg", time.Time{})) } @@ -334,7 +345,8 @@ func TestTimestampPopulation(t *testing.T) { for i := 0; i < 5; i++ { triggerSend(t, q) } - close(stop) + q.Stop() + sender.Stop() wg.Wait() } @@ -349,7 +361,7 @@ func TestIgnoreOutOfTimeRangeEvent(t *testing.T) { } logSink := testutil.NewLogSink() - stop, q := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) + q, sender := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now().Add(-15*24*time.Hour))) q.AddEventNonBlocking(newStubLogEvent("MSG", time.Now().Add(2*time.Hour+1*time.Minute))) @@ -362,7 +374,8 @@ func TestIgnoreOutOfTimeRangeEvent(t *testing.T) { } time.Sleep(20 * time.Millisecond) - close(stop) + q.Stop() + sender.Stop() wg.Wait() } @@ -401,7 +414,7 @@ func TestAddMultipleEvents(t *testing.T) { )) } evts[10], evts[90] = evts[90], evts[10] // make events out of order - stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) for _, e := range evts { q.AddEvent(e) } @@ -412,7 +425,8 @@ func TestAddMultipleEvents(t *testing.T) { time.Sleep(time.Second) - close(stop) + q.Stop() + sender.Stop() wg.Wait() } @@ -452,7 +466,7 @@ func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) { return nil, nil } - stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("MSG 25hrs ago", time.Now().Add(-25*time.Hour))) q.AddEvent(newStubLogEvent("MSG 24hrs ago", time.Now().Add(-24*time.Hour))) q.AddEvent(newStubLogEvent("MSG 23hrs ago", time.Now().Add(-23*time.Hour))) @@ -461,7 +475,8 @@ func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) { time.Sleep(10 * time.Millisecond) q.resetFlushTimer() time.Sleep(20 * time.Millisecond) - close(stop) + q.Stop() + sender.Stop() wg.Wait() } @@ -481,7 +496,7 @@ func TestUnhandledErrorWouldNotResend(t *testing.T) { } logSink := testutil.NewLogSink() - stop, q := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) + q, sender := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("msg", time.Now())) time.Sleep(2 * time.Second) @@ -489,7 +504,8 @@ func TestUnhandledErrorWouldNotResend(t *testing.T) { require.True(t, strings.Contains(logLine, "E!"), fmt.Sprintf("Expecting error log with unhandled error, but received '%s' in the log", logLine)) require.True(t, strings.Contains(logLine, "unhandled error"), fmt.Sprintf("Expecting error log with unhandled error, but received '%s' in the log", logLine)) - close(stop) + q.Stop() + sender.Stop() wg.Wait() require.EqualValues(t, 1, cnt.Load(), fmt.Sprintf("Expecting pusher to call send 1 time, but %d times called", cnt.Load())) } @@ -526,7 +542,7 @@ func TestCreateLogGroupAndLogStreamWhenNotFound(t *testing.T) { } logSink := testutil.NewLogSink() - stop, q := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) var eventWG sync.WaitGroup eventWG.Add(1) q.AddEvent(&stubLogEvent{message: "msg", timestamp: time.Now(), done: eventWG.Done}) @@ -544,7 +560,8 @@ func TestCreateLogGroupAndLogStreamWhenNotFound(t *testing.T) { require.True(t, foundUnknownErr, fmt.Sprintf("Expecting error log with unknown error, but received '%s' in the log", logSink)) - close(stop) + q.Stop() + sender.Stop() wg.Wait() } @@ -563,7 +580,7 @@ func TestLogRejectedLogEntryInfo(t *testing.T) { } logSink := testutil.NewLogSink() - stop, q := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) var eventWG sync.WaitGroup eventWG.Add(1) q.AddEvent(&stubLogEvent{message: "msg", timestamp: time.Now(), done: eventWG.Done}) @@ -586,7 +603,8 @@ func TestLogRejectedLogEntryInfo(t *testing.T) { require.True(t, strings.Contains(logLine, "W!"), fmt.Sprintf("Expecting error log events too expired, but received '%s' in the log", logSink.String())) require.True(t, strings.Contains(logLine, "300"), fmt.Sprintf("Expecting error log events too expired, but received '%s' in the log", logSink.String())) - close(stop) + q.Stop() + sender.Stop() wg.Wait() } @@ -612,7 +630,7 @@ func TestAddEventNonBlocking(t *testing.T) { start.Add(time.Duration(i)*time.Millisecond), )) } - stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + q, sender := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) time.Sleep(200 * time.Millisecond) // Wait until pusher started, merge channel is blocked for _, e := range evts { @@ -623,7 +641,8 @@ func TestAddEventNonBlocking(t *testing.T) { triggerSend(t, q) time.Sleep(20 * time.Millisecond) - close(stop) + q.Stop() + sender.Stop() wg.Wait() } @@ -639,7 +658,7 @@ func TestResendWouldStopAfterExhaustedRetries(t *testing.T) { } logSink := testutil.NewLogSink() - stop, q := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, time.Second, nil, &wg) + q, sender := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, time.Second, nil, &wg) q.AddEvent(newStubLogEvent("msg", time.Now())) time.Sleep(2 * time.Second) @@ -648,7 +667,8 @@ func TestResendWouldStopAfterExhaustedRetries(t *testing.T) { expected := fmt.Sprintf("All %v retries to G/S failed for PutLogEvents, request dropped.", cnt.Load()-1) require.True(t, strings.HasSuffix(lastLine, expected), fmt.Sprintf("Expecting error log to end with request dropped, but received '%s' in the log", logSink.String())) - close(stop) + q.Stop() + sender.Stop() wg.Wait() } @@ -667,7 +687,7 @@ func testPreparation( retryDuration time.Duration, entityProvider logs.LogEntityProvider, wg *sync.WaitGroup, -) (chan struct{}, *queue) { +) (*queue, Sender) { return testPreparationWithLogger( t, testutil.NewNopLogger(), @@ -689,21 +709,19 @@ func testPreparationWithLogger( retryDuration time.Duration, entityProvider logs.LogEntityProvider, wg *sync.WaitGroup, -) (chan struct{}, *queue) { +) (*queue, Sender) { t.Helper() - stop := make(chan struct{}) tm := NewTargetManager(logger, service) - s := newSender(logger, service, tm, retryDuration, stop) + s := newSender(logger, service, tm, retryDuration) q := newQueue( logger, Target{"G", "S", util.StandardLogGroupClass, retention}, flushTimeout, entityProvider, s, - stop, wg, ) - return stop, q.(*queue) + return q.(*queue), s } func TestQueueCallbackRegistration(t *testing.T) { @@ -729,7 +747,6 @@ func TestQueueCallbackRegistration(t *testing.T) { }).Return() logger := testutil.NewNopLogger() - stop := make(chan struct{}) q := &queue{ target: Target{"G", "S", util.StandardLogGroupClass, -1}, logger: logger, @@ -740,7 +757,6 @@ func TestQueueCallbackRegistration(t *testing.T) { flushCh: make(chan struct{}), resetTimerCh: make(chan struct{}), flushTimer: time.NewTimer(10 * time.Millisecond), - stop: stop, startNonBlockCh: make(chan struct{}), wg: &wg, } @@ -773,7 +789,6 @@ func TestQueueCallbackRegistration(t *testing.T) { }).Return() logger := testutil.NewNopLogger() - stop := make(chan struct{}) q := &queue{ target: Target{"G", "S", util.StandardLogGroupClass, -1}, logger: logger, @@ -784,7 +799,6 @@ func TestQueueCallbackRegistration(t *testing.T) { flushCh: make(chan struct{}), resetTimerCh: make(chan struct{}), flushTimer: time.NewTimer(10 * time.Millisecond), - stop: stop, startNonBlockCh: make(chan struct{}), wg: &wg, } diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go b/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go index 365dba91a0..de1bdf6708 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go @@ -26,6 +26,7 @@ type Sender interface { Send(*logEventBatch) SetRetryDuration(time.Duration) RetryDuration() time.Duration + Stop() } type sender struct { @@ -33,21 +34,24 @@ type sender struct { retryDuration atomic.Value targetManager TargetManager logger telegraf.Logger - stop <-chan struct{} + stopCh chan struct{} + stopped bool } +var _ (Sender) = (*sender)(nil) + func newSender( logger telegraf.Logger, service cloudWatchLogsService, targetManager TargetManager, retryDuration time.Duration, - stop <-chan struct{}, ) Sender { s := &sender{ logger: logger, service: service, targetManager: targetManager, - stop: stop, + stopCh: make(chan struct{}), + stopped: false, } s.retryDuration.Store(retryDuration) return s @@ -125,7 +129,7 @@ func (s *sender) Send(batch *logEventBatch) { s.logger.Warnf("Retried %v time, going to sleep %v before retrying.", retryCountShort+retryCountLong-1, wait) select { - case <-s.stop: + case <-s.stopCh: s.logger.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, batch.Group, batch.Stream) batch.updateState() return @@ -134,6 +138,14 @@ func (s *sender) Send(batch *logEventBatch) { } } +func (s *sender) Stop() { + if s.stopped { + return + } + close(s.stopCh) + s.stopped = true +} + // SetRetryDuration sets the maximum duration for retrying failed log sends. func (s *sender) SetRetryDuration(retryDuration time.Duration) { s.retryDuration.Store(retryDuration) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go index ccc6d1b3ba..3b469350ef 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go @@ -80,8 +80,9 @@ func TestSender(t *testing.T) { mockManager := new(mockTargetManager) mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() - s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) + s := newSender(logger, mockService, mockManager, time.Second) s.Send(batch) + s.Stop() mockService.AssertExpectations(t) assert.True(t, stateCallbackCalled, "State callback was not called in success scenario") @@ -102,8 +103,9 @@ func TestSender(t *testing.T) { mockManager := new(mockTargetManager) mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{RejectedLogEventsInfo: rejectedInfo}, nil).Once() - s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) + s := newSender(logger, mockService, mockManager, time.Second) s.Send(batch) + s.Stop() mockService.AssertExpectations(t) }) @@ -120,8 +122,9 @@ func TestSender(t *testing.T) { mockManager.On("InitTarget", mock.Anything).Return(nil).Once() mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() - s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) + s := newSender(logger, mockService, mockManager, time.Second) s.Send(batch) + s.Stop() mockService.AssertExpectations(t) mockManager.AssertExpectations(t) @@ -146,8 +149,9 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.InvalidParameterException{}).Once() - s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) + s := newSender(logger, mockService, mockManager, time.Second) s.Send(batch) + s.Stop() mockService.AssertExpectations(t) assert.True(t, stateCallbackCalled, "State callback was not called for InvalidParameterException") @@ -173,8 +177,9 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.DataAlreadyAcceptedException{}).Once() - s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) + s := newSender(logger, mockService, mockManager, time.Second) s.Send(batch) + s.Stop() mockService.AssertExpectations(t) assert.True(t, stateCallbackCalled, "State callback was not called for DataAlreadyAcceptedException") @@ -200,8 +205,9 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, errors.New("test")).Once() - s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) + s := newSender(logger, mockService, mockManager, time.Second) s.Send(batch) + s.Stop() mockService.AssertExpectations(t) assert.True(t, stateCallbackCalled, "State callback was not called for non-AWS error") @@ -219,8 +225,9 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() - s := newSender(logger, mockService, mockManager, time.Second, make(chan struct{})) + s := newSender(logger, mockService, mockManager, time.Second) s.Send(batch) + s.Stop() mockService.AssertExpectations(t) }) @@ -244,8 +251,9 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, awserr.New("SomeAWSError", "Some AWS error", nil)).Once() - s := newSender(logger, mockService, mockManager, 100*time.Millisecond, make(chan struct{})) + s := newSender(logger, mockService, mockManager, 100*time.Millisecond) s.Send(batch) + s.Stop() mockService.AssertExpectations(t) assert.True(t, stateCallbackCalled, "State callback was not called when retry attempts were exhausted") @@ -271,12 +279,11 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, awserr.New("SomeAWSError", "Some AWS error", nil)).Once() - stopCh := make(chan struct{}) - s := newSender(logger, mockService, mockManager, time.Second, stopCh) + s := newSender(logger, mockService, mockManager, time.Second) go func() { time.Sleep(50 * time.Millisecond) - close(stopCh) + s.Stop() }() s.Send(batch)