Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otlp: Add beta support to receive logs #6768

Merged
merged 7 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions beater/otlp/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,22 @@ var (
gRPCMetricsMonitoringMap = request.MonitoringMapForRegistry(gRPCMetricsRegistry, monitoringKeys)
gRPCTracesRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.traces")
gRPCTracesMonitoringMap = request.MonitoringMapForRegistry(gRPCTracesRegistry, monitoringKeys)
gRPCLogsRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.logs")
gRPCLogsMonitoringMap = request.MonitoringMapForRegistry(gRPCLogsRegistry, monitoringKeys)

// RegistryMonitoringMaps provides mappings from the fully qualified gRPC
// method name to its respective monitoring map.
RegistryMonitoringMaps = map[string]map[request.ResultID]*monitoring.Int{
metricsFullMethod: gRPCMetricsMonitoringMap,
tracesFullMethod: gRPCTracesMonitoringMap,
logsFullMethod: gRPCLogsMonitoringMap,
}
)

const (
metricsFullMethod = "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export"
tracesFullMethod = "/opentelemetry.proto.collector.trace.v1.TraceService/Export"
logsFullMethod = "/opentelemetry.proto.collector.logs.v1.LogsService/Export"
)

func init() {
Expand All @@ -68,6 +72,7 @@ func MethodAuthenticators(authenticator *auth.Authenticator) map[string]intercep
return map[string]interceptors.MethodAuthenticator{
metricsFullMethod: metadataMethodAuthenticator,
tracesFullMethod: metadataMethodAuthenticator,
logsFullMethod: metadataMethodAuthenticator,
}
}

Expand All @@ -86,6 +91,9 @@ func RegisterGRPCServices(grpcServer *grpc.Server, processor model.BatchProcesso
if err := otlpreceiver.RegisterMetricsReceiver(context.Background(), consumer, grpcServer); err != nil {
return errors.Wrap(err, "failed to register OTLP metrics receiver")
}
if err := otlpreceiver.RegisterLogsReceiver(context.Background(), consumer, grpcServer); err != nil {
return errors.Wrap(err, "failed to register OTLP logs receiver")
}
return nil
}

Expand Down
47 changes: 47 additions & 0 deletions beater/otlp/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,53 @@ func TestConsumeMetrics(t *testing.T) {
}, actual)
}

func TestConsumeLogs(t *testing.T) {
var batches []model.Batch
var reportError error
var batchProcessor model.ProcessBatchFunc = func(ctx context.Context, batch *model.Batch) error {
batches = append(batches, *batch)
return reportError
}

conn := newServer(t, batchProcessor)
client := otlpgrpc.NewLogsClient(conn)

// Send a minimal log to verify that everything is connected properly.
//
// We intentionally do not check the published event contents; those are
// tested in processor/otel.
logs := pdata.NewLogs()
log := logs.ResourceLogs().AppendEmpty().InstrumentationLibraryLogs().AppendEmpty().Logs().AppendEmpty()
log.SetName("log_name")

_, err := client.Export(context.Background(), logs)
assert.NoError(t, err)
require.Len(t, batches, 1)

reportError = errors.New("failed to publish events")
_, err = client.Export(context.Background(), logs)
assert.Error(t, err)
errStatus := status.Convert(err)
assert.Equal(t, "failed to publish events", errStatus.Message())
require.Len(t, batches, 2)
assert.Len(t, batches[0], 1)
assert.Len(t, batches[1], 1)

actual := map[string]interface{}{}
monitoring.GetRegistry("apm-server.otlp.grpc.logs").Do(monitoring.Full, func(key string, value interface{}) {
actual[key] = value
})
assert.Equal(t, map[string]interface{}{
"request.count": int64(2),
"response.count": int64(2),
"response.errors.count": int64(1),
"response.valid.count": int64(1),
"response.errors.ratelimit": int64(0),
"response.errors.timeout": int64(0),
"response.errors.unauthorized": int64(0),
}, actual)
}

