From 017039d4b043dcdf5d7999d74de0e34fc80ce82d Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Wed, 1 Dec 2021 20:46:02 +0800 Subject: [PATCH] otlp: Add experimental support to receive logs Adds support to receive OTLP logs on the GRPC receiver. This feature is _beta_ and should be used sparsely until the OTel spec declares it as stable. Currently, it's challenging to test this in the system tests since the `opentelemetry-go` SDK doesn't expose any APIs for logs. Signed-off-by: Marc Lopez Rubio --- beater/otlp/grpc.go | 8 + beater/otlp/grpc_test.go | 47 ++++++ model/apmevent.go | 5 +- model/error.go | 4 +- model/error_test.go | 6 +- model/event.go | 10 ++ model/log.go | 16 ++ model/modeldecoder/rumv3/decoder.go | 2 +- model/modeldecoder/rumv3/metadata_test.go | 1 + model/modeldecoder/v2/decoder.go | 2 +- model/modeldecoder/v2/metadata_test.go | 4 + model/modelprocessor/culprit_test.go | 10 +- model/modelprocessor/errormessage_test.go | 6 +- .../excludefromgrouping_test.go | 4 +- model/modelprocessor/groupingkey_test.go | 10 +- model/modelprocessor/libraryframe_test.go | 4 +- processor/otel/logs.go | 133 ++++++++++++++++ processor/otel/logs_test.go | 150 ++++++++++++++++++ processor/otel/traces.go | 2 +- sourcemap/processor_test.go | 4 +- 20 files changed, 398 insertions(+), 30 deletions(-) create mode 100644 processor/otel/logs.go create mode 100644 processor/otel/logs_test.go diff --git a/beater/otlp/grpc.go b/beater/otlp/grpc.go index 4f12ec9f85..8a59fd7a7d 100644 --- a/beater/otlp/grpc.go +++ b/beater/otlp/grpc.go @@ -44,18 +44,22 @@ var ( gRPCMetricsMonitoringMap = request.MonitoringMapForRegistry(gRPCMetricsRegistry, monitoringKeys) gRPCTracesRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.traces") gRPCTracesMonitoringMap = request.MonitoringMapForRegistry(gRPCTracesRegistry, monitoringKeys) + gRPCLogsRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.logs") + gRPCLogsMonitoringMap = request.MonitoringMapForRegistry(gRPCLogsRegistry, monitoringKeys) // RegistryMonitoringMaps provides mappings from the fully qualified gRPC // method name to its respective monitoring map. RegistryMonitoringMaps = map[string]map[request.ResultID]*monitoring.Int{ metricsFullMethod: gRPCMetricsMonitoringMap, tracesFullMethod: gRPCTracesMonitoringMap, + logsFullMethod: gRPCLogsMonitoringMap, } ) const ( metricsFullMethod = "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export" tracesFullMethod = "/opentelemetry.proto.collector.trace.v1.TraceService/Export" + logsFullMethod = "/opentelemetry.proto.collector.logs.v1.LogsService/Export" ) func init() { @@ -68,6 +72,7 @@ func MethodAuthenticators(authenticator *auth.Authenticator) map[string]intercep return map[string]interceptors.MethodAuthenticator{ metricsFullMethod: metadataMethodAuthenticator, tracesFullMethod: metadataMethodAuthenticator, + logsFullMethod: metadataMethodAuthenticator, } } @@ -86,6 +91,9 @@ func RegisterGRPCServices(grpcServer *grpc.Server, processor model.BatchProcesso if err := otlpreceiver.RegisterMetricsReceiver(context.Background(), consumer, grpcServer); err != nil { return errors.Wrap(err, "failed to register OTLP metrics receiver") } + if err := otlpreceiver.RegisterLogsReceiver(context.Background(), consumer, grpcServer); err != nil { + return errors.Wrap(err, "failed to register OTLP logs receiver") + } return nil } diff --git a/beater/otlp/grpc_test.go b/beater/otlp/grpc_test.go index 917a7f76c3..c1c2edf9fd 100644 --- a/beater/otlp/grpc_test.go +++ b/beater/otlp/grpc_test.go @@ -133,6 +133,53 @@ func TestConsumeMetrics(t *testing.T) { }, actual) } +func TestConsumeLogs(t *testing.T) { + var batches []model.Batch + var reportError error + var batchProcessor model.ProcessBatchFunc = func(ctx context.Context, batch *model.Batch) error { + batches = append(batches, *batch) + return reportError + } + + conn := newServer(t, batchProcessor) + client := otlpgrpc.NewLogsClient(conn) + + // Send a minimal log to verify that everything is connected properly. + // + // We intentionally do not check the published event contents; those are + // tested in processor/otel. + logs := pdata.NewLogs() + log := logs.ResourceLogs().AppendEmpty().InstrumentationLibraryLogs().AppendEmpty().Logs().AppendEmpty() + log.SetName("log_name") + + _, err := client.Export(context.Background(), logs) + assert.NoError(t, err) + require.Len(t, batches, 1) + + reportError = errors.New("failed to publish events") + _, err = client.Export(context.Background(), logs) + assert.Error(t, err) + errStatus := status.Convert(err) + assert.Equal(t, "failed to publish events", errStatus.Message()) + require.Len(t, batches, 2) + assert.Len(t, batches[0], 1) + assert.Len(t, batches[1], 1) + + actual := map[string]interface{}{} + monitoring.GetRegistry("apm-server.otlp.grpc.logs").Do(monitoring.Full, func(key string, value interface{}) { + actual[key] = value + }) + assert.Equal(t, map[string]interface{}{ + "request.count": int64(2), + "response.count": int64(2), + "response.errors.count": int64(1), + "response.valid.count": int64(1), + "response.errors.ratelimit": int64(0), + "response.errors.timeout": int64(0), + "response.errors.unauthorized": int64(0), + }, actual) +} + func newServer(t *testing.T, batchProcessor model.BatchProcessor) *grpc.ClientConn { lis, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) diff --git a/model/apmevent.go b/model/apmevent.go index da42ba8cbe..1daf83a99c 100644 --- a/model/apmevent.go +++ b/model/apmevent.go @@ -30,9 +30,6 @@ import ( // Exactly one of the event fields should be non-nil. type APMEvent struct { // DataStream optionally holds data stream identifiers. - // - // This will have the zero value when APM Server is run - // in standalone mode. DataStream DataStream ECSVersion string @@ -59,6 +56,7 @@ type APMEvent struct { Child Child HTTP HTTP FAAS FAAS + Log Log // Timestamp holds the event timestamp. // @@ -152,5 +150,6 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event { fields.maybeSetString("message", e.Message) fields.maybeSetMapStr("http", e.HTTP.fields()) fields.maybeSetMapStr("faas", e.FAAS.fields()) + fields.maybeSetMapStr("log", e.Log.fields()) return event } diff --git a/model/error.go b/model/error.go index 5353959aac..7488342a67 100644 --- a/model/error.go +++ b/model/error.go @@ -38,7 +38,7 @@ type Error struct { Custom common.MapStr Exception *Exception - Log *Log + Log *ErrorLog } type Exception struct { @@ -52,7 +52,7 @@ type Exception struct { Cause []Exception } -type Log struct { +type ErrorLog struct { Message string Level string ParamMessage string diff --git a/model/error_test.go b/model/error_test.go index ce61188c33..f4cdb28ab4 100644 --- a/model/error_test.go +++ b/model/error_test.go @@ -36,8 +36,8 @@ func (e *Exception) withCode(code string) *Exception { return e } -func baseLog() *Log { - return &Log{Message: "error log message"} +func baseLog() *ErrorLog { + return &ErrorLog{Message: "error log message"} } func TestHandleExceptionTree(t *testing.T) { @@ -132,7 +132,7 @@ func TestEventFields(t *testing.T) { loggerName := "logger" logMsg := "error log message" paramMsg := "param message" - log := Log{ + log := ErrorLog{ Level: level, Message: logMsg, ParamMessage: paramMsg, diff --git a/model/event.go b/model/event.go index 376c7b623d..50ec54f6a3 100644 --- a/model/event.go +++ b/model/event.go @@ -36,10 +36,20 @@ type Event struct { // Outcome holds the event outcome: "success", "failure", or "unknown". Outcome string + + // Severity holds the numeric severity of the event for log events. + Severity int64 + + // Severity holds the action captured by the event for log events. + Action string } func (e *Event) fields() common.MapStr { var fields mapStr fields.maybeSetString("outcome", e.Outcome) + fields.maybeSetString("action", e.Action) + if e.Severity > 0 { + fields.set("severity", e.Severity) + } return common.MapStr(fields) } diff --git a/model/log.go b/model/log.go index 97f44c6a54..2985d92643 100644 --- a/model/log.go +++ b/model/log.go @@ -17,6 +17,8 @@ package model +import "github.com/elastic/beats/v7/libbeat/common" + const ( AppLogsDataset = "apm.app" ) @@ -25,3 +27,17 @@ var ( // LogProcessor is the Processor value that should be assigned to log events. LogProcessor = Processor{Name: "log", Event: "log"} ) + +// Log holds information about a log, as defined by ECS. +// +// https://www.elastic.co/guide/en/ecs/current/ecs-log.html +type Log struct { + // Level holds the log level of the log event. + Level string +} + +func (e Log) fields() common.MapStr { + var fields mapStr + fields.maybeSetString("level", e.Level) + return common.MapStr(fields) +} diff --git a/model/modeldecoder/rumv3/decoder.go b/model/modeldecoder/rumv3/decoder.go index b604cebad4..433051c2ed 100644 --- a/model/modeldecoder/rumv3/decoder.go +++ b/model/modeldecoder/rumv3/decoder.go @@ -210,7 +210,7 @@ func mapToErrorModel(from *errorEvent, event *model.APMEvent) { out.ID = from.ID.Val } if from.Log.IsSet() { - log := model.Log{} + log := model.ErrorLog{} if from.Log.Level.IsSet() { log.Level = from.Log.Level.Val } diff --git a/model/modeldecoder/rumv3/metadata_test.go b/model/modeldecoder/rumv3/metadata_test.go index e49cb4e448..ce65ed9ed4 100644 --- a/model/modeldecoder/rumv3/metadata_test.go +++ b/model/modeldecoder/rumv3/metadata_test.go @@ -82,6 +82,7 @@ func metadataExceptions(keys ...string) func(key string) bool { "Session", "Trace", "URL", + "Log", // Dedicated test for it. "NumericLabels", diff --git a/model/modeldecoder/v2/decoder.go b/model/modeldecoder/v2/decoder.go index 72847e2a93..3ff8ccc168 100644 --- a/model/modeldecoder/v2/decoder.go +++ b/model/modeldecoder/v2/decoder.go @@ -362,7 +362,7 @@ func mapToErrorModel(from *errorEvent, event *model.APMEvent) { out.ID = from.ID.Val } if from.Log.IsSet() { - log := model.Log{} + log := model.ErrorLog{} if from.Log.Level.IsSet() { log.Level = from.Log.Level.Val } diff --git a/model/modeldecoder/v2/metadata_test.go b/model/modeldecoder/v2/metadata_test.go index ffa1b50ca2..3dda9336c0 100644 --- a/model/modeldecoder/v2/metadata_test.go +++ b/model/modeldecoder/v2/metadata_test.go @@ -113,6 +113,10 @@ func isUnmappedMetadataField(key string) bool { "Event", "Event.Duration", "Event.Outcome", + "Event.Severity", + "Event.Action", + "Log", + "Log.Level", "Service.Origin", "Service.Origin.ID", "Service.Origin.Name", diff --git a/model/modelprocessor/culprit_test.go b/model/modelprocessor/culprit_test.go index a514a427b9..95ea9d0da4 100644 --- a/model/modelprocessor/culprit_test.go +++ b/model/modelprocessor/culprit_test.go @@ -40,7 +40,7 @@ func TestSetCulprit(t *testing.T) { }, { input: model.Error{ Culprit: "already_set", - Log: &model.Log{ + Log: &model.ErrorLog{ Stacktrace: model.Stacktrace{{SourcemapUpdated: false, Filename: "foo.go"}}, }, }, @@ -48,7 +48,7 @@ func TestSetCulprit(t *testing.T) { }, { input: model.Error{ Culprit: "already_set", - Log: &model.Log{ + Log: &model.ErrorLog{ Stacktrace: model.Stacktrace{{SourcemapUpdated: true, LibraryFrame: true, Filename: "foo.go"}}, }, }, @@ -56,7 +56,7 @@ func TestSetCulprit(t *testing.T) { }, { input: model.Error{ Culprit: "already_set", - Log: &model.Log{ + Log: &model.ErrorLog{ Stacktrace: model.Stacktrace{ {SourcemapUpdated: true, LibraryFrame: true, Filename: "foo.go"}, {SourcemapUpdated: true, LibraryFrame: false, Filename: "foo2.go"}, @@ -67,7 +67,7 @@ func TestSetCulprit(t *testing.T) { }, { input: model.Error{ Culprit: "already_set", - Log: &model.Log{ + Log: &model.ErrorLog{ Stacktrace: model.Stacktrace{{SourcemapUpdated: true, LibraryFrame: true, Filename: "foo.go"}}, }, Exception: &model.Exception{ @@ -77,7 +77,7 @@ func TestSetCulprit(t *testing.T) { culprit: "foo2.go", }, { input: model.Error{ - Log: &model.Log{ + Log: &model.ErrorLog{ Stacktrace: model.Stacktrace{ {SourcemapUpdated: true, Classname: "AbstractFactoryManagerBean", Function: "toString"}, }, diff --git a/model/modelprocessor/errormessage_test.go b/model/modelprocessor/errormessage_test.go index 2d75988242..1dee8137df 100644 --- a/model/modelprocessor/errormessage_test.go +++ b/model/modelprocessor/errormessage_test.go @@ -35,20 +35,20 @@ func TestSetErrorMessage(t *testing.T) { input: model.Error{}, message: "", }, { - input: model.Error{Log: &model.Log{Message: "log_message"}}, + input: model.Error{Log: &model.ErrorLog{Message: "log_message"}}, message: "log_message", }, { input: model.Error{Exception: &model.Exception{Message: "exception_message"}}, message: "exception_message", }, { input: model.Error{ - Log: &model.Log{}, + Log: &model.ErrorLog{}, Exception: &model.Exception{Message: "exception_message"}, }, message: "exception_message", }, { input: model.Error{ - Log: &model.Log{Message: "log_message"}, + Log: &model.ErrorLog{Message: "log_message"}, Exception: &model.Exception{Message: "exception_message"}, }, message: "log_message", diff --git a/model/modelprocessor/excludefromgrouping_test.go b/model/modelprocessor/excludefromgrouping_test.go index 2399301ee2..c8097bb728 100644 --- a/model/modelprocessor/excludefromgrouping_test.go +++ b/model/modelprocessor/excludefromgrouping_test.go @@ -60,7 +60,7 @@ func TestSetExcludeFromGrouping(t *testing.T) { }, { input: model.Batch{{ Error: &model.Error{ - Log: &model.Log{ + Log: &model.ErrorLog{ Stacktrace: model.Stacktrace{ {Filename: "foo.go"}, }, @@ -82,7 +82,7 @@ func TestSetExcludeFromGrouping(t *testing.T) { }}, output: model.Batch{{ Error: &model.Error{ - Log: &model.Log{ + Log: &model.ErrorLog{ Stacktrace: model.Stacktrace{ {ExcludeFromGrouping: true, Filename: "foo.go"}, }, diff --git a/model/modelprocessor/groupingkey_test.go b/model/modelprocessor/groupingkey_test.go index d6339d0aad..4ddfd711b5 100644 --- a/model/modelprocessor/groupingkey_test.go +++ b/model/modelprocessor/groupingkey_test.go @@ -43,7 +43,7 @@ func TestSetGroupingKey(t *testing.T) { Exception: &model.Exception{ Type: "exception_type", }, - Log: &model.Log{ + Log: &model.ErrorLog{ ParamMessage: "log_parammessage", }, }, @@ -72,7 +72,7 @@ func TestSetGroupingKey(t *testing.T) { }, }}, }, - Log: &model.Log{Stacktrace: model.Stacktrace{{Filename: "abc"}}}, // ignored + Log: &model.ErrorLog{Stacktrace: model.Stacktrace{{Filename: "abc"}}}, // ignored }, groupingKey: hashStrings( "module", "func_1", "filename", "func_2", "classname", "func_4", "func_5", "func_6", @@ -80,7 +80,7 @@ func TestSetGroupingKey(t *testing.T) { }, "log_stacktrace": { input: model.Error{ - Log: &model.Log{ + Log: &model.ErrorLog{ Stacktrace: model.Stacktrace{{Function: "function"}}, }, }, @@ -99,13 +99,13 @@ func TestSetGroupingKey(t *testing.T) { Message: "message_4", }}, }, - Log: &model.Log{Message: "log_message"}, // ignored + Log: &model.ErrorLog{Message: "log_message"}, // ignored }, groupingKey: hashStrings("message_1", "message_2", "message_3", "message_4"), }, "log_message": { input: model.Error{ - Log: &model.Log{Message: "log_message"}, // ignored + Log: &model.ErrorLog{Message: "log_message"}, // ignored }, groupingKey: hashStrings("log_message"), }, diff --git a/model/modelprocessor/libraryframe_test.go b/model/modelprocessor/libraryframe_test.go index dcf3d8f609..3e86d8d27f 100644 --- a/model/modelprocessor/libraryframe_test.go +++ b/model/modelprocessor/libraryframe_test.go @@ -62,7 +62,7 @@ func TestSetLibraryFrames(t *testing.T) { }, { input: model.Batch{{ Error: &model.Error{ - Log: &model.Log{ + Log: &model.ErrorLog{ Stacktrace: model.Stacktrace{ {LibraryFrame: true, Filename: "foo.go"}, {LibraryFrame: false, AbsPath: "foobar.go"}, @@ -87,7 +87,7 @@ func TestSetLibraryFrames(t *testing.T) { }}, output: model.Batch{{ Error: &model.Error{ - Log: &model.Log{ + Log: &model.ErrorLog{ Stacktrace: model.Stacktrace{ {LibraryFrame: true, Filename: "foo.go", Original: model.Original{LibraryFrame: true}}, {LibraryFrame: true, AbsPath: "foobar.go", Original: model.Original{LibraryFrame: false}}, diff --git a/processor/otel/logs.go b/processor/otel/logs.go new file mode 100644 index 0000000000..f9cb2f15b9 --- /dev/null +++ b/processor/otel/logs.go @@ -0,0 +1,133 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Portions copied from OpenTelemetry Collector (contrib), from the +// elastic exporter. +// +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otel + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/model/otlp" + "go.opentelemetry.io/collector/model/pdata" + + apmserverlogs "github.com/elastic/apm-server/log" + "github.com/elastic/apm-server/model" + "github.com/elastic/beats/v7/libbeat/logp" +) + +var jsonLogsMarshaler = otlp.NewJSONLogsMarshaler() + +func (c *Consumer) ConsumeLogs(ctx context.Context, logs pdata.Logs) error { + receiveTimestamp := time.Now() + logger := logp.NewLogger(apmserverlogs.Otel) + if logger.IsDebug() { + data, err := jsonLogsMarshaler.MarshalLogs(logs) + if err != nil { + logger.Debug(err) + } else { + logger.Debug(string(data)) + } + } + resourceLogs := logs.ResourceLogs() + batch := make(model.Batch, 0, resourceLogs.Len()) + for i := 0; i < resourceLogs.Len(); i++ { + c.convertResourceLogs(resourceLogs.At(i), receiveTimestamp, &batch) + } + return c.Processor.ProcessBatch(ctx, &batch) +} + +func (c *Consumer) convertResourceLogs(resourceLogs pdata.ResourceLogs, receiveTimestamp time.Time, out *model.Batch) { + var timeDelta time.Duration + resource := resourceLogs.Resource() + baseEvent := model.APMEvent{Processor: model.LogProcessor} + translateResourceMetadata(resource, &baseEvent) + if exportTimestamp, ok := exportTimestamp(resource); ok { + timeDelta = receiveTimestamp.Sub(exportTimestamp) + } + instrumentationLibraryLogs := resourceLogs.InstrumentationLibraryLogs() + for i := 0; i < instrumentationLibraryLogs.Len(); i++ { + c.convertInstrumentationLibraryLogs(instrumentationLibraryLogs.At(i), baseEvent, timeDelta, out) + } +} + +func (c *Consumer) convertInstrumentationLibraryLogs( + in pdata.InstrumentationLibraryLogs, + baseEvent model.APMEvent, + timeDelta time.Duration, + out *model.Batch, +) { + otelLogs := in.Logs() + for i := 0; i < otelLogs.Len(); i++ { + event := c.convertLogRecord(otelLogs.At(i), baseEvent, timeDelta) + *out = append(*out, event) + } +} + +func (c *Consumer) convertLogRecord( + record pdata.LogRecord, + baseEvent model.APMEvent, + timeDelta time.Duration, +) model.APMEvent { + event := baseEvent + event.Timestamp = record.Timestamp().AsTime().Add(timeDelta) + event.Event.Severity = int64(record.SeverityNumber()) + event.Event.Action = record.Name() + event.Log.Level = record.SeverityText() + if body := record.Body(); body.Type() != pdata.AttributeValueTypeNull { + event.Message = body.AsString() + if body.Type() == pdata.AttributeValueTypeMap { + body.MapVal().Range(func(k string, v pdata.AttributeValue) bool { + setLabel(k, &event, ifaceAttributeValue(v)) + return true + }) + } + } + if traceID := record.TraceID(); !traceID.IsEmpty() { + event.Trace.ID = traceID.HexString() + } + if spanID := record.SpanID(); !spanID.IsEmpty() { + if event.Span == nil { + event.Span = &model.Span{} + } + event.Span.ID = spanID.HexString() + } + if attrs := record.Attributes(); attrs.Len() > 0 { + initEventLabels(&event) + attrs.Range(func(k string, v pdata.AttributeValue) bool { + setLabel(k, &event, ifaceAttributeValue(v)) + return true + }) + } + return event +} diff --git a/processor/otel/logs_test.go b/processor/otel/logs_test.go new file mode 100644 index 0000000000..2b424a231c --- /dev/null +++ b/processor/otel/logs_test.go @@ -0,0 +1,150 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Portions copied from OpenTelemetry Collector (contrib), from the +// elastic exporter. +// +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otel_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/model/pdata" + semconv "go.opentelemetry.io/collector/model/semconv/v1.5.0" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/processor/otel" +) + +func TestConsumerConsumeLogs(t *testing.T) { + t.Run("empty", func(t *testing.T) { + var processor model.ProcessBatchFunc = func(_ context.Context, batch *model.Batch) error { + assert.Empty(t, batch) + return nil + } + + consumer := otel.Consumer{Processor: processor} + logs := pdata.NewLogs() + assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs)) + }) + + commonEvent := model.APMEvent{ + Processor: model.LogProcessor, + Agent: model.Agent{ + Name: "otlp/go", + Version: "unknown", + }, + Service: model.Service{ + Name: "unknown", + Language: model.Language{Name: "go"}, + }, + Message: "a random log message", + Event: model.Event{ + Severity: int64(pdata.SeverityNumberINFO), + Action: "doOperation()", + }, + Log: model.Log{Level: "Info"}, + Span: &model.Span{ID: "0200000000000000"}, + Trace: model.Trace{ID: "01000000000000000000000000000000"}, + Labels: model.Labels{ + "key": model.LabelValue{Value: "value"}, + }, + NumericLabels: model.NumericLabels{ + "numeric_key": model.NumericLabelValue{Value: 1234}, + }, + } + test := func(name string, body interface{}, expectedMessage string) { + t.Run(name, func(t *testing.T) { + logs := newLogs(body) + + var processed model.Batch + var processor model.ProcessBatchFunc = func(_ context.Context, batch *model.Batch) error { + if processed != nil { + panic("already processes batch") + } + processed = *batch + assert.NotNil(t, processed[0].Timestamp) + processed[0].Timestamp = time.Time{} + return nil + } + consumer := otel.Consumer{Processor: processor} + assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs)) + + expected := commonEvent + expected.Message = expectedMessage + assert.Equal(t, model.Batch{expected}, processed) + }) + } + test("string_body", "a random log message", "a random log message") + test("int_body", 1234, "1234") + test("float_body", 1234.1234, "1234.1234") + test("bool_body", true, "true") + // TODO(marclop): How to test map body +} + +func newLogs(body interface{}) pdata.Logs { + logs := pdata.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + logs.ResourceLogs().At(0).Resource().Attributes().InitFromMap(map[string]pdata.AttributeValue{ + semconv.AttributeTelemetrySDKLanguage: pdata.NewAttributeValueString("go"), + }) + instrumentationLogs := resourceLogs.InstrumentationLibraryLogs().AppendEmpty() + otelLog := instrumentationLogs.Logs().AppendEmpty() + otelLog.SetTraceID(pdata.NewTraceID([16]byte{1})) + otelLog.SetSpanID(pdata.NewSpanID([8]byte{2})) + otelLog.SetName("doOperation()") + otelLog.SetSeverityNumber(pdata.SeverityNumberINFO) + otelLog.SetSeverityText("Info") + otelLog.SetTimestamp(pdata.NewTimestampFromTime(time.Now())) + otelLog.Attributes().InitFromMap(map[string]pdata.AttributeValue{ + "key": pdata.NewAttributeValueString("value"), + "numeric_key": pdata.NewAttributeValueDouble(1234), + }) + + switch b := body.(type) { + case string: + otelLog.Body().SetStringVal(b) + case int: + otelLog.Body().SetIntVal(int64(b)) + case float64: + otelLog.Body().SetDoubleVal(float64(b)) + case bool: + otelLog.Body().SetBoolVal(b) + // case map[string]string: + // TODO(marclop) figure out how to set the body since it cannot be set + // as a map. + // otelLog.Body() + } + return logs +} diff --git a/processor/otel/traces.go b/processor/otel/traces.go index 1b052d8c62..53602f7f95 100644 --- a/processor/otel/traces.go +++ b/processor/otel/traces.go @@ -925,7 +925,7 @@ func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent, apm } e := &model.Error{} if logMessage != "" { - e.Log = &model.Log{Message: logMessage} + e.Log = &model.ErrorLog{Message: logMessage} } if exMessage != "" || exType != "" { e.Exception = &model.Exception{ diff --git a/sourcemap/processor_test.go b/sourcemap/processor_test.go index 4121d575bb..ca5bae21fd 100644 --- a/sourcemap/processor_test.go +++ b/sourcemap/processor_test.go @@ -147,7 +147,7 @@ func TestBatchProcessor(t *testing.T) { error2 := model.APMEvent{ Service: service, Error: &model.Error{ - Log: &model.Log{ + Log: &model.ErrorLog{ Stacktrace: model.Stacktrace{{ AbsPath: "bundle.js", Lineno: newInt(originalLinenoWithoutFilename), @@ -197,7 +197,7 @@ func TestBatchProcessor(t *testing.T) { }, }, span2.Span) assert.Equal(t, &model.Error{ - Log: &model.Log{ + Log: &model.ErrorLog{ Stacktrace: model.Stacktrace{ cloneFrame(mappedFrameWithoutFilename), },