Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions collector/internal/telemetryapi/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ func setupListener(t *testing.T) (*Listener, string) {

address, err := listener.Start()
require.NoError(t, err)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of t.Cleanup() appeared to trigger a race condition using the race detector with zap.Logger switching to using defer appears to fix the issue.

t.Cleanup(func() {
listener.Shutdown()
})

return listener, address
}

Expand Down Expand Up @@ -183,6 +178,7 @@ func TestListenOnAddress(t *testing.T) {

func TestListener_StartAndShutdown(t *testing.T) {
listener, address := setupListener(t)
defer listener.Shutdown()
require.NotEqual(t, address, "", "Start() should not return an empty address")
require.True(t, strings.HasPrefix(address, "http://"), "Address should start with http://")
require.NotNil(t, listener.httpServer, "httpServer should not be nil")
Expand Down Expand Up @@ -241,6 +237,7 @@ func TestListener_httpHandler(t *testing.T) {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
listener, address := setupListener(t)
defer listener.Shutdown()
submitEvents(t, address, test.events)
require.EventuallyWithT(t, func(c *assert.CollectT) {
require.Equal(c, test.expectedCount, listener.queue.Len())
Expand Down Expand Up @@ -302,6 +299,7 @@ func TestListener_Wait_Success(t *testing.T) {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
listener, address := setupListener(t)
defer listener.Shutdown()

waitDone := make(chan error, 1)
go func() {
Expand Down
3 changes: 2 additions & 1 deletion collector/lambdacomponents/receiver/telemetryapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
package receiver

import (
"github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver"
"go.opentelemetry.io/collector/receiver"

"github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver"
)

func init() {
Expand Down
2 changes: 1 addition & 1 deletion collector/receiver/telemetryapireceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Config struct {
extensionID string
Port int `mapstructure:"port"`
Types []string `mapstructure:"types"`
LogReport bool `mapstructure:"log_report"`
LogReport *bool `mapstructure:"log_report"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switch to pointer to allow default value to be true

}

// Validate validates the configuration by checking for missing or invalid fields
Expand Down
158 changes: 77 additions & 81 deletions collector/receiver/telemetryapireceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,100 +188,103 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ
slice = nil
}

func (r *telemetryAPIReceiver) getRecordRequestId(record map[string]interface{}) string {
if requestId, ok := record["requestId"].(string); ok {
return requestId
} else if r.currentFaasInvocationID != "" {
return r.currentFaasInvocationID
}
return ""
}

func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
log := plog.NewLogs()
resourceLog := log.ResourceLogs().AppendEmpty()
r.resource.CopyTo(resourceLog.Resource())
scopeLog := resourceLog.ScopeLogs().AppendEmpty()
scopeLog.Scope().SetName(scopeName)
for _, el := range slice {
if !r.logReport && el.Type == string(telemetryapi.PlatformReport) {
continue
}
r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el))
if el.Type == string(telemetryapi.Function) || el.Type == string(telemetryapi.Extension) {
logRecord := scopeLog.LogRecords().AppendEmpty()
logRecord.Attributes().PutStr("type", el.Type)
if t, err := time.Parse(time.RFC3339, el.Time); err == nil {
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
} else {
r.logger.Error("error parsing time", zap.Error(err))
return plog.Logs{}, err
}
if record, ok := el.Record.(map[string]interface{}); ok {
// in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function
if timestamp, ok := record["timestamp"].(string); ok {
if t, err := time.Parse(time.RFC3339, timestamp); err == nil {
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
} else {
r.logger.Error("error parsing time", zap.Error(err))
return plog.Logs{}, err
}
}
if level, ok := record["level"].(string); ok {
logRecord.SetSeverityNumber(severityTextToNumber(strings.ToUpper(level)))
logRecord.SetSeverityText(logRecord.SeverityNumber().String())
}
if requestId, ok := record["requestId"].(string); ok {
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId)
} else if r.currentFaasInvocationID != "" {
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID)
}
if line, ok := record["message"].(string); ok {
logRecord.Body().SetStr(line)
}
} else {
if r.currentFaasInvocationID != "" {
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID)
}
// in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function
if line, ok := el.Record.(string); ok {
logRecord.Body().SetStr(line)
logRecord := scopeLog.LogRecords().AppendEmpty()
logRecord.Attributes().PutStr("type", el.Type)
if t, err := time.Parse(time.RFC3339, el.Time); err == nil {
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
} else {
r.logger.Error("error parsing time", zap.Error(err))
return plog.Logs{}, err
}
if record, ok := el.Record.(map[string]interface{}); ok {
requestId := r.getRecordRequestId(record)
if requestId != "" {
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId)

// If this is the first event in the invocation with a request id (i.e. the "platform.start" event),
// set the current invocation id to this request id.
if el.Type == string(telemetryapi.PlatformStart) {
r.currentFaasInvocationID = requestId
}
}
} else { // platform events, if subscribed to
if el.Type == string(telemetryapi.PlatformStart) {
if record, ok := el.Record.(map[string]interface{}); ok {
if requestId, ok := record["requestId"].(string); ok {
r.currentFaasInvocationID = requestId
}

// in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function
if timestamp, ok := record["timestamp"].(string); ok {
if t, err := time.Parse(time.RFC3339, timestamp); err == nil {
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
} else {
r.logger.Error("error parsing time", zap.Error(err))
return plog.Logs{}, err
}
} else if el.Type == string(telemetryapi.PlatformRuntimeDone) {
r.currentFaasInvocationID = ""
} else if el.Type == string(telemetryapi.PlatformReport) && r.logReport {
if record, ok := el.Record.(map[string]interface{}); ok {
if logRecord := createReportLogRecord(&scopeLog, record); logRecord != nil {
logRecord.Attributes().PutStr("type", el.Type)
if t, err := time.Parse(time.RFC3339, el.Time); err == nil {
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
}
}
}
if level, ok := record["level"].(string); ok {
logRecord.SetSeverityNumber(severityTextToNumber(strings.ToUpper(level)))
logRecord.SetSeverityText(logRecord.SeverityNumber().String())
}

if el.Type == string(telemetryapi.PlatformReport) {
platformReportMessage := createPlatformReportMessage(requestId, record)
if platformReportMessage != "" {
logRecord.Body().SetStr(platformReportMessage)
}
} else if line, ok := record["message"].(string); ok {
logRecord.Body().SetStr(line)
}
} else {
if r.currentFaasInvocationID != "" {
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID)
}
// in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function
if line, ok := el.Record.(string); ok {
logRecord.Body().SetStr(line)
}
}
if el.Type == string(telemetryapi.PlatformRuntimeDone) {
r.currentFaasInvocationID = ""
}
}
return log, nil
}

// createReportLogRecord creates a log record for the platform.report event
// returns the log record if successful, otherwise nil
func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface{}) *plog.LogRecord {
func createPlatformReportMessage(requestId string, record map[string]interface{}) string {
// gathering metrics
metrics, ok := record["metrics"].(map[string]interface{})
if !ok {
return nil
return ""
}
var durationMs, billedDurationMs, memorySizeMB, maxMemoryUsedMB float64
if durationMs, ok = metrics[string(telemetryapi.MetricDurationMs)].(float64); !ok {
return nil
return ""
}
if billedDurationMs, ok = metrics[string(telemetryapi.MetricBilledDurationMs)].(float64); !ok {
return nil
return ""
}
if memorySizeMB, ok = metrics[string(telemetryapi.MetricMemorySizeMB)].(float64); !ok {
return nil
return ""
}
if maxMemoryUsedMB, ok = metrics[string(telemetryapi.MetricMaxMemoryUsedMB)].(float64); !ok {
return nil
return ""
}

// optionally gather information about cold start time
Expand All @@ -292,18 +295,7 @@ func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface
}
}

// gathering requestId
requestId := ""
if requestId, ok = record["requestId"].(string); !ok {
return nil
}

// we have all information available, we can create the log record
logRecord := scopeLog.LogRecords().AppendEmpty()
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId)

// building the body of the log record, optionally adding the init duration
body := fmt.Sprintf(
message := fmt.Sprintf(
logReportFmt,
requestId,
durationMs,
Expand All @@ -312,11 +304,10 @@ func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface
maxMemoryUsedMB,
)
if initDurationMs > 0 {
body += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs)
message += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs)
}
logRecord.Body().SetStr(body)

return &logRecord
return message
}

func severityTextToNumber(severityText string) plog.SeverityNumber {
Expand Down Expand Up @@ -418,7 +409,7 @@ func newTelemetryAPIReceiver(
}
}

subscribedTypes := []telemetryapi.EventType{}
var subscribedTypes []telemetryapi.EventType
for _, val := range cfg.Types {
switch val {
case "platform":
Expand All @@ -430,14 +421,19 @@ func newTelemetryAPIReceiver(
}
}

logReport := true
if cfg.LogReport != nil {
logReport = *cfg.LogReport
}

return &telemetryAPIReceiver{
logger: set.Logger,
queue: queue.New(initialQueueSize),
extensionID: cfg.extensionID,
port: cfg.Port,
types: subscribedTypes,
resource: r,
logReport: cfg.LogReport,
logReport: logReport,
}, nil
}

Expand Down
Loading
Loading