Skip to content

Commit 52f58b5

Browse files
committed
Locking cwDest on Publish. Remove vestigial functionality
1 parent 03c9660 commit 52f58b5

File tree

1 file changed

+54
-182
lines changed

1 file changed

+54
-182
lines changed

plugins/outputs/cloudwatchlogs/cloudwatchlogs.go

Lines changed: 54 additions & 182 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package cloudwatchlogs
55

66
import (
7-
"encoding/json"
87
"fmt"
98
"regexp"
109
"strings"
@@ -28,7 +27,6 @@ import (
2827
"github.com/aws/amazon-cloudwatch-agent/logs"
2928
"github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher"
3029
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
31-
"github.com/aws/amazon-cloudwatch-agent/tool/util"
3230
)
3331

3432
const (
@@ -40,10 +38,7 @@ const (
4038

4139
defaultFlushTimeout = 5 * time.Second
4240

43-
maxRetryTimeout = 14*24*time.Hour + 10*time.Minute
44-
metricRetryTimeout = 2 * time.Minute
45-
46-
attributesInFields = "attributesInFields"
41+
maxRetryTimeout = 14*24*time.Hour + 10*time.Minute
4742
)
4843

4944
var (
@@ -95,7 +90,7 @@ func (c *CloudWatchLogs) Close() error {
9590

9691
c.cwDests.Range(func(_, value interface{}) bool {
9792
if d, ok := value.(*cwDest); ok {
98-
d.stop()
93+
d.Stop()
9994
}
10095
return true
10196
})
@@ -110,10 +105,8 @@ func (c *CloudWatchLogs) Close() error {
110105
}
111106

112107
func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
113-
for _, m := range metrics {
114-
c.writeMetricAsStructuredLog(m)
115-
}
116-
return nil
108+
// we no longer expect this to be used. We now use the OTel awsemfexporter for sending EMF metrics to CloudWatch Logs
109+
return fmt.Errorf("unexpected call to Write")
117110
}
118111

119112
func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGroupClass string, logSrc logs.LogSrc) logs.LogDest {
@@ -161,9 +154,14 @@ func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest {
161154
c.targetManager = pusher.NewTargetManager(c.Log, client)
162155
})
163156
p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, &c.pusherWaitGroup)
164-
cwd := &cwDest{pusher: p, retryer: logThrottleRetryer, onStopFunc: func() {
165-
c.cwDests.Delete(t)
166-
}}
157+
cwd := &cwDest{
158+
pusher: p,
159+
retryer: logThrottleRetryer,
160+
refCount: 1,
161+
onStopFunc: func() {
162+
c.cwDests.Delete(t)
163+
},
164+
}
167165
c.cwDests.Store(t, cwd)
168166
return cwd
169167
}
@@ -201,136 +199,44 @@ func (c *CloudWatchLogs) createClient(retryer aws.RequestRetryer) *cloudwatchlog
201199
return client
202200
}
203201

