diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target.go index aa78ba85e1..366de90569 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/target.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/influxdata/telegraf" + "golang.org/x/time/rate" "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" ) @@ -23,6 +24,9 @@ const ( baseRetryDelay = 1 * time.Second maxRetryDelayTarget = 10 * time.Second numBackoffRetries = 5 + // DescribeLogGroups/PutRetentionPolicy limiter + limiterBurstSize = 1 + limiterRefillRate = 1 ) type Target struct { @@ -39,21 +43,25 @@ type targetManager struct { logger telegraf.Logger service cloudWatchLogsService // cache of initialized targets - cache map[Target]time.Time - cacheTTL time.Duration - mu sync.Mutex - dlg chan Target - prp chan Target + cache map[Target]time.Time + cacheTTL time.Duration + mu sync.Mutex + dlg chan Target + prp chan Target + dlgLimiter *rate.Limiter + prpLimiter *rate.Limiter } func NewTargetManager(logger telegraf.Logger, service cloudWatchLogsService) TargetManager { tm := &targetManager{ - logger: logger, - service: service, - cache: make(map[Target]time.Time), - cacheTTL: cacheTTL, - dlg: make(chan Target, retentionChannelSize), - prp: make(chan Target, retentionChannelSize), + logger: logger, + service: service, + cache: make(map[Target]time.Time), + cacheTTL: cacheTTL, + dlg: make(chan Target, retentionChannelSize), + prp: make(chan Target, retentionChannelSize), + dlgLimiter: rate.NewLimiter(limiterRefillRate, limiterBurstSize), + prpLimiter: rate.NewLimiter(limiterRefillRate, limiterBurstSize), } go tm.processDescribeLogGroup() @@ -176,6 +184,9 @@ func (m *targetManager) createLogStream(t Target) error { func (m *targetManager) processDescribeLogGroup() { for target := range m.dlg { for attempt := 0; attempt < numBackoffRetries; attempt++ { + resv := m.dlgLimiter.Reserve() + time.Sleep(resv.Delay()) + currentRetention, err := m.getRetention(target) if err != nil { m.logger.Errorf("failed to describe log group retention for target %v: %v", target, err) @@ -218,6 +229,9 @@ func (m *targetManager) processPutRetentionPolicy() { for target := range m.prp { var updated bool for attempt := 0; attempt < numBackoffRetries; attempt++ { + resv := m.prpLimiter.Reserve() + time.Sleep(resv.Delay()) + err := m.updateRetentionPolicy(target) if err == nil { updated = true diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go index 1224800c73..b02dcb56bf 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go @@ -355,6 +355,142 @@ func TestCalculateBackoff(t *testing.T) { assert.True(t, totalDelay <= 30*time.Second, "Total delay across all attempts should not exceed 30 seconds, but was %v", totalDelay) } +func TestTargetManager_RateLimiter(t *testing.T) { + logger := testutil.NewNopLogger() + + t.Run("DescribeLogGroupsRateLimited", func(t *testing.T) { + targets := []Target{ + {Group: "G1", Stream: "S1", Retention: 7}, + {Group: "G2", Stream: "S2", Retention: 14}, + {Group: "G3", Stream: "S3", Retention: 30}, + } + + service := new(stubLogsService) + var mu sync.RWMutex + var callTimes []time.Time + + service.cls = func(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + return &cloudwatchlogs.CreateLogStreamOutput{}, nil + } + service.dlg = func(*cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { + mu.Lock() + callTimes = append(callTimes, time.Now()) + mu.Unlock() + return &cloudwatchlogs.DescribeLogGroupsOutput{ + LogGroups: []*cloudwatchlogs.LogGroup{ + { + LogGroupName: aws.String("G1"), + RetentionInDays: aws.Int64(0), + }, + { + LogGroupName: aws.String("G2"), + RetentionInDays: aws.Int64(0), + }, + { + LogGroupName: aws.String("G3"), + RetentionInDays: aws.Int64(0), + }, + }, + }, nil + } + service.prp = func(*cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { + return &cloudwatchlogs.PutRetentionPolicyOutput{}, nil + } + + manager := NewTargetManager(logger, service) + var wg sync.WaitGroup + for i := 0; i < len(targets); i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + err := manager.InitTarget(targets[idx]) + assert.NoError(t, err) + }(i) + } + wg.Wait() + + assertCacheLen(t, manager, 3) + + // Check that there was at least 1 second between calls (rate limit is 1 per second) + mu.RLock() + callTimesCopy := make([]time.Time, len(callTimes)) // Make a copy to avoid race + copy(callTimesCopy, callTimes) + mu.RUnlock() + + for i := 1; i < len(callTimesCopy); i++ { + timeDiff := callTimesCopy[i].Sub(callTimesCopy[i-1]) + assert.GreaterOrEqual(t, timeDiff, 995*time.Millisecond, + "Expected at least ~1 second between DescribeLogGroups calls due to rate limiting: got %v", timeDiff) + } + }) + + t.Run("PutRetentionPolicyRateLimited", func(t *testing.T) { + targets := []Target{ + {Group: "G1", Stream: "S1", Retention: 7}, + {Group: "G2", Stream: "S2", Retention: 14}, + {Group: "G3", Stream: "S3", Retention: 30}, + } + + service := new(stubLogsService) + var mu sync.RWMutex + var callTimes []time.Time + + service.cls = func(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + return &cloudwatchlogs.CreateLogStreamOutput{}, nil + } + service.dlg = func(*cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { + return &cloudwatchlogs.DescribeLogGroupsOutput{ + LogGroups: []*cloudwatchlogs.LogGroup{ + { + LogGroupName: aws.String("G1"), + RetentionInDays: aws.Int64(0), + }, + { + LogGroupName: aws.String("G2"), + RetentionInDays: aws.Int64(0), + }, + { + LogGroupName: aws.String("G3"), + RetentionInDays: aws.Int64(0), + }, + }, + }, nil + } + service.prp = func(*cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { + mu.Lock() + callTimes = append(callTimes, time.Now()) + mu.Unlock() + return &cloudwatchlogs.PutRetentionPolicyOutput{}, nil + } + + manager := NewTargetManager(logger, service) + var wg sync.WaitGroup + for i := 0; i < len(targets); i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + err := manager.InitTarget(targets[idx]) + assert.NoError(t, err) + }(i) + } + wg.Wait() + + assertCacheLen(t, manager, 3) + + // Check that there was at least 1 second between calls (rate limit is 1 per second) + mu.RLock() + callTimesCopy := make([]time.Time, len(callTimes)) // Make a copy to avoid race + copy(callTimesCopy, callTimes) + mu.RUnlock() + + for i := 1; i < len(callTimesCopy); i++ { + timeDiff := callTimesCopy[i].Sub(callTimesCopy[i-1]) + assert.GreaterOrEqual(t, timeDiff, 995*time.Millisecond, + "Expected at least ~1 second between PutRetentionPolicy calls due to rate limiting: got %v", timeDiff) + } + }) +} + func assertCacheLen(t *testing.T, manager TargetManager, count int) { t.Helper() tm := manager.(*targetManager)