Skip to content

Commit

Permalink
aggregation/spanmetrics: handle composite spans (#5896)
Browse files Browse the repository at this point in the history
Update spanmetrics to take into account composite spans,
multiplying the span count by the composite count and
using the reported composite span duration instead of
the "gross" duration.

(cherry picked from commit b1bf67d)

# Conflicts:
#	changelogs/head.asciidoc
  • Loading branch information
axw authored and mergify-bot committed Aug 8, 2021
1 parent 0d7d075 commit 13fa4aa
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 2 deletions.
25 changes: 25 additions & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[[release-notes-head]]
== APM Server version HEAD

https://github.com/elastic/apm-server/compare/7.13\...master[View commits]

[float]
==== Breaking Changes
- `network.connection_type` is now `network.connection.type` {pull}5671[5671]
- `transaction.page` and `error.page` no longer recorded {pull}5872[5872]

[float]
==== Bug fixes
- Fix apm_error_grouping_name and apm_convert_destination_address {pull}5876[5876]
- Correct OTel attribute names for `net.host.connection.*` {pull}5671[5671]

[float]
==== Intake API Changes
- `network.connection.type` was added to stream metadata {pull}5671[5671]

[float]
==== Added
- `service_destination` span metrics now take into account composite spans {pull}5896[5896]

[float]
==== Deprecated
14 changes: 12 additions & 2 deletions x-pack/apm-server/aggregation/spanmetrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,26 @@ func (a *Aggregator) processSpan(event *model.APMEvent) model.APMEvent {
return model.APMEvent{}
}

// For composite spans we use the composite sum duration, which is the sum of
// pre-aggregated spans and excludes time gaps that are counted in the reported
// span duration. For non-composite spans we just use the reported span duration.
count := 1
durationMillis := event.Span.Duration
if event.Span.Composite != nil {
count = event.Span.Composite.Count
durationMillis = event.Span.Composite.Sum
}
duration := time.Duration(durationMillis * float64(time.Millisecond))

key := aggregationKey{
serviceEnvironment: event.Service.Environment,
serviceName: event.Service.Name,
agentName: event.Agent.Name,
outcome: event.Event.Outcome,
resource: event.Span.DestinationService.Resource,
}
duration := time.Duration(event.Span.Duration * float64(time.Millisecond))
metrics := spanMetrics{
count: event.Span.RepresentativeCount,
count: float64(count) * event.Span.RepresentativeCount,
sum: float64(duration.Microseconds()) * event.Span.RepresentativeCount,
}
if a.active.storeOrUpdate(key, metrics) {
Expand Down
39 changes: 39 additions & 0 deletions x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,45 @@ func TestAggregatorRun(t *testing.T) {
}
}

// TestAggregateCompositeSpan checks that a span carrying composite
// information is aggregated into the service_destination metricset using its
// composite count and composite sum, instead of being counted as one span.
func TestAggregateCompositeSpan(t *testing.T) {
	out := make(chan model.Batch, 1)
	config := AggregatorConfig{
		BatchProcessor: makeChanBatchProcessor(out),
		Interval:       10 * time.Millisecond,
		MaxGroups:      1000,
	}
	aggregator, err := NewAggregator(config)
	require.NoError(t, err)

	// makeSpan is given time.Second and 2 (presumably the reported span
	// duration and the representative count; confirm against makeSpan). The
	// composite then describes 25 spans with a sum of 700 milliseconds.
	compositeSpan := makeSpan("service-A", "java", "final_destination", "success", time.Second, 2)
	compositeSpan.Span.Composite = &model.Composite{Count: 25, Sum: 700 /* milliseconds */}
	require.NoError(t, aggregator.ProcessBatch(context.Background(), &model.Batch{compositeSpan}))

	// Only start the aggregator once the span has been processed, so that the
	// published metrics are deterministic.
	go aggregator.Run()
	defer aggregator.Stop(context.Background())

	// Expected samples: a count of 50 (25 * 2) and a sum of 1400000
	// microseconds (700ms * 2).
	want := []model.APMEvent{{
		Agent:   model.Agent{Name: "java"},
		Service: model.Service{Name: "service-A"},
		Event:   model.Event{Outcome: "success"},
		Metricset: &model.Metricset{
			Name: "service_destination",
			Span: model.MetricsetSpan{
				DestinationService: model.DestinationService{Resource: "final_destination"},
			},
			Samples: map[string]model.MetricsetSample{
				"span.destination.service.response_time.count":  {Value: 50.0},
				"span.destination.service.response_time.sum.us": {Value: 1400000},
				"metricset.period": {Value: 10},
			},
		},
	}}

	got := batchMetricsets(t, expectBatch(t, out))
	assert.Equal(t, want, got)
}

func TestAggregatorOverflow(t *testing.T) {
batches := make(chan model.Batch, 1)
agg, err := NewAggregator(AggregatorConfig{
Expand Down

0 comments on commit 13fa4aa

Please sign in to comment.