Skip to content

Commit

Permalink
otlp: Add beta support to receive logs (#6768)
Browse files Browse the repository at this point in the history
Adds support to receive OTLP logs on the GRPC receiver. This feature is
_beta_ and should be used sparsely until the OTel spec declares it as
stable.

Signed-off-by: Marc Lopez Rubio <[email protected]>
(cherry picked from commit 5e5623e)

# Conflicts:
#	changelogs/head.asciidoc
  • Loading branch information
marclop authored and mergify-bot committed Dec 7, 2021
1 parent f500a74 commit a0d04a5
Show file tree
Hide file tree
Showing 28 changed files with 593 additions and 33 deletions.
3 changes: 3 additions & 0 deletions apmpackage/apm/changelog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
- description: updated ingest pipelines to reject events from apm-servers newer than installed integration
type: enhancement
link: https://github.com/elastic/apm-server/pull/6791
- description: added event.{outcome,severity} and log.level to app_logs data stream
type: enhancement
link: https://github.com/elastic/apm-server/pull/6791
- version: "7.16.1"
changes:
- description: Added `agent_config_applied` mapping to `metrics-apm.internal` data stream.
Expand Down
6 changes: 6 additions & 0 deletions apmpackage/apm/data_stream/app_logs/fields/ecs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
name: ecs.version
- external: ecs
name: event.outcome
- external: ecs
name: event.severity
- external: ecs
name: event.action
- external: ecs
name: host.architecture
- external: ecs
Expand All @@ -79,6 +83,8 @@
- external: ecs
name: labels
dynamic: true
- external: ecs
name: log.level
- external: ecs
name: message
- external: ecs
Expand Down
8 changes: 8 additions & 0 deletions beater/otlp/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,22 @@ var (
gRPCMetricsMonitoringMap = request.MonitoringMapForRegistry(gRPCMetricsRegistry, monitoringKeys)
gRPCTracesRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.traces")
gRPCTracesMonitoringMap = request.MonitoringMapForRegistry(gRPCTracesRegistry, monitoringKeys)
gRPCLogsRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.logs")
gRPCLogsMonitoringMap = request.MonitoringMapForRegistry(gRPCLogsRegistry, monitoringKeys)

// RegistryMonitoringMaps provides mappings from the fully qualified gRPC
// method name to its respective monitoring map.
RegistryMonitoringMaps = map[string]map[request.ResultID]*monitoring.Int{
metricsFullMethod: gRPCMetricsMonitoringMap,
tracesFullMethod: gRPCTracesMonitoringMap,
logsFullMethod: gRPCLogsMonitoringMap,
}
)

const (
metricsFullMethod = "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export"
tracesFullMethod = "/opentelemetry.proto.collector.trace.v1.TraceService/Export"
logsFullMethod = "/opentelemetry.proto.collector.logs.v1.LogsService/Export"
)

func init() {
Expand All @@ -68,6 +72,7 @@ func MethodAuthenticators(authenticator *auth.Authenticator) map[string]intercep
return map[string]interceptors.MethodAuthenticator{
metricsFullMethod: metadataMethodAuthenticator,
tracesFullMethod: metadataMethodAuthenticator,
logsFullMethod: metadataMethodAuthenticator,
}
}

Expand All @@ -86,6 +91,9 @@ func RegisterGRPCServices(grpcServer *grpc.Server, processor model.BatchProcesso
if err := otlpreceiver.RegisterMetricsReceiver(context.Background(), consumer, grpcServer); err != nil {
return errors.Wrap(err, "failed to register OTLP metrics receiver")
}
if err := otlpreceiver.RegisterLogsReceiver(context.Background(), consumer, grpcServer); err != nil {
return errors.Wrap(err, "failed to register OTLP logs receiver")
}
return nil
}

Expand Down
47 changes: 47 additions & 0 deletions beater/otlp/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,53 @@ func TestConsumeMetrics(t *testing.T) {
}, actual)
}

