Skip to content

Commit

Permalink
otlp: Add experimental support to receive logs
Browse files Browse the repository at this point in the history
Adds support to receive OTLP logs on the GRPC receiver. This feature is
_beta_ and should be used sparsely until the OTel spec declares it as
stable.

Currently, it's challenging to test this in the system tests since the
`opentelemetry-go` SDK doesn't expose any APIs for logs.

Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop committed Dec 2, 2021
1 parent add9721 commit 017039d
Show file tree
Hide file tree
Showing 20 changed files with 398 additions and 30 deletions.
8 changes: 8 additions & 0 deletions beater/otlp/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,22 @@ var (
gRPCMetricsMonitoringMap = request.MonitoringMapForRegistry(gRPCMetricsRegistry, monitoringKeys)
gRPCTracesRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.traces")
gRPCTracesMonitoringMap = request.MonitoringMapForRegistry(gRPCTracesRegistry, monitoringKeys)
gRPCLogsRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.logs")
gRPCLogsMonitoringMap = request.MonitoringMapForRegistry(gRPCLogsRegistry, monitoringKeys)

// RegistryMonitoringMaps provides mappings from the fully qualified gRPC
// method name to its respective monitoring map.
RegistryMonitoringMaps = map[string]map[request.ResultID]*monitoring.Int{
metricsFullMethod: gRPCMetricsMonitoringMap,
tracesFullMethod: gRPCTracesMonitoringMap,
logsFullMethod: gRPCLogsMonitoringMap,
}
)

const (
metricsFullMethod = "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export"
tracesFullMethod = "/opentelemetry.proto.collector.trace.v1.TraceService/Export"
logsFullMethod = "/opentelemetry.proto.collector.logs.v1.LogsService/Export"
)

func init() {
Expand All @@ -68,6 +72,7 @@ func MethodAuthenticators(authenticator *auth.Authenticator) map[string]intercep
return map[string]interceptors.MethodAuthenticator{
metricsFullMethod: metadataMethodAuthenticator,
tracesFullMethod: metadataMethodAuthenticator,
logsFullMethod: metadataMethodAuthenticator,
}
}

Expand All @@ -86,6 +91,9 @@ func RegisterGRPCServices(grpcServer *grpc.Server, processor model.BatchProcesso
if err := otlpreceiver.RegisterMetricsReceiver(context.Background(), consumer, grpcServer); err != nil {
return errors.Wrap(err, "failed to register OTLP metrics receiver")
}
if err := otlpreceiver.RegisterLogsReceiver(context.Background(), consumer, grpcServer); err != nil {
return errors.Wrap(err, "failed to register OTLP logs receiver")
}
return nil
}

Expand Down
47 changes: 47 additions & 0 deletions beater/otlp/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,53 @@ func TestConsumeMetrics(t *testing.T) {
}, actual)
}

func TestConsumeLogs(t *testing.T) {
var batches []model.Batch
var reportError error
var batchProcessor model.ProcessBatchFunc = func(ctx context.Context, batch *model.Batch) error {
batches = append(batches, *batch)
return reportError
}

conn := newServer(t, batchProcessor)
client := otlpgrpc.NewLogsClient(conn)

// Send a minimal log to verify that everything is connected properly.
//
// We intentionally do not check the published event contents; those are
// tested in processor/otel.
logs := pdata.NewLogs()
log := logs.ResourceLogs().AppendEmpty().InstrumentationLibraryLogs().AppendEmpty().Logs().AppendEmpty()
log.SetName("log_name")

_, err := client.Export(context.Background(), logs)
assert.NoError(t, err)
require.Len(t, batches, 1)

reportError = errors.New("failed to publish events")
_, err = client.Export(context.Background(), logs)
assert.Error(t, err)
errStatus := status.Convert(err)
assert.Equal(t, "failed to publish events", errStatus.Message())
require.Len(t, batches, 2)
assert.Len(t, batches[0], 1)
assert.Len(t, batches[1], 1)

actual := map[string]interface{}{}
monitoring.GetRegistry("apm-server.otlp.grpc.logs").Do(monitoring.Full, func(key string, value interface{}) {
actual[key] = value
})
assert.Equal(t, map[string]interface{}{
"request.count": int64(2),
"response.count": int64(2),
"response.errors.count": int64(1),
"response.valid.count": int64(1),
"response.errors.ratelimit": int64(0),
"response.errors.timeout": int64(0),
"response.errors.unauthorized": int64(0),
}, actual)
}