204-
func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) {
205-
t, err := c.getTargetFromMetric(m)
206-
if err != nil {
207-
c.Log.Errorf("Failed to find target: %v", err)
208-
}
209-
cwd := c.getDest(t, nil)
210-
if cwd == nil {
211-
c.Log.Warnf("unable to find log destination, group: %v, stream: %v", t.Group, t.Stream)
212-
return
213-
}
214-
cwd.switchToEMF()
215-
cwd.pusher.Sender.SetRetryDuration(metricRetryTimeout)
216-
217-
e := c.getLogEventFromMetric(m)
218-
if e == nil {
219-
return
220-
}
221-
222-
cwd.AddEvent(e)
223-
}
224-
225-
func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (pusher.Target, error) {
226-
tags := m.Tags()
227-
logGroup, ok := tags[LogGroupNameTag]
228-
if !ok {
229-
return pusher.Target{}, fmt.Errorf("structuredlog receive a metric with name '%v' without log group name", m.Name())
230-
} else {
231-
m.RemoveTag(LogGroupNameTag)
232-
}
233-
234-
logStream, ok := tags[LogStreamNameTag]
235-
if ok {
236-
m.RemoveTag(LogStreamNameTag)
237-
} else if logStream == "" {
238-
logStream = c.LogStreamName
239-
}
240-
241-
return pusher.Target{Group: logGroup, Stream: logStream, Class: util.StandardLogGroupClass, Retention: -1}, nil
202+
// Description returns a one-sentence description on the Output
203+
func (c *CloudWatchLogs) Description() string {
204+
return "Configuration for AWS CloudWatchLogs output."
242205
}
243206

244-
func (c *CloudWatchLogs) getLogEventFromMetric(metric telegraf.Metric) *structuredLogEvent {
245-
var message string
246-
if metric.HasField(LogEntryField) {
247-
var ok bool
248-
if message, ok = metric.Fields()[LogEntryField].(string); !ok {
249-
c.Log.Warnf("The log entry value field is not string type: %v", metric.Fields())
250-
return nil
251-
}
252-
} else {
253-
content := map[string]interface{}{}
254-
tags := metric.Tags()
255-
// build all the attributesInFields
256-
if val, ok := tags[attributesInFields]; ok {
257-
attributes := strings.Split(val, ",")
258-
mFields := metric.Fields()
259-
for _, attr := range attributes {
260-
if fieldVal, ok := mFields[attr]; ok {
261-
content[attr] = fieldVal
262-
metric.RemoveField(attr)
263-
}
264-
}
265-
metric.RemoveTag(attributesInFields)
266-
delete(tags, attributesInFields)
267-
}
268-
269-
// build remaining attributes
270-
for k := range tags {
271-
content[k] = tags[k]
272-
}
273-
274-
for k, v := range metric.Fields() {
275-
var value interface{}
276-
277-
switch t := v.(type) {
278-
case int:
279-
value = float64(t)
280-
case int32:
281-
value = float64(t)
282-
case int64:
283-
value = float64(t)
284-
case uint:
285-
value = float64(t)
286-
case uint32:
287-
value = float64(t)
288-
case uint64:
289-
value = float64(t)
290-
case float64:
291-
value = t
292-
case bool:
293-
value = t
294-
case string:
295-
value = t
296-
case time.Time:
297-
value = float64(t.Unix())
298-
299-
default:
300-
c.Log.Errorf("Detected unexpected fields (%s,%v) when encoding structured log event, value type %T is not supported", k, v, v)
301-
return nil
302-
}
303-
content[k] = value
304-
}
305-
306-
jsonMap, err := json.Marshal(content)
307-
if err != nil {
308-
c.Log.Errorf("Unalbe to marshal structured log content: %v", err)
309-
}
310-
message = string(jsonMap)
311-
}
312-
313-
return &structuredLogEvent{
314-
msg: message,
315-
t: metric.Time(),
316-
}
317-
}
207+
var sampleConfig = `
208+
## Amazon REGION
209+
region = "us-east-1"
318210
319-
type structuredLogEvent struct {
320-
msg string
321-
t time.Time
322-
}
211+
## Amazon Credentials
212+
## Credentials are loaded in the following order
213+
## 1) Assumed credentials via STS if role_arn is specified
214+
## 2) explicit credentials from 'access_key' and 'secret_key'
215+
## 3) shared profile from 'profile'
216+
## 4) environment variables
217+
## 5) shared credentials file
218+
## 6) EC2 Instance Profile
219+
#access_key = ""
220+
#secret_key = ""
221+
#token = ""
222+
#role_arn = ""
223+
#profile = ""
224+
#shared_credential_file = ""
323225
324-
func (e *structuredLogEvent) Message() string {
325-
return e.msg
326-
}
226+
# The log stream name.
227+
log_stream_name = "<log_stream_name>"
228+
`
327229

328-
func (e *structuredLogEvent) Time() time.Time {
329-
return e.t
230+
// SampleConfig returns the default configuration of the Output
231+
func (c *CloudWatchLogs) SampleConfig() string {
232+
return sampleConfig
330233
}
331234

332-
func (e *structuredLogEvent) Done() {}
333-
235+
// cwDest is responsible for publishing logs from log files to a log group + log stream.
236+
// Logs from more than one log file may be published to the same destination. cwDest closes
237+
// itself when all log file tailers which referenced this cwDest are closed.
238+
// All exported functions should practice thread-safety by acquiring lock the cwDest
239+
// and not calling any other function which requires the lock.
334240
type cwDest struct {
335241
pusher *pusher.Pusher
336242
sync.Mutex
@@ -349,8 +255,10 @@ type cwDest struct {
349255
var _ logs.LogDest = (*cwDest)(nil)
350256

351257
func (cd *cwDest) Publish(events []logs.LogEvent) error {
258+
cd.Lock()
259+
defer cd.Unlock()
352260
if cd.stopped {
353-
return fmt.Errorf("cannot publish events: destination has been stopped")
261+
return logs.ErrOutputStopped
354262
}
355263
for _, e := range events {
356264
if !cd.isEMF {
@@ -359,10 +267,7 @@ func (cd *cwDest) Publish(events []logs.LogEvent) error {
359267
cd.switchToEMF()
360268
}
361269
}
362-
cd.AddEvent(e)
363-
}
364-
if cd.stopped {
365-
return logs.ErrOutputStopped
270+
cd.addEvent(e)
366271
}
367272
return nil
368273
}
@@ -376,6 +281,12 @@ func (cd *cwDest) NotifySourceStopped() {
376281
}
377282
}
378283

284+
func (cd *cwDest) Stop() {
285+
cd.Lock()
286+
defer cd.Unlock()
287+
cd.stop()
288+
}
289+
379290
func (cd *cwDest) stop() {
380291
if cd.stopped {
381292
return
@@ -388,11 +299,7 @@ func (cd *cwDest) stop() {
388299
}
389300
}
390301

391-
func (cd *cwDest) AddEvent(e logs.LogEvent) {
392-
if cd.stopped {
393-
// cannot add event, destination has been stopped
394-
return
395-
}
302+
func (cd *cwDest) addEvent(e logs.LogEvent) {
396303
// Drop events for metric path logs when queue is full
397304
if cd.isEMF {
398305
cd.pusher.AddEventNonBlocking(e)
@@ -402,8 +309,6 @@ func (cd *cwDest) AddEvent(e logs.LogEvent) {
402309
}
403310

404311
func (cd *cwDest) switchToEMF() {
405-
cd.Lock()
406-
defer cd.Unlock()
407312
if !cd.isEMF {
408313
cd.isEMF = true
409314
cwl, ok := cd.pusher.Service.(*cloudwatchlogs.CloudWatchLogs)
@@ -413,39 +318,6 @@ func (cd *cwDest) switchToEMF() {
413318
}
414319
}
415320

416-
// Description returns a one-sentence description on the Output
417-
func (c *CloudWatchLogs) Description() string {
418-
return "Configuration for AWS CloudWatchLogs output."
419-
}
420-
421-
var sampleConfig = `
422-
## Amazon REGION
423-
region = "us-east-1"
424-
425-
## Amazon Credentials
426-
## Credentials are loaded in the following order
427-
## 1) Assumed credentials via STS if role_arn is specified
428-
## 2) explicit credentials from 'access_key' and 'secret_key'
429-
## 3) shared profile from 'profile'
430-
## 4) environment variables
431-
## 5) shared credentials file
432-
## 6) EC2 Instance Profile
433-
#access_key = ""
434-
#secret_key = ""
435-
#token = ""
436-
#role_arn = ""
437-
#profile = ""
438-
#shared_credential_file = ""
439-
440-
# The log stream name.
441-
log_stream_name = "<log_stream_name>"
442-
`
443-
444-
// SampleConfig returns the default configuration of the Output
445-
func (c *CloudWatchLogs) SampleConfig() string {
446-
return sampleConfig
447-
}
448-
449321
func init() {
450322
outputs.Add("cloudwatchlogs", func() telegraf.Output {
451323
return &CloudWatchLogs{

0 commit comments

Comments
 (0)