func TestConsumeLogs(t *testing.T) {
var batches []model.Batch
var reportError error
var batchProcessor model.ProcessBatchFunc = func(ctx context.Context, batch *model.Batch) error {
batches = append(batches, *batch)
return reportError
}

conn := newServer(t, batchProcessor)
client := otlpgrpc.NewLogsClient(conn)

// Send a minimal log to verify that everything is connected properly.
//
// We intentionally do not check the published event contents; those are
// tested in processor/otel.
logs := pdata.NewLogs()
log := logs.ResourceLogs().AppendEmpty().InstrumentationLibraryLogs().AppendEmpty().Logs().AppendEmpty()
log.SetName("log_name")

_, err := client.Export(context.Background(), logs)
assert.NoError(t, err)
require.Len(t, batches, 1)

reportError = errors.New("failed to publish events")
_, err = client.Export(context.Background(), logs)
assert.Error(t, err)
errStatus := status.Convert(err)
assert.Equal(t, "failed to publish events", errStatus.Message())
require.Len(t, batches, 2)
assert.Len(t, batches[0], 1)
assert.Len(t, batches[1], 1)

actual := map[string]interface{}{}
monitoring.GetRegistry("apm-server.otlp.grpc.logs").Do(monitoring.Full, func(key string, value interface{}) {
actual[key] = value
})
assert.Equal(t, map[string]interface{}{
"request.count": int64(2),
"response.count": int64(2),
"response.errors.count": int64(1),
"response.valid.count": int64(1),
"response.errors.ratelimit": int64(0),
"response.errors.timeout": int64(0),
"response.errors.unauthorized": int64(0),
}, actual)
}

