Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce model.Processor #5984

Merged
merged 11 commits into from
Aug 24, 2021
1 change: 1 addition & 0 deletions beater/api/profile/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func appendProfileSampleBatch(pp *profile.Profile, baseEvent model.APMEvent, out
}

event := baseEvent
event.Processor = model.ProfileProcessor
event.Labels = event.Labels.Clone()
if n := len(sample.Label); n > 0 {
for k, v := range sample.Label {
Expand Down
2 changes: 2 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/go-ucfg"

"github.com/pkg/errors"
Expand Down Expand Up @@ -479,6 +480,7 @@ func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) R
modelprocessor.SetErrorMessage{},
newObserverBatchProcessor(s.beat.Info),
model.ProcessBatchFunc(ecsVersionBatchProcessor),
modelprocessor.NewEventCounter(monitoring.Default.GetRegistry("apm-server")),
}
if s.config.DefaultServiceEnvironment != "" {
processors = append(processors, &modelprocessor.SetDefaultServiceEnvironment{
Expand Down
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
- `transaction.page` and `error.page` no longer recorded {pull}5872[5872]
- experimental:["This breaking change applies to the experimental tail-based sampling feature."] `apm-server.sampling.tail` now requires `apm-server.data_streams.enabled` {pull}5952[5952]
- beta:["This breaking change applies to the beta <<apm-integration>>."] The `traces-sampled-*` data stream is now `traces-apm.sampled-*` {pull}5952[5952]
- Removed unused stacktrace/frame monitoring counters {pull}5984[5984]

[float]
==== Bug fixes
Expand Down
5 changes: 4 additions & 1 deletion model/apmevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type APMEvent struct {
Network Network
Session Session
URL URL
Processor Processor
Trace Trace

// Timestamp holds the event timestamp.
Expand Down Expand Up @@ -91,7 +92,8 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event {
event.Fields = e.Error.fields()
case e.ProfileSample != nil:
event.Fields = e.ProfileSample.fields()
default:
}
if event.Fields == nil {
event.Fields = make(common.MapStr)
}

Expand Down Expand Up @@ -133,6 +135,7 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event {
fields.maybeSetMapStr("event", e.Event.fields())
fields.maybeSetMapStr("url", e.URL.fields())
fields.maybeSetMapStr("session", e.Session.fields())
fields.maybeSetMapStr("processor", e.Processor.fields())
fields.maybeSetMapStr("trace", e.Trace.fields())
fields.maybeSetString("message", e.Message)
return event
Expand Down
9 changes: 5 additions & 4 deletions model/apmevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestAPMEventFields(t *testing.T) {
Message: "bottle",
Transaction: &Transaction{},
Timestamp: time.Date(2019, 1, 3, 15, 17, 4, 908.596*1e6, time.FixedZone("+0100", 3600)),
Processor: Processor{Name: "processor_name", Event: "processor_event"},
Trace: Trace{ID: traceID},
},
output: common.MapStr{
Expand Down Expand Up @@ -107,12 +108,12 @@ func TestAPMEventFields(t *testing.T) {
"trace": common.MapStr{
"id": traceID,
},

// fields related to APMEvent.Transaction
"processor": common.MapStr{
"name": "transaction",
"event": "transaction",
"name": "processor_name",
"event": "processor_event",
},

// fields related to APMEvent.Transaction
"timestamp": common.MapStr{"us": int64(1546525024908596)},
"transaction": common.MapStr{
"duration": common.MapStr{"us": 0},
Expand Down
30 changes: 4 additions & 26 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,15 @@ package model

import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"
)

var (
errorMetrics = monitoring.Default.NewRegistry("apm-server.processor.error")
errorTransformations = monitoring.NewInt(errorMetrics, "transformations")
errorStacktraceCounter = monitoring.NewInt(errorMetrics, "stacktraces")
errorFrameCounter = monitoring.NewInt(errorMetrics, "frames")
errorProcessorEntry = common.MapStr{"name": errorProcessorName, "event": errorDocType}
// ErrorProcessor is the Processor value that should be assigned to error events.
ErrorProcessor = Processor{Name: "error", Event: "error"}
)

const (
errorProcessorName = "error"
errorDocType = "error"
ErrorsDataset = "apm.error"
ErrorsDataset = "apm.error"
)

type Error struct {
Expand Down Expand Up @@ -76,16 +70,7 @@ type Log struct {
}

func (e *Error) fields() common.MapStr {
errorTransformations.Inc()

if e.Exception != nil {
addStacktraceCounter(e.Exception.Stacktrace)
}
if e.Log != nil {
addStacktraceCounter(e.Log.Stacktrace)
}

fields := mapStr{"processor": errorProcessorEntry}
var fields mapStr
if e.HTTP != nil {
fields.maybeSetMapStr("http", e.HTTP.transactionTopLevelFields())
}
Expand Down Expand Up @@ -161,13 +146,6 @@ func (e *Error) logFields() common.MapStr {
return common.MapStr(log)
}

func addStacktraceCounter(st Stacktrace) {
if frames := len(st); frames > 0 {
errorStacktraceCounter.Inc()
errorFrameCounter.Add(int64(frames))
}
}

// flattenExceptionTree recursively traverses the causes of an exception to return a slice of exceptions.
// Tree traversal is Depth First.
// The parent of a exception in the resulting slice is at the position indicated by the `parent` property
Expand Down
27 changes: 8 additions & 19 deletions model/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,16 @@ package model

import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"
)

const (
metricsetProcessorName = "metric"
metricsetDocType = "metric"
metricsetEventKey = "event"
metricsetTransactionKey = "transaction"
metricsetSpanKey = "span"
AppMetricsDataset = "apm.app"
InternalMetricsDataset = "apm.internal"
AppMetricsDataset = "apm.app"
InternalMetricsDataset = "apm.internal"
)

var (
metricsetMetrics = monitoring.Default.NewRegistry("apm-server.processor.metric")
metricsetTransformations = monitoring.NewInt(metricsetMetrics, "transformations")
metricsetProcessorEntry = common.MapStr{"name": metricsetProcessorName, "event": metricsetDocType}
// MetricsetProcessor is the Processor value that should be assigned to metricset events.
MetricsetProcessor = Processor{Name: "metric", Event: "metric"}
)

// MetricType describes the type of a metric: gauge, counter, or histogram.
Expand Down Expand Up @@ -141,13 +134,9 @@ type MetricsetSpan struct {
}

func (me *Metricset) fields() common.MapStr {
metricsetTransformations.Inc()

var fields mapStr
fields.set("processor", metricsetProcessorEntry)

fields.maybeSetMapStr(metricsetTransactionKey, me.Transaction.fields())
fields.maybeSetMapStr(metricsetSpanKey, me.Span.fields())
fields.maybeSetMapStr("transaction", me.Transaction.fields())
fields.maybeSetMapStr("span", me.Span.fields())
if me.TimeseriesInstanceID != "" {
fields.set("timeseries", common.MapStr{"instance": me.TimeseriesInstanceID})
}
Expand All @@ -158,7 +147,7 @@ func (me *Metricset) fields() common.MapStr {

var metricDescriptions mapStr
for name, sample := range me.Samples {
sample.set(name, fields)
sample.set(name, &fields)

var md mapStr
md.maybeSetString("type", string(sample.Type))
Expand Down Expand Up @@ -190,7 +179,7 @@ func (s *MetricsetSpan) fields() common.MapStr {
return common.MapStr(fields)
}

func (s *MetricsetSample) set(name string, fields mapStr) {
func (s *MetricsetSample) set(name string, fields *mapStr) {
if s.Type == MetricTypeHistogram {
fields.set(name, common.MapStr{
"counts": s.Counts,
Expand Down
12 changes: 2 additions & 10 deletions model/metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,12 @@ func TestMetricset(t *testing.T) {
}{
{
Metricset: &Metricset{},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
},
Msg: "Payload with empty metric.",
Output: common.MapStr{},
Msg: "Payload with empty metric.",
},
{
Metricset: &Metricset{Name: "raj"},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"metricset.name": "raj",
},
Msg: "Payload with metricset name.",
Expand All @@ -67,7 +64,6 @@ func TestMetricset(t *testing.T) {
},
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"a.counter": 612.0,
"some.gauge": 9.16,
},
Expand All @@ -82,7 +78,6 @@ func TestMetricset(t *testing.T) {
},
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"transaction": common.MapStr{"type": trType, "name": trName},
"span": common.MapStr{
"type": spType, "subtype": spSubtype,
Expand Down Expand Up @@ -111,7 +106,6 @@ func TestMetricset(t *testing.T) {
DocCount: 6,
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"timeseries": common.MapStr{"instance": "foo"},
"transaction": common.MapStr{
"type": trType,
Expand Down Expand Up @@ -143,7 +137,6 @@ func TestMetricset(t *testing.T) {
},
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"span": common.MapStr{
"type": spType, "subtype": spSubtype,
"destination": common.MapStr{"service": common.MapStr{"resource": resource}},
Expand Down Expand Up @@ -173,7 +166,6 @@ func TestMetricset(t *testing.T) {
},
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"latency_histogram": common.MapStr{
"counts": []int64{1, 2, 3},
"values": []float64{1.1, 2.2, 3.3},
Expand Down
24 changes: 14 additions & 10 deletions model/modeldecoder/rumv3/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,20 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch
*batch = append(*batch, transaction)

for _, m := range root.Transaction.Metricsets {
metricset := input.Base
mapToMetricsetModel(&m, &metricset)
metricset.Metricset.Transaction.Name = transaction.Transaction.Name
metricset.Metricset.Transaction.Type = transaction.Transaction.Type
*batch = append(*batch, metricset)
event := input.Base
mapToMetricsetModel(&m, &event)
event.Metricset.Transaction.Name = transaction.Transaction.Name
event.Metricset.Transaction.Type = transaction.Transaction.Type
*batch = append(*batch, event)
}

offset := len(*batch)
for _, s := range root.Transaction.Spans {
span := input.Base
mapToSpanModel(&s, &span)
span.Span.TransactionID = transaction.Transaction.ID
span.Trace = transaction.Trace
*batch = append(*batch, span)
event := input.Base
mapToSpanModel(&s, &event)
event.Span.TransactionID = transaction.Transaction.ID
event.Trace = transaction.Trace
*batch = append(*batch, event)
}
spans := (*batch)[offset:]
for i, s := range root.Transaction.Spans {
Expand All @@ -190,6 +190,7 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch
func mapToErrorModel(from *errorEvent, event *model.APMEvent) {
out := &model.Error{}
event.Error = out
event.Processor = model.ErrorProcessor

// overwrite metadata with event specific information
mapToServiceModel(from.Context.Service, &event.Service)
Expand Down Expand Up @@ -391,6 +392,7 @@ func mapToMetadataModel(m *metadata, out *model.APMEvent) {
func mapToMetricsetModel(from *metricset, event *model.APMEvent) {
out := &model.Metricset{}
event.Metricset = out
event.Processor = model.MetricsetProcessor

// map samples information
if from.Samples.IsSet() {
Expand Down Expand Up @@ -512,6 +514,7 @@ func mapToAgentModel(from contextServiceAgent, out *model.Agent) {
func mapToSpanModel(from *span, event *model.APMEvent) {
out := &model.Span{}
event.Span = out
event.Processor = model.SpanProcessor

// map span specific data
if !from.Action.IsSet() && !from.Subtype.IsSet() {
Expand Down Expand Up @@ -690,6 +693,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) {
func mapToTransactionModel(from *transaction, event *model.APMEvent) {
out := &model.Transaction{}
event.Transaction = out
event.Processor = model.TransactionProcessor

// overwrite metadata with event specific information
mapToServiceModel(from.Context.Service, &event.Service)
Expand Down
1 change: 1 addition & 0 deletions model/modeldecoder/rumv3/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func metadataExceptions(keys ...string) func(key string) bool {
"Network",
"Observer",
"Process",
"Processor",
"Service.Node",
"Service.Agent.EphemeralID",
"Host",
Expand Down
4 changes: 4 additions & 0 deletions model/modeldecoder/v2/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func mapToClientModel(from contextRequest, out *model.Client) {
func mapToErrorModel(from *errorEvent, config modeldecoder.Config, event *model.APMEvent) {
out := &model.Error{}
event.Error = out
event.Processor = model.ErrorProcessor

// overwrite metadata with event specific information
mapToServiceModel(from.Context.Service, &event.Service)
Expand Down Expand Up @@ -538,6 +539,7 @@ func mapToMetadataModel(from *metadata, out *model.APMEvent) {
func mapToMetricsetModel(from *metricset, config modeldecoder.Config, event *model.APMEvent) {
out := &model.Metricset{}
event.Metricset = out
event.Processor = model.MetricsetProcessor

if !from.Timestamp.Val.IsZero() {
event.Timestamp = from.Timestamp.Val
Expand Down Expand Up @@ -725,6 +727,7 @@ func mapToAgentModel(from contextServiceAgent, out *model.Agent) {
func mapToSpanModel(from *span, config modeldecoder.Config, event *model.APMEvent) {
out := &model.Span{}
event.Span = out
event.Processor = model.SpanProcessor

// map span specific data
if !from.Action.IsSet() && !from.Subtype.IsSet() {
Expand Down Expand Up @@ -987,6 +990,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) {

func mapToTransactionModel(from *transaction, config modeldecoder.Config, event *model.APMEvent) {
out := &model.Transaction{}
event.Processor = model.TransactionProcessor
event.Transaction = out

// overwrite metadata with event specific information
Expand Down
3 changes: 3 additions & 0 deletions model/modeldecoder/v2/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func isUnmappedMetadataField(key string) bool {
"Observer.VersionMajor",
"Process.CommandLine",
"Process.Executable",
"Processor",
"Processor.Event",
"Processor.Name",
"Host.OS.Full",
"Host.OS.Type",
"Host.ID",
Expand Down
Loading