Skip to content

Commit

Permalink
transaction: Collect stats about dropped spans (#1132)
Browse files Browse the repository at this point in the history
Modifies the span timing reporting function to collect statistics about
dropped spans when they exceed the `transaction_max_spans`. Each dropped
span is now aggregated under the `transaction.dropped_spans_stats` array
with a maximum count of items of `128`.

Each dropped span is aggregated on: `destination_service_resource` and 
`outcome` reporting 2 metrics, their `duration.sum.us` and the `count` 
for each of the aggregated dropped spans.

Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop authored Oct 7, 2021
1 parent 798446f commit 36bdfef
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ endif::[]
https://github.com/elastic/apm-agent-go/compare/v1.14.0...master[View commits]
- Deprecate `http.request.socket.encrypted` and stop recording it in `module/apmhttp`, `module/apmgrpc` and `module/apmfiber`. {pull}1129[#(1129)]
- Collect and send span destination service timing statistics about the dropped spans to the apm-server. {pull}1132[#(1132)]
[[release-notes-1.x]]
=== Go Agent version 1.x
Expand Down
2 changes: 1 addition & 1 deletion breakdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func spanSelfTimeMetrics(txName, txType, spanType, spanSubtype string, count int
},
Samples: map[string]model.Metric{
"span.self_time.count": {Value: float64(count)},
"span.self_time.sum.us": {Value: sum.Seconds() * 1000000},
"span.self_time.sum.us": {Value: float64(sum) / 1e3},
},
}
}
Expand Down
49 changes: 49 additions & 0 deletions model/marshal_fastjson.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions model/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ func TestMarshalTransaction(t *testing.T) {
"started": float64(99),
"dropped": float64(4),
},
"dropped_spans_stats": []interface{}{
map[string]interface{}{
"destination_service_resource": "http://elasticsearch:9200",
"duration": map[string]interface{}{
"count": float64(4),
"sum": map[string]interface{}{
"us": float64(1000),
},
},
"outcome": "success",
},
},
}
assert.Equal(t, expect, decoded)
}
Expand Down Expand Up @@ -625,6 +637,16 @@ func fakeTransaction() model.Transaction {
Started: 99,
Dropped: 4,
},
DroppedSpansStats: []model.DroppedSpansStats{
{
DestinationServiceResource: "http://elasticsearch:9200",
Outcome: "success",
Duration: model.AggregateDuration{
Count: 4,
Sum: model.DurationSum{Us: int64(time.Millisecond) / 1e3},
},
},
},
}
}

Expand Down
32 changes: 32 additions & 0 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ type Transaction struct {
// SpanCount holds statistics on spans within a transaction.
SpanCount SpanCount `json:"span_count"`

// DroppedSpansStats holds information about spans that were dropped
// (for example due to transaction_max_spans or exit_span_min_duration).
DroppedSpansStats []DroppedSpansStats `json:"dropped_spans_stats,omitempty"`

// Outcome holds the transaction outcome: success, failure, or unknown.
Outcome string `json:"outcome,omitempty"`
}
Expand All @@ -284,6 +288,34 @@ type SpanCount struct {
Started int `json:"started"`
}

// DroppedSpansStats holds information about spans that were dropped
// (for example due to transaction_max_spans or exit_span_min_duration).
type DroppedSpansStats struct {
// DestinationServiceResource identifies the destination service resource
// being operated on. e.g. 'http://elastic.co:80', 'elasticsearch', 'rabbitmq/queue_name'.
DestinationServiceResource string `json:"destination_service_resource"`
// Outcome of the span: success, failure, or unknown. Outcome may be one of
// a limited set of permitted values describing the success or failure of
// the span. It can be used for calculating error rates for outgoing requests.
Outcome string `json:"outcome"`
// Duration holds duration aggregations about the dropped span.
Duration AggregateDuration `json:"duration"`
}

// AggregateDuration duration
type AggregateDuration struct {
// Count holds the number of times the dropped span happened.
Count int `json:"count"`
// Sum holds dimensions about the dropped span's duration.
Sum DurationSum `json:"sum"`
}

// DurationSum contains units for duration
type DurationSum struct {
// Sum of the duration of a span in Microseconds.
Us int64 `json:"us"`
}