func newServer(t *testing.T, batchProcessor model.BatchProcessor) *grpc.ClientConn {
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ https://github.com/elastic/apm-server/compare/7.15\...master[View commits]
- The `labels` indexed field is now ECS compliant (string only) and added a new `numeric_labels` object that holds labels with numeric values {pull}6633[6633]
- Modify default standalone apm-server config values to be more in line with the default managed apm-server values {pull}6675[6675]
- APM Server is now using a new Elasticsearch output implementation {pull}6656[6656]
- APM Server now supports receiving OpenTelemetry Logs on the OTLP/GRPC receiver {pull}6768[6768]
marclop marked this conversation as resolved.
Show resolved Hide resolved

[float]
==== Deprecated
Expand Down
2 changes: 1 addition & 1 deletion docs/open-telemetry.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -401,4 +401,4 @@ Here is an example of AWS Lambda Node.js function managed with Terraform and the
[[open-telemetry-logs-limitations]]
===== OpenTelemetry logs

* OpenTelemetry logs are not yet supported https://github.com/elastic/apm-server/issues/5491[#5491]
* OpenTelemetry logs are supported with **beta support** from 8.0 https://github.com/elastic/apm-server/pull/6768[#6768].
5 changes: 2 additions & 3 deletions model/apmevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ import (
// Exactly one of the event fields should be non-nil.
type APMEvent struct {
// DataStream optionally holds data stream identifiers.
//
// This will have the zero value when APM Server is run
// in standalone mode.
DataStream DataStream

ECSVersion string
Expand All @@ -59,6 +56,7 @@ type APMEvent struct {
Child Child
HTTP HTTP
FAAS FAAS
Log Log

// Timestamp holds the event timestamp.
//
Expand Down Expand Up @@ -152,5 +150,6 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event {
fields.maybeSetString("message", e.Message)
fields.maybeSetMapStr("http", e.HTTP.fields())
fields.maybeSetMapStr("faas", e.FAAS.fields())
fields.maybeSetMapStr("log", e.Log.fields())
return event
}
4 changes: 2 additions & 2 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Error struct {
Custom common.MapStr

Exception *Exception
Log *Log
Log *ErrorLog
}

type Exception struct {
Expand All @@ -52,7 +52,7 @@ type Exception struct {
Cause []Exception
}

type Log struct {
type ErrorLog struct {
Message string
Level string
ParamMessage string
Expand Down
6 changes: 3 additions & 3 deletions model/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func (e *Exception) withCode(code string) *Exception {
return e
}

func baseLog() *Log {
return &Log{Message: "error log message"}
func baseLog() *ErrorLog {
return &ErrorLog{Message: "error log message"}
}

func TestHandleExceptionTree(t *testing.T) {
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestEventFields(t *testing.T) {
loggerName := "logger"
logMsg := "error log message"
paramMsg := "param message"
log := Log{
log := ErrorLog{
Level: level,
Message: logMsg,
ParamMessage: paramMsg,
Expand Down
10 changes: 10 additions & 0 deletions model/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,20 @@ type Event struct {

// Outcome holds the event outcome: "success", "failure", or "unknown".
Outcome string

// Severity holds the numeric severity of the event for log events.
Severity int64

// Severity holds the action captured by the event for log events.
Action string
marclop marked this conversation as resolved.
Show resolved Hide resolved
}

func (e *Event) fields() common.MapStr {
var fields mapStr
fields.maybeSetString("outcome", e.Outcome)
fields.maybeSetString("action", e.Action)
if e.Severity > 0 {
fields.set("severity", e.Severity)
}
return common.MapStr(fields)
}
16 changes: 16 additions & 0 deletions model/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package model

import "github.com/elastic/beats/v7/libbeat/common"

const (
AppLogsDataset = "apm.app"
)
Expand All @@ -25,3 +27,17 @@ var (
// LogProcessor is the Processor value that should be assigned to log events.
LogProcessor = Processor{Name: "log", Event: "log"}
)

// Log holds information about a log, as defined by ECS.
//
// https://www.elastic.co/guide/en/ecs/current/ecs-log.html
type Log struct {
// Level holds the log level of the log event.
Level string
marclop marked this conversation as resolved.
Show resolved Hide resolved
}

func (e Log) fields() common.MapStr {
var fields mapStr
fields.maybeSetString("level", e.Level)
return common.MapStr(fields)
}
2 changes: 1 addition & 1 deletion model/modeldecoder/rumv3/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func mapToErrorModel(from *errorEvent, event *model.APMEvent) {
out.ID = from.ID.Val
}
if from.Log.IsSet() {
log := model.Log{}
log := model.ErrorLog{}
if from.Log.Level.IsSet() {
log.Level = from.Log.Level.Val
}
Expand Down
1 change: 1 addition & 0 deletions model/modeldecoder/rumv3/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func metadataExceptions(keys ...string) func(key string) bool {
"Session",
"Trace",
"URL",
"Log",

// Dedicated test for it.
"NumericLabels",
Expand Down
2 changes: 1 addition & 1 deletion model/modeldecoder/v2/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func mapToErrorModel(from *errorEvent, event *model.APMEvent) {
out.ID = from.ID.Val
}
if from.Log.IsSet() {
log := model.Log{}
log := model.ErrorLog{}
if from.Log.Level.IsSet() {
log.Level = from.Log.Level.Val
}
Expand Down
4 changes: 4 additions & 0 deletions model/modeldecoder/v2/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func isUnmappedMetadataField(key string) bool {
"Event",
"Event.Duration",
"Event.Outcome",
"Event.Severity",
"Event.Action",
"Log",
"Log.Level",
"Service.Origin",
"Service.Origin.ID",
"Service.Origin.Name",
Expand Down
10 changes: 5 additions & 5 deletions model/modelprocessor/culprit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,23 @@ func TestSetCulprit(t *testing.T) {
}, {
input: model.Error{
Culprit: "already_set",
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{{SourcemapUpdated: false, Filename: "foo.go"}},
},
},
culprit: "already_set",
}, {
input: model.Error{
Culprit: "already_set",
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{{SourcemapUpdated: true, LibraryFrame: true, Filename: "foo.go"}},
},
},
culprit: "already_set",
}, {
input: model.Error{
Culprit: "already_set",
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{
{SourcemapUpdated: true, LibraryFrame: true, Filename: "foo.go"},
{SourcemapUpdated: true, LibraryFrame: false, Filename: "foo2.go"},
Expand All @@ -67,7 +67,7 @@ func TestSetCulprit(t *testing.T) {
}, {
input: model.Error{
Culprit: "already_set",
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{{SourcemapUpdated: true, LibraryFrame: true, Filename: "foo.go"}},
},
Exception: &model.Exception{
Expand All @@ -77,7 +77,7 @@ func TestSetCulprit(t *testing.T) {
culprit: "foo2.go",
}, {
input: model.Error{
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{
{SourcemapUpdated: true, Classname: "AbstractFactoryManagerBean", Function: "toString"},
},
Expand Down
6 changes: 3 additions & 3 deletions model/modelprocessor/errormessage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ func TestSetErrorMessage(t *testing.T) {
input: model.Error{},
message: "",
}, {
input: model.Error{Log: &model.Log{Message: "log_message"}},
input: model.Error{Log: &model.ErrorLog{Message: "log_message"}},
message: "log_message",
}, {
input: model.Error{Exception: &model.Exception{Message: "exception_message"}},
message: "exception_message",
}, {
input: model.Error{
Log: &model.Log{},
Log: &model.ErrorLog{},
Exception: &model.Exception{Message: "exception_message"},
},
message: "exception_message",
}, {
input: model.Error{
Log: &model.Log{Message: "log_message"},
Log: &model.ErrorLog{Message: "log_message"},
Exception: &model.Exception{Message: "exception_message"},
},
message: "log_message",
Expand Down
4 changes: 2 additions & 2 deletions model/modelprocessor/excludefromgrouping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestSetExcludeFromGrouping(t *testing.T) {
}, {
input: model.Batch{{
Error: &model.Error{
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{
{Filename: "foo.go"},
},
Expand All @@ -82,7 +82,7 @@ func TestSetExcludeFromGrouping(t *testing.T) {
}},
output: model.Batch{{
Error: &model.Error{
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{
{ExcludeFromGrouping: true, Filename: "foo.go"},
},
Expand Down
10 changes: 5 additions & 5 deletions model/modelprocessor/groupingkey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestSetGroupingKey(t *testing.T) {
Exception: &model.Exception{
Type: "exception_type",
},
Log: &model.Log{
Log: &model.ErrorLog{
ParamMessage: "log_parammessage",
},
},
Expand Down Expand Up @@ -72,15 +72,15 @@ func TestSetGroupingKey(t *testing.T) {
},
}},
},
Log: &model.Log{Stacktrace: model.Stacktrace{{Filename: "abc"}}}, // ignored
Log: &model.ErrorLog{Stacktrace: model.Stacktrace{{Filename: "abc"}}}, // ignored
},
groupingKey: hashStrings(
"module", "func_1", "filename", "func_2", "classname", "func_4", "func_5", "func_6",
),
},
"log_stacktrace": {
input: model.Error{
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{{Function: "function"}},
},
},
Expand All @@ -99,13 +99,13 @@ func TestSetGroupingKey(t *testing.T) {
Message: "message_4",
}},
},
Log: &model.Log{Message: "log_message"}, // ignored
Log: &model.ErrorLog{Message: "log_message"}, // ignored
},
groupingKey: hashStrings("message_1", "message_2", "message_3", "message_4"),
},
"log_message": {
input: model.Error{
Log: &model.Log{Message: "log_message"}, // ignored
Log: &model.ErrorLog{Message: "log_message"}, // ignored
},
groupingKey: hashStrings("log_message"),
},
Expand Down
Loading