@dricross dricross commented Sep 5, 2025

Description of the issue

There are two primary issues causing leaking goroutines to drive up CloudWatch Agent memory usage over time.

Everliving Destinations

The CloudWatch Agent can publish logs from log files to CloudWatch log streams determined by the file name. Each log file the agent reads from creates a "source" object (specifically the tailerSrc type) with several running goroutines, and each log stream the agent pushes logs to creates a "destination" object (specifically the cwDest type) with several running goroutines. The source objects' goroutines are stopped when the associated log file is closed, but the destination objects are never cleaned up afterwards. This leaks memory because the destinations' goroutines never exit and keep piling up.

Log stream names are generated dynamically when the publish_multi_logs flag is set. Here is a sample entry in the collect list:

          {
            "publish_multi_logs": true,
            "file_path": "/tmp/test_logs/publish_multi_logs_*.log",
            "log_group_name": "test-log-rotation",
            "log_stream_name": "rotation-test-stream",
            "timezone": "UTC",
            "timestamp_format": "%Y-%m-%dT%H:%M:%S",
            "multi_line_start_pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}",
            "retention_in_days": 1
          },

For this config, the agent periodically looks for log files that match the /tmp/test_logs/publish_multi_logs_*.log glob pattern. For each file it finds, it writes the contents of that file to the test-log-rotation/rotation-test-stream-<logfilename> loggroup/logstream. So each file it finds creates one new source object and one new destination object.

It's important to note that there may be several source objects referencing one destination. For example, a customer could use the following in their collect list to collect logs from different files but push to the same destination:

          {
            "file_path": "/tmp/test_logs/shared_destination.txt",
            "log_group_name": "test-log-rotation",
            "log_stream_name": "shared-destination-stream",
            "timezone": "UTC",
            "timestamp_format": "%Y-%m-%dT%H:%M:%S",
            "multi_line_start_pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}",
            "retention_in_days": 1
          },
          {
            "file_path": "/tmp/test_logs/shared_destination_to_close.txt",
            "log_group_name": "test-log-rotation",
            "log_stream_name": "shared-destination-stream",
            "timezone": "UTC",
            "timestamp_format": "%Y-%m-%dT%H:%M:%S",
            "multi_line_start_pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}",
            "retention_in_days": 1
          }

Duplicate CloudWatch Logs Clients

Each time a destination object is created, a new CloudWatch Logs client is created. Request/response handlers are injected into the client to collect agent health metrics. These handlers have their own underlying clients, each with its own goroutines and caches. The underlying cache associates request data with response data, such as payload size and latency, which are eventually forwarded to the agent health metrics extension. The handlers are created by a middleware configurer. Each time a new CloudWatch Logs client is created, a new middleware configurer is created, which creates new request/response handlers and spins up more goroutines. These clients cannot be closed, so the goroutines never exit and keep piling up.

Description of changes

There are two sets of changes to address the two issues noted above.

Reference Counting Destinations

Add reference counting to the destination objects and a notification system that lets the source objects tell a destination when they stop using it. When a destination object is no longer used by any source, it stops itself and tells the CloudWatchLogs plugin it's no longer usable.

This implementation assumes that nothing other than the source objects uses the destination objects. There is some vestigial code left over from when the CloudWatchLogs telegraf plugin was used to process EMF metrics which could use the destination objects, but that functionality has been moved to the OTel awsemfexporter plugin. So that code path is more or less unreachable. I removed all functionality in the unused code path to make it easier to make cwDest thread-safe.

As mentioned previously, it is important to note that it is possible for multiple source objects to reference one destination. This possibility of sharing means source objects cannot close the destinations directly and a signaling mechanism is necessary.

Using the above JSON as an example, the agent will create two sources, one for each log file, and one destination object for the test-log-rotation/shared-destination-stream loggroup/logstream. When the file shared_destination_to_close.txt is closed, its source object will notify the destination object that the source has closed and no longer references it. The destination will remain open since it knows the shared_destination.txt source is still using it. If shared_destination.txt is then closed, the destination will know it's no longer being used and will close itself, terminating the associated goroutines.
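To make the shared-destination scenario concrete, here is a minimal sketch of a reference-counted destination. It is not the code in this PR: refCountedDest, AddReference, and Stopped are hypothetical names chosen for illustration, and only NotifySourceStopped mirrors a name that appears in the diff.

```go
package main

import (
	"fmt"
	"sync"
)

// refCountedDest is a hypothetical stand-in for a destination such as cwDest.
type refCountedDest struct {
	mu       sync.Mutex
	refCount int
	stopped  bool
	stopCh   chan struct{} // closed to terminate the destination's goroutines
}

func newRefCountedDest() *refCountedDest {
	return &refCountedDest{stopCh: make(chan struct{})}
}

// AddReference records that one more source is using this destination.
func (d *refCountedDest) AddReference() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.refCount++
}

// NotifySourceStopped drops one reference and stops the destination
// once the last source has gone away.
func (d *refCountedDest) NotifySourceStopped() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.refCount > 0 {
		d.refCount--
	}
	if d.refCount == 0 && !d.stopped {
		d.stopped = true
		close(d.stopCh)
	}
}

// Stopped reports whether the destination has shut itself down.
func (d *refCountedDest) Stopped() bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.stopped
}

func main() {
	dest := newRefCountedDest()
	dest.AddReference() // source for shared_destination.txt
	dest.AddReference() // source for shared_destination_to_close.txt

	dest.NotifySourceStopped()
	fmt.Println("stopped after first source closed:", dest.Stopped())

	dest.NotifySourceStopped()
	fmt.Println("stopped after second source closed:", dest.Stopped())
}
```

The mutex guards both the counter and the stopped flag, so two sources closing at the same time cannot both close stopCh.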

Single Middleware Configurer

Create one middleware configurer (per CloudWatchLogs instance, but it's a singleton, so there's only ever one configurer). These request/response handlers are all identical, so there's really no need to create new ones, nor is there a need to create a new middleware configurer.

The one side effect is that destination objects will share request/response handlers, which means there is only one cache to support all request/responses. There are no concurrency concerns here as the request/response handlers already support concurrent request/responses.
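As a rough illustration of the single-configurer idea, the sketch below builds the configurer lazily and hands the same instance to every caller. The plugin and middlewareConfigurer types and the getConfigurer method are hypothetical placeholders, and using sync.Once is an assumption about one way to do this, not a description of the actual change.

```go
package main

import (
	"fmt"
	"sync"
)

// middlewareConfigurer is a hypothetical placeholder for the object that
// builds the request/response handlers.
type middlewareConfigurer struct {
	id int
}

// plugin is a hypothetical stand-in for the CloudWatchLogs plugin instance.
type plugin struct {
	once       sync.Once
	configurer *middlewareConfigurer
	created    int // how many configurers were ever built
}

// getConfigurer builds the configurer on first use and returns the same
// instance on every later call, even under concurrent access.
func (p *plugin) getConfigurer() *middlewareConfigurer {
	p.once.Do(func() {
		p.created++
		p.configurer = &middlewareConfigurer{id: p.created}
	})
	return p.configurer
}

func main() {
	p := &plugin{}
	first := p.getConfigurer()  // e.g. requested while creating one destination
	second := p.getConfigurer() // e.g. requested while creating another
	fmt.Println("same instance:", first == second, "configurers created:", p.created)
}
```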

License

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Tests

Integration Tests

https://github.com/aws/amazon-cloudwatch-agent/actions/runs/17582601756

Manual Tests

Configure the agent with the publish_multi_logs flag. Compare memory usage over time with the baseline agent. With the baseline agent, memory usage went from ~7% to ~13% in 7 hours. With the modified agent, there is no noticeable increase in memory usage after 3 days.

Additionally, tested reading logs from multiple files and writing to the same destination and then closing one of the log files. This tests that when the source log file is closed, the associated destination isn't closed until all sources are closed.

Baseline Agent

image

Modified Agent

image

Configure agent with the following configuration:

{
  "agent": {"debug":true},
  "logs": {
    "logs_collected": {
      "files": {
        "collect_list": [
          {
            "publish_multi_logs": true,
            "file_path": "/tmp/test_logs/publish_multi_logs_*.log",
            "log_group_name": "test-log-rotation",
            "log_stream_name": "rotation-test-stream",
            "timezone": "UTC",
            "timestamp_format": "%Y-%m-%dT%H:%M:%S",
            "multi_line_start_pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}",
            "retention_in_days": 1
          },
          {
            "file_path": "/tmp/test_logs/shared_destination.txt",
            "log_group_name": "test-log-rotation",
            "log_stream_name": "shared-destination-stream",
            "timezone": "UTC",
            "timestamp_format": "%Y-%m-%dT%H:%M:%S",
            "multi_line_start_pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}",
            "retention_in_days": 1
          },
          {
            "file_path": "/tmp/test_logs/shared_destination_to_close.txt",
            "log_group_name": "test-log-rotation",
            "log_stream_name": "shared-destination-stream",
            "timezone": "UTC",
            "timestamp_format": "%Y-%m-%dT%H:%M:%S",
            "multi_line_start_pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}",
            "retention_in_days": 1
          }
        ]
      }
    }
  }
}

Use the following python script to generate log files that match the configuration:

#!/usr/bin/env python3

import time
import os
from datetime import datetime
from threading import Thread

log_dir = "/tmp/test_logs"

def generate_shared_destination_logs():
    os.makedirs(log_dir, exist_ok=True)
    
    shared_file = open(f"{log_dir}/shared_destination.txt", 'w')
    shared_to_close_file = open(f"{log_dir}/shared_destination_to_close.txt", 'w')
    
    try:
        start_time = time.time()
        line_counter = 1
        
        while True:
            current_time = time.time()
            elapsed = current_time - start_time
            
            log_line = f"{datetime.now().isoformat()} INFO Processing request {line_counter}\n"
            
            # Write to shared_destination throughout
            shared_file.write(log_line)
            shared_file.flush()
            
            # Write to shared_destination_to_close only for first 20 seconds
            if elapsed < 20:
                shared_to_close_file.write(log_line)
                shared_to_close_file.flush()
            elif elapsed >= 20 and shared_to_close_file:
                shared_to_close_file.close()
                shared_to_close_file = None
                print("Closed shared_destination_to_close.txt after 20 seconds")
            
            line_counter += 1
            time.sleep(0.5)
            
    except KeyboardInterrupt:
        print("\nStopping log generation...")
    finally:
        shared_file.close()
        if shared_to_close_file:
            shared_to_close_file.close()


def generate_publish_multi_logs():
    log_dir = "/tmp/test_logs"
    os.makedirs(log_dir, exist_ok=True)

    file_counter = 1
    current_file = None

    try:
        while True:
            # Remove the previous file (close() is left commented out) before opening a new one
            if current_file:
                # current_file.close()
                os.remove(current_file.name)

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{log_dir}/publish_multi_logs_{timestamp}_{file_counter:03d}.log"
            current_file = open(filename, 'w')
            print(f"Created new log file: {filename}")

            # Write logs for 10 seconds
            start_time = time.time()
            line_counter = 1

            while time.time() - start_time < 10:
                log_line = f"{datetime.now().isoformat()} INFO [app_{file_counter:03d}] Processing request {line_counter}\n"
                current_file.write(log_line)
                current_file.flush()
                line_counter += 1
                time.sleep(0.5)  # Write every 500ms

            file_counter += 1

    except KeyboardInterrupt:
        print("\nStopping log generation...")
    finally:
        if current_file:
            current_file.close()

if __name__ == "__main__":
    multi_logs_thread = Thread(target = generate_publish_multi_logs, args = ())
    shared_destination_thread = Thread(target = generate_shared_destination_logs, args = ())
    multi_logs_thread.start()
    shared_destination_thread.start()

    multi_logs_thread.join()
    shared_destination_thread.join()

Requirements

Before committing your code, please do the following steps.

  1. Run make fmt and make fmt-sh
  2. Run make lint

Integration Tests

To run integration tests against this PR, add the ready for testing label.

@dricross dricross marked this pull request as ready for review September 5, 2025 14:40
@dricross dricross requested a review from a team as a code owner September 5, 2025 14:40
for e := range eventsCh {
	err := dest.Publish([]LogEvent{e})
	if err == ErrOutputStopped {
		log.Printf("I! [logagent] Log destination %v has stopped, finalizing %v/%v", l.destNames[dest], src.Group(), src.Stream())
Contributor Author

There was actually a concurrent map read/write condition here. This non-sync map is written to in the Run routine and is potentially read from the runSrcToDest goroutine.

Comment on lines -251 to -255
destination := fileconfig.Destination
if destination == "" {
	destination = t.Destination
}

Contributor Author

This variable is unused

mergeChan := make(chan logs.LogEvent)

// Merge events from both blocking and non-blocking channel
go func() {
Contributor Author

@dricross dricross Sep 5, 2025

I moved this to a named function just to make the pprof output a little nicer. Because this is an anonymous function, its name in the output is somewhat obscure (start.func1+0xcf):

github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher.(*queue).start.func1+0xcf github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go:117

It's still findable from the line number, just not super obvious.

@dricross dricross force-pushed the dricross/logfiletailerdebug branch from 52f58b5 to 2114e27 Compare September 8, 2025 11:56
@dricross dricross force-pushed the dricross/logfiletailerdebug branch from 2114e27 to a2c0747 Compare September 8, 2025 12:06
Comment on lines 107 to +109
 func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
-	for _, m := range metrics {
-		c.writeMetricAsStructuredLog(m)
-	}
-	return nil
+	// we no longer expect this to be used. We now use the OTel awsemfexporter for sending EMF metrics to CloudWatch Logs
+	return fmt.Errorf("unexpected call to Write")
Contributor Author

To make cwDest thread-safe more easily, I removed this functionality. This function is here to adhere to the Telegraf interface, but it is no longer used. This functionality was for pushing EMF metrics to CloudWatch Logs using this telegraf-based output plugin. However, the agent no longer uses this plugin to push EMF metrics. EMF metrics now go through the OTel awsemfexporter plugin.

Contributor

If we're removing this function, should we also remove cwDest.switchToEMF?

return client
}

func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) {
Contributor Author

@dricross dricross Sep 8, 2025

Only used by Write which is no longer used

return "Configuration for AWS CloudWatchLogs output."
}

func (c *CloudWatchLogs) getLogEventFromMetric(metric telegraf.Metric) *structuredLogEvent {
Contributor Author

Only used by Write which is no longer used

## Amazon REGION
region = "us-east-1"
type structuredLogEvent struct {
Contributor Author

Only used by Write which is no longer used

}

// Description returns a one-sentence description on the Output
func (c *CloudWatchLogs) Description() string {
Contributor Author

I moved these two CloudWatchLogs functions to the CloudWatchLogs block (above cwDest). It didn't make much sense to define these functions separately from the rest.

@duhminick
Contributor

Also, when possible, can you run the integ tests?

@dricross dricross added the ready for testing Indicates this PR is ready for integration tests to run label Sep 8, 2025
Contributor Author

dricross commented Sep 8, 2025

> Also, when possible, can you run the integ tests?

Was having trouble with the log tailer unit test on Windows again... Eventually worked after a couple of retries. Added the tag and kicked off integ tests.

Contributor

This PR was marked stale due to lack of activity.

@github-actions github-actions bot added the Stale label Sep 18, 2025
}

// eventsCh has been closed meaning the src has been stopped
dest.NotifySourceStopped()
Contributor

Should this use a defer after the defer src.Stop() at the start of the function? What if the eventsCh didn't close, but the runToDest function exits? Wouldn't that result in an inconsistent refCount and prevent it from ever closing the pusher?
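The reviewer's concern can be sketched as follows. This is a simplified, hypothetical version of the loop (fakeDest, feed, and errOutputStopped are illustrative names, and the counter is deliberately not thread-safe); it only shows that a deferred NotifySourceStopped runs on both the normal and the early exit path.

```go
package main

import (
	"errors"
	"fmt"
)

// errOutputStopped is a hypothetical stand-in for ErrOutputStopped.
var errOutputStopped = errors.New("output stopped")

// fakeDest is a hypothetical destination that tracks its reference count.
type fakeDest struct {
	refCount int
	failOn   string // event that makes Publish report a stopped output
}

func (d *fakeDest) Publish(event string) error {
	if event == d.failOn {
		return errOutputStopped
	}
	return nil
}

func (d *fakeDest) NotifySourceStopped() { d.refCount-- }

// runSrcToDest forwards events until the channel closes or the destination stops.
func runSrcToDest(events <-chan string, d *fakeDest) {
	// Deferred so the reference is released on every exit path.
	defer d.NotifySourceStopped()
	for e := range events {
		if err := d.Publish(e); errors.Is(err, errOutputStopped) {
			return // early exit before the channel is drained
		}
	}
}

// feed returns a pre-filled, closed channel of events.
func feed(events ...string) <-chan string {
	ch := make(chan string, len(events))
	for _, e := range events {
		ch <- e
	}
	close(ch)
	return ch
}

func main() {
	normal := &fakeDest{refCount: 1}
	runSrcToDest(feed("a", "b"), normal) // loop ends because the channel closed

	early := &fakeDest{refCount: 1, failOn: "b"}
	runSrcToDest(feed("a", "b", "c"), early) // loop ends on the stopped-output error

	fmt.Println(normal.refCount, early.refCount)
}
```

Without the defer, the early-return path would leave the count at 1 and the destination would never see its last reference released.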

wg *sync.WaitGroup
}

var _ (Queue) = (*queue)(nil)
Contributor

nit: Not necessary because the constructor should already enforce this.

}

func (s *senderPool) Stop() {
// workerpool is stopped by the plugin
Contributor

Shouldn't this still stop the underlying sender? I think if this doesn't, then nothing else will.

}

func (m *mockSender) Stop() {

Contributor

Think it'd be worth asserting this is called when the pusher is stopped in one of the tests.

Comment on lines +141 to +147
func (s *sender) Stop() {
	if s.stopped {
		return
	}
	close(s.stopCh)
	s.stopped = true
}
Contributor

Not thread-safe, but likely not an issue since the sender should only be stopped from a single thread.

@github-actions github-actions bot removed the Stale label Oct 4, 2025