Skip to content

Commit

Permalink
[7.x] aggregation/spanmetrics: handle composite spans (backport #5896) (
Browse files Browse the repository at this point in the history
#5904)

* aggregation/spanmetrics: handle composite spans (#5896)

Update spanmetrics to take into account composite spans,
multiplying the span count by the composite count and
using the reported composite span duration instead of
the "gross" duration.

(cherry picked from commit b1bf67d)

# Conflicts:
#	changelogs/head.asciidoc

* Delete head.asciidoc

* make update

Co-authored-by: Andrew Wilkins <[email protected]>
  • Loading branch information
mergify[bot] and axw authored Aug 8, 2021
1 parent 0d7d075 commit c3939f3
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 4 deletions.
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1022,11 +1022,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected]/LIC

--------------------------------------------------------------------------------
Dependency : github.com/elastic/go-elasticsearch/v7
Version: v7.5.1-0.20210728153421-6462d8b84e7d
Version: v7.5.1-0.20210803065541-d87f0bdcf0fe
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/go-elasticsearch/[email protected].20210728153421-6462d8b84e7d/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/go-elasticsearch/[email protected].20210803065541-d87f0bdcf0fe/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down
14 changes: 12 additions & 2 deletions x-pack/apm-server/aggregation/spanmetrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,26 @@ func (a *Aggregator) processSpan(event *model.APMEvent) model.APMEvent {
return model.APMEvent{}
}

// For composite spans we use the composite sum duration, which is the sum of
// pre-aggregated spans and excludes time gaps that are counted in the reported
// span duration. For non-composite spans we just use the reported span duration.
count := 1
durationMillis := event.Span.Duration
if event.Span.Composite != nil {
count = event.Span.Composite.Count
durationMillis = event.Span.Composite.Sum
}
duration := time.Duration(durationMillis * float64(time.Millisecond))

key := aggregationKey{
serviceEnvironment: event.Service.Environment,
serviceName: event.Service.Name,
agentName: event.Agent.Name,
outcome: event.Event.Outcome,
resource: event.Span.DestinationService.Resource,
}
duration := time.Duration(event.Span.Duration * float64(time.Millisecond))
metrics := spanMetrics{
count: event.Span.RepresentativeCount,
count: float64(count) * event.Span.RepresentativeCount,
sum: float64(duration.Microseconds()) * event.Span.RepresentativeCount,
}
if a.active.storeOrUpdate(key, metrics) {
Expand Down
39 changes: 39 additions & 0 deletions x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,45 @@ func TestAggregatorRun(t *testing.T) {
}
}

func TestAggregateCompositeSpan(t *testing.T) {
batches := make(chan model.Batch, 1)
agg, err := NewAggregator(AggregatorConfig{
BatchProcessor: makeChanBatchProcessor(batches),
Interval: 10 * time.Millisecond,
MaxGroups: 1000,
})
require.NoError(t, err)

span := makeSpan("service-A", "java", "final_destination", "success", time.Second, 2)
span.Span.Composite = &model.Composite{Count: 25, Sum: 700 /* milliseconds */}
err = agg.ProcessBatch(context.Background(), &model.Batch{span})
require.NoError(t, err)

// Start the aggregator after processing to ensure metrics are aggregated deterministically.
go agg.Run()
defer agg.Stop(context.Background())

batch := expectBatch(t, batches)
metricsets := batchMetricsets(t, batch)

assert.Equal(t, []model.APMEvent{{
Agent: model.Agent{Name: "java"},
Service: model.Service{Name: "service-A"},
Event: model.Event{Outcome: "success"},
Metricset: &model.Metricset{
Name: "service_destination",
Span: model.MetricsetSpan{
DestinationService: model.DestinationService{Resource: "final_destination"},
},
Samples: map[string]model.MetricsetSample{
"span.destination.service.response_time.count": {Value: 50.0},
"span.destination.service.response_time.sum.us": {Value: 1400000},
"metricset.period": {Value: 10},
},
},
}}, metricsets)
}

func TestAggregatorOverflow(t *testing.T) {
batches := make(chan model.Batch, 1)
agg, err := NewAggregator(AggregatorConfig{
Expand Down

0 comments on commit c3939f3

Please sign in to comment.