func newServer(t *testing.T, batchProcessor model.BatchProcessor) *grpc.ClientConn {
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
Expand Down
5 changes: 2 additions & 3 deletions model/apmevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ import (
// Exactly one of the event fields should be non-nil.
type APMEvent struct {
// DataStream optionally holds data stream identifiers.
//
// This will have the zero value when APM Server is run
// in standalone mode.
DataStream DataStream

ECSVersion string
Expand All @@ -59,6 +56,7 @@ type APMEvent struct {
Child Child
HTTP HTTP
FAAS FAAS
Log Log

// Timestamp holds the event timestamp.
//
Expand Down Expand Up @@ -152,5 +150,6 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event {
fields.maybeSetString("message", e.Message)
fields.maybeSetMapStr("http", e.HTTP.fields())
fields.maybeSetMapStr("faas", e.FAAS.fields())
fields.maybeSetMapStr("log", e.Log.fields())
return event
}
4 changes: 2 additions & 2 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Error struct {
Custom common.MapStr

Exception *Exception
Log *Log
Log *ErrorLog
}

type Exception struct {
Expand All @@ -52,7 +52,7 @@ type Exception struct {
Cause []Exception
}

type Log struct {
type ErrorLog struct {
Message string
Level string
ParamMessage string
Expand Down
6 changes: 3 additions & 3 deletions model/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func (e *Exception) withCode(code string) *Exception {
return e
}

func baseLog() *Log {
return &Log{Message: "error log message"}
func baseLog() *ErrorLog {
return &ErrorLog{Message: "error log message"}
}

func TestHandleExceptionTree(t *testing.T) {
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestEventFields(t *testing.T) {
loggerName := "logger"
logMsg := "error log message"
paramMsg := "param message"
log := Log{
log := ErrorLog{
Level: level,
Message: logMsg,
ParamMessage: paramMsg,
Expand Down
10 changes: 10 additions & 0 deletions model/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,20 @@ type Event struct {

// Outcome holds the event outcome: "success", "failure", or "unknown".
Outcome string

// Severity holds the numeric severity of the event for log events.
Severity int64

// Severity holds the action captured by the event for log events.
Action string
}

func (e *Event) fields() common.MapStr {
var fields mapStr
fields.maybeSetString("outcome", e.Outcome)
fields.maybeSetString("action", e.Action)
if e.Severity > 0 {
fields.set("severity", e.Severity)
}
return common.MapStr(fields)
}
16 changes: 16 additions & 0 deletions model/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package model

import "github.com/elastic/beats/v7/libbeat/common"

const (
AppLogsDataset = "apm.app"
)
Expand All @@ -25,3 +27,17 @@ var (
// LogProcessor is the Processor value that should be assigned to log events.
LogProcessor = Processor{Name: "log", Event: "log"}
)

// Log holds information about a log, as defined by ECS.
//
// https://www.elastic.co/guide/en/ecs/current/ecs-log.html
type Log struct {
// Level holds the log level of the log event.
Level string
}

func (e Log) fields() common.MapStr {
var fields mapStr
fields.maybeSetString("level", e.Level)
return common.MapStr(fields)
}
2 changes: 1 addition & 1 deletion model/modeldecoder/rumv3/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func mapToErrorModel(from *errorEvent, event *model.APMEvent) {
out.ID = from.ID.Val
}
if from.Log.IsSet() {
log := model.Log{}
log := model.ErrorLog{}
if from.Log.Level.IsSet() {
log.Level = from.Log.Level.Val
}
Expand Down
1 change: 1 addition & 0 deletions model/modeldecoder/rumv3/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func metadataExceptions(keys ...string) func(key string) bool {
"Session",
"Trace",
"URL",
"Log",

// Dedicated test for it.
"NumericLabels",
Expand Down
2 changes: 1 addition & 1 deletion model/modeldecoder/v2/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func mapToErrorModel(from *errorEvent, event *model.APMEvent) {
out.ID = from.ID.Val
}
if from.Log.IsSet() {
log := model.Log{}
log := model.ErrorLog{}
if from.Log.Level.IsSet() {
log.Level = from.Log.Level.Val
}
Expand Down
4 changes: 4 additions & 0 deletions model/modeldecoder/v2/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func isUnmappedMetadataField(key string) bool {
"Event",
"Event.Duration",
"Event.Outcome",
"Event.Severity",
"Event.Action",
"Log",
"Log.Level",
"Service.Origin",
"Service.Origin.ID",
"Service.Origin.Name",
Expand Down
10 changes: 5 additions & 5 deletions model/modelprocessor/culprit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,23 @@ func TestSetCulprit(t *testing.T) {
}, {
input: model.Error{
Culprit: "already_set",
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{{SourcemapUpdated: false, Filename: "foo.go"}},
},
},
culprit: "already_set",
}, {
input: model.Error{
Culprit: "already_set",
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{{SourcemapUpdated: true, LibraryFrame: true, Filename: "foo.go"}},
},
},
culprit: "already_set",
}, {
input: model.Error{
Culprit: "already_set",
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{
{SourcemapUpdated: true, LibraryFrame: true, Filename: "foo.go"},
{SourcemapUpdated: true, LibraryFrame: false, Filename: "foo2.go"},
Expand All @@ -67,7 +67,7 @@ func TestSetCulprit(t *testing.T) {
}, {
input: model.Error{
Culprit: "already_set",
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{{SourcemapUpdated: true, LibraryFrame: true, Filename: "foo.go"}},
},
Exception: &model.Exception{
Expand All @@ -77,7 +77,7 @@ func TestSetCulprit(t *testing.T) {
culprit: "foo2.go",
}, {
input: model.Error{
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{
{SourcemapUpdated: true, Classname: "AbstractFactoryManagerBean", Function: "toString"},
},
Expand Down
6 changes: 3 additions & 3 deletions model/modelprocessor/errormessage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ func TestSetErrorMessage(t *testing.T) {
input: model.Error{},
message: "",
}, {
input: model.Error{Log: &model.Log{Message: "log_message"}},
input: model.Error{Log: &model.ErrorLog{Message: "log_message"}},
message: "log_message",
}, {
input: model.Error{Exception: &model.Exception{Message: "exception_message"}},
message: "exception_message",
}, {
input: model.Error{
Log: &model.Log{},
Log: &model.ErrorLog{},
Exception: &model.Exception{Message: "exception_message"},
},
message: "exception_message",
}, {
input: model.Error{
Log: &model.Log{Message: "log_message"},
Log: &model.ErrorLog{Message: "log_message"},
Exception: &model.Exception{Message: "exception_message"},
},
message: "log_message",
Expand Down
4 changes: 2 additions & 2 deletions model/modelprocessor/excludefromgrouping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestSetExcludeFromGrouping(t *testing.T) {
}, {
input: model.Batch{{
Error: &model.Error{
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{
{Filename: "foo.go"},
},
Expand All @@ -82,7 +82,7 @@ func TestSetExcludeFromGrouping(t *testing.T) {
}},
output: model.Batch{{
Error: &model.Error{
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{
{ExcludeFromGrouping: true, Filename: "foo.go"},
},
Expand Down
10 changes: 5 additions & 5 deletions model/modelprocessor/groupingkey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestSetGroupingKey(t *testing.T) {
Exception: &model.Exception{
Type: "exception_type",
},
Log: &model.Log{
Log: &model.ErrorLog{
ParamMessage: "log_parammessage",
},
},
Expand Down Expand Up @@ -72,15 +72,15 @@ func TestSetGroupingKey(t *testing.T) {
},
}},
},
Log: &model.Log{Stacktrace: model.Stacktrace{{Filename: "abc"}}}, // ignored
Log: &model.ErrorLog{Stacktrace: model.Stacktrace{{Filename: "abc"}}}, // ignored
},
groupingKey: hashStrings(
"module", "func_1", "filename", "func_2", "classname", "func_4", "func_5", "func_6",
),
},
"log_stacktrace": {
input: model.Error{
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{{Function: "function"}},
},
},
Expand All @@ -99,13 +99,13 @@ func TestSetGroupingKey(t *testing.T) {
Message: "message_4",
}},
},
Log: &model.Log{Message: "log_message"}, // ignored
Log: &model.ErrorLog{Message: "log_message"}, // ignored
},
groupingKey: hashStrings("message_1", "message_2", "message_3", "message_4"),
},
"log_message": {
input: model.Error{
Log: &model.Log{Message: "log_message"}, // ignored
Log: &model.ErrorLog{Message: "log_message"}, // ignored
},
groupingKey: hashStrings("log_message"),
},
Expand Down
4 changes: 2 additions & 2 deletions model/modelprocessor/libraryframe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestSetLibraryFrames(t *testing.T) {
}, {
input: model.Batch{{
Error: &model.Error{
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{
{LibraryFrame: true, Filename: "foo.go"},
{LibraryFrame: false, AbsPath: "foobar.go"},
Expand All @@ -87,7 +87,7 @@ func TestSetLibraryFrames(t *testing.T) {
}},
output: model.Batch{{
Error: &model.Error{
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{
{LibraryFrame: true, Filename: "foo.go", Original: model.Original{LibraryFrame: true}},
{LibraryFrame: true, AbsPath: "foobar.go", Original: model.Original{LibraryFrame: false}},
Expand Down
Loading

0 comments on commit 017039d

Please sign in to comment.