func newServer(t *testing.T, batchProcessor model.BatchProcessor) *grpc.ClientConn {
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
Expand Down
48 changes: 48 additions & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
[[release-notes-head]]
== APM version HEAD

https://github.com/elastic/apm-server/compare/7.15\...master[View commits]

[float]
==== Breaking Changes
- Removed legacy Jaeger gRPC/HTTP endpoints {pull}6417[6417]
- Removed source map upload endpoint {pull}6447[6447]
- Removed unsupported libbeat `processors` configuration {pull}6474[6474]
- Removed `apm-server.aggregation.transactions.enabled` configuration option {pull}6495[6495]
- Removed `apm-server.aggregation.service_destinations.enabled` configuration option {pull}6503[6503]
- Removed `apm-server.sampling.keep_unsampled` configuration option; non-RUM unsampled transactions are always dropped {pull}6514[6514] {pull}6669[6669]
- Removed `apm-server.jaeger` configuration options {pull}6560[6560]
- Removed `apm-server.instrumentation` configuration options in favor of `instrumentation` {pull}6560[6560]
- Removed `apm-server.rum.{allowed_service,event_rate}` configuration option in favor of `apm-server.auth.anonymous.{allow_service,rate_limit}` {pull}6560[6560]
- Removed `apm-server.{api_key,secret_token}` configuration options in favor of `apm-server.auth.{api_key,secret_token}` {pull}6560[6560]
- Onboarding documents are no longer indexed {pull}6431[6431]
- Removed `apm-server.register.ingest.pipeline` and `output.elasticsearch.pipeline` configuration options {pull}6575[6575]
- Removed unused `span.start.us` field, and deprecated `span.http.*` fields {pull}6602[6602]
- Removed `apm-server.data_streams.enabled`, and `setup.*` configuration options {pull}6606[6606]
- Removed `logging.ecs` and `logging.json` config {pull}6613[6613]

[float]
==== Bug fixes
- In accord with ECS, the server logs now set `source.address` to the immediate network peer's IP address, and `client.ip` to the originating client IP if known {pull}6690[6690]
- `host.ip` is now stored as an array, as specified by ECS {pull}6694[6694]

[float]
==== Intake API Changes
- `transaction.name` was added to the error objects in the intake API {pull}6539[6539]

[float]
==== Added
- Introduced a delete phase for all data streams. Traces, errors and logs are kept for 10 days, metrics are kept for 90 days {pull}6480[6480]
- Changed RUM traces to use a dedicated data stream (`traces-apm.rum`). RUM traces are kept for 90 days {pull}6480[6480]
- `apm-server` artifacts now have the apm java-attacher.jar packaged alongside them {pull}6593[6593]
- Added metrics for new Elasticsearch output: `libbeat.output.events.{acked,batches,toomany}`; added tracing and log correlation {pull}6630[6630]
- Run the java attacher jar when configured and not in a cloud environment {pull}6617[6617]
- The `labels` indexed field is now ECS compliant (string only) and added a new `numeric_labels` object that holds labels with numeric values {pull}6633[6633]
- Modify default standalone apm-server config values to be more in line with the default managed apm-server values {pull}6675[6675]
- APM Server is now using a new Elasticsearch output implementation {pull}6656[6656]
- Standalone apm-server can now fetch source maps uploaded to Kibana, when `apm-server.kibana` is configured {pull}6447[6447]
- APM Server now has beta support to receive OpenTelemetry Logs on the OTLP/GRPC receiver {pull}6768[6768]

[float]
==== Licensing Changes
- Updated the `x-pack` source files license to the Elastic License 2.0 {pull}6524[6524]
2 changes: 1 addition & 1 deletion docs/open-telemetry.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -401,4 +401,4 @@ Here is an example of AWS Lambda Node.js function managed with Terraform and the
[[open-telemetry-logs-limitations]]
===== OpenTelemetry logs

* OpenTelemetry logs are not yet supported https://github.com/elastic/apm-server/issues/5491[#5491]
* OpenTelemetry logs are supported with **beta support** from 8.0 https://github.com/elastic/apm-server/pull/6768[#6768].
5 changes: 2 additions & 3 deletions model/apmevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ import (
// Exactly one of the event fields should be non-nil.
type APMEvent struct {
// DataStream optionally holds data stream identifiers.
//
// This will have the zero value when APM Server is run
// in standalone mode.
DataStream DataStream

ECSVersion string
Expand All @@ -59,6 +56,7 @@ type APMEvent struct {
Child Child
HTTP HTTP
FAAS FAAS
Log Log

// Timestamp holds the event timestamp.
//
Expand Down Expand Up @@ -152,5 +150,6 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event {
fields.maybeSetString("message", e.Message)
fields.maybeSetMapStr("http", e.HTTP.fields())
fields.maybeSetMapStr("faas", e.FAAS.fields())
fields.maybeSetMapStr("log", e.Log.fields())
return event
}
4 changes: 2 additions & 2 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Error struct {
Custom common.MapStr

Exception *Exception
Log *Log
Log *ErrorLog
}

type Exception struct {
Expand All @@ -52,7 +52,7 @@ type Exception struct {
Cause []Exception
}

type Log struct {
type ErrorLog struct {
Message string
Level string
ParamMessage string
Expand Down
6 changes: 3 additions & 3 deletions model/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func (e *Exception) withCode(code string) *Exception {
return e
}

func baseLog() *Log {
return &Log{Message: "error log message"}
func baseLog() *ErrorLog {
return &ErrorLog{Message: "error log message"}
}

func TestHandleExceptionTree(t *testing.T) {
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestEventFields(t *testing.T) {
loggerName := "logger"
logMsg := "error log message"
paramMsg := "param message"
log := Log{
log := ErrorLog{
Level: level,
Message: logMsg,
ParamMessage: paramMsg,
Expand Down
10 changes: 10 additions & 0 deletions model/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,20 @@ type Event struct {

// Outcome holds the event outcome: "success", "failure", or "unknown".
Outcome string

// Severity holds the numeric severity of the event for log events.
Severity int64

// Severity holds the action captured by the event for log events.
Action string
}

func (e *Event) fields() common.MapStr {
var fields mapStr
fields.maybeSetString("outcome", e.Outcome)
fields.maybeSetString("action", e.Action)
if e.Severity > 0 {
fields.set("severity", e.Severity)
}
return common.MapStr(fields)
}
16 changes: 16 additions & 0 deletions model/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package model

import "github.com/elastic/beats/v7/libbeat/common"

const (
AppLogsDataset = "apm.app"
)
Expand All @@ -25,3 +27,17 @@ var (
// LogProcessor is the Processor value that should be assigned to log events.
LogProcessor = Processor{Name: "log", Event: "log"}
)

// Log holds information about a log, as defined by ECS.
//
// https://www.elastic.co/guide/en/ecs/current/ecs-log.html
type Log struct {
// Level holds the log level of the log event.
Level string
}

func (e Log) fields() common.MapStr {
var fields mapStr
fields.maybeSetString("level", e.Level)
return common.MapStr(fields)
}
2 changes: 1 addition & 1 deletion model/modeldecoder/rumv3/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func mapToErrorModel(from *errorEvent, event *model.APMEvent) {
out.ID = from.ID.Val
}
if from.Log.IsSet() {
log := model.Log{}
log := model.ErrorLog{}
if from.Log.Level.IsSet() {
log.Level = from.Log.Level.Val
}
Expand Down
1 change: 1 addition & 0 deletions model/modeldecoder/rumv3/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func metadataExceptions(keys ...string) func(key string) bool {
"Session",
"Trace",
"URL",
"Log",

// Dedicated test for it.
"NumericLabels",
Expand Down
2 changes: 1 addition & 1 deletion model/modeldecoder/v2/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func mapToErrorModel(from *errorEvent, event *model.APMEvent) {
out.ID = from.ID.Val
}
if from.Log.IsSet() {
log := model.Log{}
log := model.ErrorLog{}
if from.Log.Level.IsSet() {
log.Level = from.Log.Level.Val
}
Expand Down
4 changes: 4 additions & 0 deletions model/modeldecoder/v2/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func isUnmappedMetadataField(key string) bool {
"Event",
"Event.Duration",
"Event.Outcome",
"Event.Severity",
"Event.Action",
"Log",
"Log.Level",
"Service.Origin",
"Service.Origin.ID",
"Service.Origin.Name",
Expand Down
10 changes: 5 additions & 5 deletions model/modelprocessor/culprit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,23 @@ func TestSetCulprit(t *testing.T) {
}, {
input: model.Error{
Culprit: "already_set",
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{{SourcemapUpdated: false, Filename: "foo.go"}},
},
},
culprit: "already_set",
}, {
input: model.Error{
Culprit: "already_set",
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{{SourcemapUpdated: true, LibraryFrame: true, Filename: "foo.go"}},
},
},
culprit: "already_set",
}, {
input: model.Error{
Culprit: "already_set",
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{
{SourcemapUpdated: true, LibraryFrame: true, Filename: "foo.go"},
{SourcemapUpdated: true, LibraryFrame: false, Filename: "foo2.go"},
Expand All @@ -67,7 +67,7 @@ func TestSetCulprit(t *testing.T) {
}, {
input: model.Error{
Culprit: "already_set",
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{{SourcemapUpdated: true, LibraryFrame: true, Filename: "foo.go"}},
},
Exception: &model.Exception{
Expand All @@ -77,7 +77,7 @@ func TestSetCulprit(t *testing.T) {
culprit: "foo2.go",
}, {
input: model.Error{
Log: &model.Log{
Log: &model.ErrorLog{
Stacktrace: model.Stacktrace{
{SourcemapUpdated: true, Classname: "AbstractFactoryManagerBean", Function: "toString"},
},
Expand Down
6 changes: 3 additions & 3 deletions model/modelprocessor/errormessage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ func TestSetErrorMessage(t *testing.T) {
input: model.Error{},
message: "",
}, {
input: model.Error{Log: &model.Log{Message: "log_message"}},
input: model.Error{Log: &model.ErrorLog{Message: "log_message"}},
message: "log_message",
}, {
input: model.Error{Exception: &model.Exception{Message: "exception_message"}},
message: "exception_message",
}, {
input: model.Error{
Log: &model.Log{},
Log: &model.ErrorLog{},
Exception: &model.Exception{Message: "exception_message"},
},
message: "exception_message",
}, {
input: model.Error{
Log: &model.Log{Message: "log_message"},
Log: &model.ErrorLog{Message: "log_message"},
Exception: &model.Exception{Message: "exception_message"},
},
message: "log_message",
Expand Down
Loading

0 comments on commit a0d04a5

Please sign in to comment.