// Span represents a span within a transaction.
type Span struct {
// Name holds the name of the span.
Expand Down
25 changes: 25 additions & 0 deletions modelwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package apm // import "go.elastic.co/apm"

import (
"time"

"go.elastic.co/apm/internal/ringbuffer"
"go.elastic.co/apm/model"
"go.elastic.co/apm/stacktrace"
Expand Down Expand Up @@ -122,6 +124,10 @@ func (w *modelWriter) buildModelTransaction(out *model.Transaction, tx *Transact
out.Duration = td.Duration.Seconds() * 1000
out.SpanCount.Started = td.spansCreated
out.SpanCount.Dropped = td.spansDropped
if dss := buildDroppedSpansStats(td.droppedSpansStats); len(dss) > 0 {
out.DroppedSpansStats = dss
}

if sampled {
out.Context = td.Context.build()
}
Expand Down Expand Up @@ -273,3 +279,22 @@ func normalizeOutcome(outcome string) string {
return "unknown"
}
}

func buildDroppedSpansStats(dss droppedSpanTimingsMap) []model.DroppedSpansStats {
out := make([]model.DroppedSpansStats, 0, len(dss))
for k, timing := range dss {
out = append(out, model.DroppedSpansStats{
DestinationServiceResource: k.destination,
Outcome: k.outcome,
Duration: model.AggregateDuration{
Count: int(timing.count),
Sum: model.DurationSum{
// The internal representation of spanTimingsMap is in time.Nanosecond
// unit which we need to convert to us.
Us: timing.duration / int64(time.Microsecond),
},
},
})
}
return out
}
48 changes: 41 additions & 7 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,13 @@ func (tx *Transaction) StartSpanOptions(name, spanType string, opts SpanOptions)
// Guard access to spansCreated, spansDropped, rand, and childrenTimer.
tx.TransactionData.mu.Lock()
defer tx.TransactionData.mu.Unlock()
if !span.traceContext.Options.Recorded() {
span.tracer = nil // span is dropped
} else if tx.maxSpans >= 0 && tx.spansCreated >= tx.maxSpans {
span.tracer = nil // span is dropped

notRecorded := !span.traceContext.Options.Recorded()
exceedsMaxSpans := tx.maxSpans >= 0 && tx.spansCreated >= tx.maxSpans
// Drop span when it is not recorded.
if span.dropWhen(notRecorded) {
// nothing to do here since it isn't recorded.
} else if span.dropWhen(exceedsMaxSpans) {
tx.spansDropped++
} else {
if opts.SpanID.Validate() == nil {
Expand Down Expand Up @@ -299,6 +302,21 @@ func (s *Span) dropped() bool {
return s.tracer == nil
}

// dropWhen unsets the tracer when the passed bool cond is `true` and returns
// `true` only when the span is dropped. If the span has already been dropped
// or the condition isn't `true`, it then returns `false`.
//
// Must be called with s.mu.Lock held to be able to write to s.tracer.
func (s *Span) dropWhen(cond bool) bool {
if s.Dropped() {
return false
}
if cond {
s.tracer = nil
}
return cond
}

// End marks the s as being complete; s must not be used after this.
//
// If s.Duration has not been set, End will set it to the elapsed time
Expand All @@ -321,11 +339,12 @@ func (s *Span) End() {
s.Outcome = s.Context.outcome()
}
if s.dropped() {
if s.tx == nil {
droppedSpanDataPool.Put(s.SpanData)
} else {
if s.tx != nil {
s.reportSelfTime()
s.aggregateDroppedSpanStats()
s.reset(s.tx.tracer)
} else {
droppedSpanDataPool.Put(s.SpanData)
}
s.SpanData = nil
return
Expand Down Expand Up @@ -419,6 +438,21 @@ func (s *Span) IsExitSpan() bool {
return s.exit
}

// aggregateDroppedSpanStats aggregates the current span into the transaction
// dropped spans stats timings.
//
// Must only be called from End() with s.tx.mu held.
func (s *Span) aggregateDroppedSpanStats() {
// An exit span would have the destination service set but in any case, we
// check the field value before adding an entry to the dropped spans stats.
service := s.Context.destinationService.Resource
if s.dropped() && s.IsExitSpan() && service != "" {
s.tx.TransactionData.mu.Lock()
s.tx.droppedSpansStats.add(service, s.Outcome, s.Duration)
s.tx.TransactionData.mu.Unlock()
}
}

// SpanData holds the details for a span, and is embedded inside Span.
// When a span is ended or discarded, its SpanData field will be set
// to nil.
Expand Down
59 changes: 48 additions & 11 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import (
"time"
)

const (
// maxDroppedSpanStats sets the hard limit for the number of dropped span
// stats that are stored in a transaction.
maxDroppedSpanStats = 128
)

// StartTransaction returns a new Transaction with the specified
// name and type, and with the start time set to the current time.
// This is equivalent to calling StartTransactionOptions with a
Expand All @@ -43,7 +49,8 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran
Context: Context{
captureBodyMask: CaptureBodyTransactions,
},
spanTimings: make(spanTimingsMap),
spanTimings: make(spanTimingsMap),
droppedSpansStats: make(droppedSpanTimingsMap, maxDroppedSpanStats),
}
var seed int64
if err := binary.Read(cryptorand.Reader, binary.LittleEndian, &seed); err != nil {
Expand Down Expand Up @@ -352,24 +359,54 @@ type TransactionData struct {
propagateLegacyHeader bool
timestamp time.Time

mu sync.Mutex
spansCreated int
spansDropped int
childrenTimer childrenTimer
spanTimings spanTimingsMap
rand *rand.Rand // for ID generation
mu sync.Mutex
spansCreated int
spansDropped int
childrenTimer childrenTimer
spanTimings spanTimingsMap
droppedSpansStats droppedSpanTimingsMap
rand *rand.Rand // for ID generation
}

// reset resets the TransactionData back to its zero state and places it back
// into the transaction pool.
func (td *TransactionData) reset(tracer *Tracer) {
*td = TransactionData{
Context: td.Context,
Duration: -1,
rand: td.rand,
spanTimings: td.spanTimings,
Context: td.Context,
Duration: -1,
rand: td.rand,
spanTimings: td.spanTimings,
droppedSpansStats: td.droppedSpansStats,
}
td.Context.reset()
td.spanTimings.reset()
td.droppedSpansStats.reset()
tracer.transactionDataPool.Put(td)
}

type droppedSpanTimingsKey struct {
destination string
outcome string
}

// droppedSpanTimingsMap records span timings for groups of dropped spans.
type droppedSpanTimingsMap map[droppedSpanTimingsKey]spanTiming

// add accumulates the timing for a {destination, outcome} pair, silently drops
// any pairs that would cause the map to exceed the maxDroppedSpanStats.
func (m droppedSpanTimingsMap) add(destination, outcome string, d time.Duration) {
k := droppedSpanTimingsKey{destination: destination, outcome: outcome}
timing, ok := m[k]
if ok || maxDroppedSpanStats > len(m) {
timing.count++
timing.duration += int64(d)
m[k] = timing
}
}

// reset resets m back to its initial zero state.
func (m droppedSpanTimingsMap) reset() {
for k := range m {
delete(m, k)
}
}
Loading

0 comments on commit 36bdfef

Please sign in to comment.