From 724e073384ef2341301d099aea065824ed82035c Mon Sep 17 00:00:00 2001 From: Dominic Sciascia Date: Fri, 16 May 2025 12:39:46 -0400 Subject: [PATCH 1/5] add limiter for dlg and prp --- .../cloudwatchlogs/internal/pusher/target.go | 36 +++++-- .../internal/pusher/target_test.go | 102 ++++++++++++++++++ 2 files changed, 127 insertions(+), 11 deletions(-) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target.go index aa78ba85e1..366de90569 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/target.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/influxdata/telegraf" + "golang.org/x/time/rate" "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" ) @@ -23,6 +24,9 @@ const ( baseRetryDelay = 1 * time.Second maxRetryDelayTarget = 10 * time.Second numBackoffRetries = 5 + // DescribeLogGroups/PutRetentionPolicy limiter + limiterBurstSize = 1 + limiterRefillRate = 1 ) type Target struct { @@ -39,21 +43,25 @@ type targetManager struct { logger telegraf.Logger service cloudWatchLogsService // cache of initialized targets - cache map[Target]time.Time - cacheTTL time.Duration - mu sync.Mutex - dlg chan Target - prp chan Target + cache map[Target]time.Time + cacheTTL time.Duration + mu sync.Mutex + dlg chan Target + prp chan Target + dlgLimiter *rate.Limiter + prpLimiter *rate.Limiter } func NewTargetManager(logger telegraf.Logger, service cloudWatchLogsService) TargetManager { tm := &targetManager{ - logger: logger, - service: service, - cache: make(map[Target]time.Time), - cacheTTL: cacheTTL, - dlg: make(chan Target, retentionChannelSize), - prp: make(chan Target, retentionChannelSize), + logger: logger, + service: service, + cache: make(map[Target]time.Time), + cacheTTL: cacheTTL, + dlg: make(chan Target, retentionChannelSize), + prp: make(chan Target, retentionChannelSize), + dlgLimiter: rate.NewLimiter(limiterRefillRate, limiterBurstSize), + prpLimiter: rate.NewLimiter(limiterRefillRate, limiterBurstSize), } go tm.processDescribeLogGroup() @@ -176,6 +184,9 @@ func (m *targetManager) createLogStream(t Target) error { func (m *targetManager) processDescribeLogGroup() { for target := range m.dlg { for attempt := 0; attempt < numBackoffRetries; attempt++ { + resv := m.dlgLimiter.Reserve() + time.Sleep(resv.Delay()) + currentRetention, err := m.getRetention(target) if err != nil { m.logger.Errorf("failed to describe log group retention for target %v: %v", target, err) @@ -218,6 +229,9 @@ func (m *targetManager) processPutRetentionPolicy() { for target := range m.prp { var updated bool for attempt := 0; attempt < numBackoffRetries; attempt++ { + resv := m.prpLimiter.Reserve() + time.Sleep(resv.Delay()) + err := m.updateRetentionPolicy(target) if err == nil { updated = true diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go index 1224800c73..aee39bd0fe 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go @@ -355,6 +355,108 @@ func TestCalculateBackoff(t *testing.T) { assert.True(t, totalDelay <= 30*time.Second, "Total delay across all attempts should not exceed 30 seconds, but was %v", totalDelay) } +func TestTargetManager_RateLimiter(t *testing.T) { + logger := testutil.NewNopLogger() + + t.Run("DescribeLogGroupsRateLimited", func(t *testing.T) { + targets := []Target{ + {Group: "G1", Stream: "S1", Retention: 7}, + {Group: "G2", Stream: "S2", Retention: 14}, + {Group: "G3", Stream: "S3", Retention: 30}, + } + + mockService := new(mockLogsService) + var callTimes []time.Time + + mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) + mockService.On("DescribeLogGroups", mock.Anything).Run(func(args mock.Arguments) { + callTimes = append(callTimes, time.Now()) + }).Return(&cloudwatchlogs.DescribeLogGroupsOutput{ + LogGroups: []*cloudwatchlogs.LogGroup{ + { + LogGroupName: aws.String("G1"), + RetentionInDays: aws.Int64(0), + }, + { + LogGroupName: aws.String("G2"), + RetentionInDays: aws.Int64(0), + }, + { + LogGroupName: aws.String("G3"), + RetentionInDays: aws.Int64(0), + }, + }, + }, nil) + mockService.On("PutRetentionPolicy", mock.Anything).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil) + + manager := NewTargetManager(logger, mockService) + + for _, target := range targets { + err := manager.InitTarget(target) + assert.NoError(t, err) + } + time.Sleep(5 * time.Second) + + assertCacheLen(t, manager, 3) + + // Check that there was at least 1 second between calls (rate limit is 1 per second) + for i := 1; i < len(callTimes); i++ { + timeDiff := callTimes[i].Sub(callTimes[i-1]) + assert.GreaterOrEqual(t, timeDiff, time.Second, + "Expected at least ~1 second between DescribeLogGroups calls due to rate limiting: got %v", timeDiff) + } + }) + + t.Run("PutRetentionPolicyRateLimited", func(t *testing.T) { + targets := []Target{ + {Group: "G1", Stream: "S1", Retention: 7}, + {Group: "G2", Stream: "S2", Retention: 14}, + {Group: "G3", Stream: "S3", Retention: 30}, + } + + mockService := new(mockLogsService) + var callTimes []time.Time + + mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) + mockService.On("DescribeLogGroups", mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{ + LogGroups: []*cloudwatchlogs.LogGroup{ + { + LogGroupName: aws.String("G1"), + RetentionInDays: aws.Int64(0), + }, + { + LogGroupName: aws.String("G2"), + RetentionInDays: aws.Int64(0), + }, + { + LogGroupName: aws.String("G3"), + RetentionInDays: aws.Int64(0), + }, + }, + }, nil) + mockService.On("PutRetentionPolicy", mock.Anything).Run(func(args mock.Arguments) { + callTimes = append(callTimes, time.Now()) + }).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil) + + manager := NewTargetManager(logger, mockService) + + for _, target := range targets { + err := manager.InitTarget(target) + assert.NoError(t, err) + } + time.Sleep(5 * time.Second) + + assertCacheLen(t, manager, 3) + + // Check that there was at least 1 second between calls (rate limit is 1 per second) + for i := 1; i < len(callTimes); i++ { + timeDiff := callTimes[i].Sub(callTimes[i-1]) + assert.GreaterOrEqual(t, timeDiff, time.Second, + "Expected at least ~1 second between PutRetentionPolicy calls due to rate limiting: got %v", timeDiff) + } + }) +} + func assertCacheLen(t *testing.T, manager TargetManager, count int) { t.Helper() tm := manager.(*targetManager) From 18de14d2817f5434b015801785a8b31ebf86d4e6 Mon Sep 17 00:00:00 2001 From: Dominic Sciascia Date: Mon, 19 May 2025 17:50:16 +0000 Subject: [PATCH 2/5] fix lint --- plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go index aee39bd0fe..067dcc87e0 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go @@ -369,7 +369,7 @@ func TestTargetManager_RateLimiter(t *testing.T) { var callTimes []time.Time mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) - mockService.On("DescribeLogGroups", mock.Anything).Run(func(args mock.Arguments) { + mockService.On("DescribeLogGroups", mock.Anything).Run(func(_ mock.Arguments) { callTimes = append(callTimes, time.Now()) }).Return(&cloudwatchlogs.DescribeLogGroupsOutput{ LogGroups: []*cloudwatchlogs.LogGroup{ @@ -434,7 +434,7 @@ func TestTargetManager_RateLimiter(t *testing.T) { }, }, }, nil) - mockService.On("PutRetentionPolicy", mock.Anything).Run(func(args mock.Arguments) { + mockService.On("PutRetentionPolicy", mock.Anything).Run(func(_ mock.Arguments) { callTimes = append(callTimes, time.Now()) }).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil) From 26595573a2ce3aa1aada82dda9a0257fe63e3502 Mon Sep 17 00:00:00 2001 From: Dominic Sciascia Date: Mon, 19 May 2025 18:30:13 +0000 Subject: [PATCH 3/5] loosen time between calls check --- plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go index 067dcc87e0..aa1f737612 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go @@ -402,7 +402,7 @@ func TestTargetManager_RateLimiter(t *testing.T) { // Check that there was at least 1 second between calls (rate limit is 1 per second) for i := 1; i < len(callTimes); i++ { timeDiff := callTimes[i].Sub(callTimes[i-1]) - assert.GreaterOrEqual(t, timeDiff, time.Second, + assert.GreaterOrEqual(t, timeDiff, 995 * time.Millisecond, "Expected at least ~1 second between DescribeLogGroups calls due to rate limiting: got %v", timeDiff) } }) @@ -451,7 +451,7 @@ func TestTargetManager_RateLimiter(t *testing.T) { // Check that there was at least 1 second between calls (rate limit is 1 per second) for i := 1; i < len(callTimes); i++ { timeDiff := callTimes[i].Sub(callTimes[i-1]) - assert.GreaterOrEqual(t, timeDiff, time.Second, + assert.GreaterOrEqual(t, timeDiff, 995 * time.Millisecond, "Expected at least ~1 second between PutRetentionPolicy calls due to rate limiting: got %v", timeDiff) } }) From eedbe1806380e442130c6954b1c40738d3cabb7e Mon Sep 17 00:00:00 2001 From: Dominic Sciascia Date: Mon, 19 May 2025 19:36:09 +0000 Subject: [PATCH 4/5] fix data race in test --- .../internal/pusher/target_test.go | 146 +++++++++++------- 1 file changed, 90 insertions(+), 56 deletions(-) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go index aa1f737612..4dfc5e1452 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go @@ -365,44 +365,61 @@ func TestTargetManager_RateLimiter(t *testing.T) { {Group: "G3", Stream: "S3", Retention: 30}, } - mockService := new(mockLogsService) + service := new(stubLogsService) + var mu sync.RWMutex var callTimes []time.Time - mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) - mockService.On("DescribeLogGroups", mock.Anything).Run(func(_ mock.Arguments) { + service.cls = func(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + return &cloudwatchlogs.CreateLogStreamOutput{}, nil + } + service.dlg = func(input *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { + mu.Lock() callTimes = append(callTimes, time.Now()) - }).Return(&cloudwatchlogs.DescribeLogGroupsOutput{ - LogGroups: []*cloudwatchlogs.LogGroup{ - { - LogGroupName: aws.String("G1"), - RetentionInDays: aws.Int64(0), - }, - { - LogGroupName: aws.String("G2"), - RetentionInDays: aws.Int64(0), - }, - { - LogGroupName: aws.String("G3"), - RetentionInDays: aws.Int64(0), + mu.Unlock() + return &cloudwatchlogs.DescribeLogGroupsOutput{ + LogGroups: []*cloudwatchlogs.LogGroup{ + { + LogGroupName: aws.String("G1"), + RetentionInDays: aws.Int64(0), + }, + { + LogGroupName: aws.String("G2"), + RetentionInDays: aws.Int64(0), + }, + { + LogGroupName: aws.String("G3"), + RetentionInDays: aws.Int64(0), + }, }, - }, - }, nil) - mockService.On("PutRetentionPolicy", mock.Anything).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil) - - manager := NewTargetManager(logger, mockService) + }, nil + } + service.prp = func(input *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { + return &cloudwatchlogs.PutRetentionPolicyOutput{}, nil + } - for _, target := range targets { - err := manager.InitTarget(target) - assert.NoError(t, err) + manager := NewTargetManager(logger, service) + var wg sync.WaitGroup + for i := 0; i < len(targets); i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + err := manager.InitTarget(targets[idx]) + assert.NoError(t, err) + }(i) } - time.Sleep(5 * time.Second) + wg.Wait() assertCacheLen(t, manager, 3) // Check that there was at least 1 second between calls (rate limit is 1 per second) - for i := 1; i < len(callTimes); i++ { - timeDiff := callTimes[i].Sub(callTimes[i-1]) - assert.GreaterOrEqual(t, timeDiff, 995 * time.Millisecond, + mu.RLock() + callTimesCopy := make([]time.Time, len(callTimes)) // Make a copy to avoid race + copy(callTimesCopy, callTimes) + mu.RUnlock() + + for i := 1; i < len(callTimesCopy); i++ { + timeDiff := callTimesCopy[i].Sub(callTimesCopy[i-1]) + assert.GreaterOrEqual(t, timeDiff, 995*time.Millisecond, "Expected at least ~1 second between DescribeLogGroups calls due to rate limiting: got %v", timeDiff) } }) @@ -414,44 +431,61 @@ func TestTargetManager_RateLimiter(t *testing.T) { {Group: "G3", Stream: "S3", Retention: 30}, } - mockService := new(mockLogsService) + service := new(stubLogsService) + var mu sync.RWMutex var callTimes []time.Time - mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) - mockService.On("DescribeLogGroups", mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{ - LogGroups: []*cloudwatchlogs.LogGroup{ - { - LogGroupName: aws.String("G1"), - RetentionInDays: aws.Int64(0), - }, - { - LogGroupName: aws.String("G2"), - RetentionInDays: aws.Int64(0), - }, - { - LogGroupName: aws.String("G3"), - RetentionInDays: aws.Int64(0), + service.cls = func(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + return &cloudwatchlogs.CreateLogStreamOutput{}, nil + } + service.dlg = func(input *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { + return &cloudwatchlogs.DescribeLogGroupsOutput{ + LogGroups: []*cloudwatchlogs.LogGroup{ + { + LogGroupName: aws.String("G1"), + RetentionInDays: aws.Int64(0), + }, + { + LogGroupName: aws.String("G2"), + RetentionInDays: aws.Int64(0), + }, + { + LogGroupName: aws.String("G3"), + RetentionInDays: aws.Int64(0), + }, }, - }, - }, nil) - mockService.On("PutRetentionPolicy", mock.Anything).Run(func(_ mock.Arguments) { + }, nil + } + service.prp = func(input *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { + mu.Lock() callTimes = append(callTimes, time.Now()) - }).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil) - - manager := NewTargetManager(logger, mockService) + mu.Unlock() + return &cloudwatchlogs.PutRetentionPolicyOutput{}, nil + } - for _, target := range targets { - err := manager.InitTarget(target) - assert.NoError(t, err) + manager := NewTargetManager(logger, service) + var wg sync.WaitGroup + for i := 0; i < len(targets); i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + err := manager.InitTarget(targets[idx]) + assert.NoError(t, err) + }(i) } - time.Sleep(5 * time.Second) + wg.Wait() assertCacheLen(t, manager, 3) // Check that there was at least 1 second between calls (rate limit is 1 per second) - for i := 1; i < len(callTimes); i++ { - timeDiff := callTimes[i].Sub(callTimes[i-1]) - assert.GreaterOrEqual(t, timeDiff, 995 * time.Millisecond, + mu.RLock() + callTimesCopy := make([]time.Time, len(callTimes)) // Make a copy to avoid race + copy(callTimesCopy, callTimes) + mu.RUnlock() + + for i := 1; i < len(callTimesCopy); i++ { + timeDiff := callTimesCopy[i].Sub(callTimesCopy[i-1]) + assert.GreaterOrEqual(t, timeDiff, 995*time.Millisecond, "Expected at least ~1 second between PutRetentionPolicy calls due to rate limiting: got %v", timeDiff) } }) From ed137c808fe9a2fbbf2de0b93aadce01b35d0ef2 Mon Sep 17 00:00:00 2001 From: Dominic Sciascia Date: Mon, 19 May 2025 19:49:02 +0000 Subject: [PATCH 5/5] fix lint again --- .../outputs/cloudwatchlogs/internal/pusher/target_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go index 4dfc5e1452..b02dcb56bf 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go @@ -372,7 +372,7 @@ func TestTargetManager_RateLimiter(t *testing.T) { service.cls = func(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { return &cloudwatchlogs.CreateLogStreamOutput{}, nil } - service.dlg = func(input *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { + service.dlg = func(*cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { mu.Lock() callTimes = append(callTimes, time.Now()) mu.Unlock() @@ -393,7 +393,7 @@ func TestTargetManager_RateLimiter(t *testing.T) { }, }, nil } - service.prp = func(input *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { + service.prp = func(*cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { return &cloudwatchlogs.PutRetentionPolicyOutput{}, nil } @@ -438,7 +438,7 @@ func TestTargetManager_RateLimiter(t *testing.T) { service.cls = func(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { return &cloudwatchlogs.CreateLogStreamOutput{}, nil } - service.dlg = func(input *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { + service.dlg = func(*cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { return &cloudwatchlogs.DescribeLogGroupsOutput{ LogGroups: []*cloudwatchlogs.LogGroup{ { @@ -456,7 +456,7 @@ func TestTargetManager_RateLimiter(t *testing.T) { }, }, nil } - service.prp = func(input *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { + service.prp = func(*cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { mu.Lock() callTimes = append(callTimes, time.Now()) mu.Unlock()