diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 879f22b3f..3e2984aea 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -25,6 +25,7 @@ https://github.com/elastic/apm-agent-go/compare/v1.14.0...master[View commits] - Deprecate `http.request.socket.encrypted` and stop recording it in `module/apmhttp`, `module/apmgrpc` and `module/apmfiber`. {pull}1129[#(1129)] - Collect and send span destination service timing statistics about the dropped spans to the apm-server. {pull}1132[#(1132)] +- Experimental support to compress short exit spans into a composite span. Disabled by default. {pull}1134[#(1134)] [[release-notes-1.x]] === Go Agent version 1.x diff --git a/apmtest/debug.go b/apmtest/debug.go new file mode 100644 index 000000000..cf9723397 --- /dev/null +++ b/apmtest/debug.go @@ -0,0 +1,161 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package apmtest // import "go.elastic.co/apm/apmtest" + +import ( + "bytes" + "fmt" + "io" + "sort" + "text/tabwriter" + "time" + "unicode/utf8" + + "go.elastic.co/apm/internal/apmmath" + "go.elastic.co/apm/model" +) + +// WriteTraceTable displays the trace as a table which can be used on tests to aid +// debugging. 
+func WriteTraceTable(writer io.Writer, tx model.Transaction, spans []model.Span) { + w := tabwriter.NewWriter(writer, 2, 4, 2, ' ', tabwriter.TabIndent) + fmt.Fprintln(w, "#\tNAME\tTYPE\tCOMP\tN\tDURATION\tOFFSET\tSPAN ID\tPARENT ID\tTRACE ID") + + fmt.Fprintf(w, "TX\t%s\t%s\t-\t-\t%f\t%d\t%x\t%x\t%x\n", tx.Name, + tx.Type, tx.Duration, + 0, + tx.ID, tx.ParentID, tx.TraceID, + ) + + sort.SliceStable(spans, func(i, j int) bool { + return time.Time(spans[i].Timestamp).Before(time.Time(spans[j].Timestamp)) + }) + for i, span := range spans { + count := 1 + if span.Composite != nil { + count = span.Composite.Count + } + + fmt.Fprintf(w, "%d\t%s\t%s\t%v\t%d\t%f\t+%d\t%x\t%x\t%x\n", i, span.Name, + span.Type, span.Composite != nil, count, span.Duration, + time.Time(span.Timestamp).Sub(time.Time(tx.Timestamp))/1e3, + span.ID, span.ParentID, span.TraceID, + ) + } + w.Flush() +} + +// WriteTraceWaterfall writes the trace waterfall "console output" to the specified +// writer sorted by timestamp. +func WriteTraceWaterfall(w io.Writer, tx model.Transaction, spans []model.Span) { + maxDuration := time.Duration(tx.Duration * float64(time.Millisecond)) + if maxDuration == 0 { + for _, span := range spans { + maxDuration += time.Duration(span.Duration * float64(time.Millisecond)) + } + } + + maxWidth := int64(72) + buf := new(bytes.Buffer) + if tx.Duration > 0.0 { + writeSpan(buf, int(maxWidth), 0, fmt.Sprintf("transaction (%x) - %s", tx.ID, maxDuration.String())) + } + + sort.SliceStable(spans, func(i, j int) bool { + return time.Time(spans[i].Timestamp).Before(time.Time(spans[j].Timestamp)) + }) + + for _, span := range spans { + pos := int(apmmath.Round( + float64(time.Time(span.Timestamp).Sub(time.Time(tx.Timestamp))) / + float64(maxDuration) * float64(maxWidth), + )) + tDur := time.Duration(span.Duration * float64(time.Millisecond)) + dur := float64(tDur) / float64(maxDuration) + width := int(apmmath.Round(dur * float64(maxWidth))) + if width == int(maxWidth) { + width = 
int(maxWidth) - 1 + } + + spancontent := fmt.Sprintf("%s %s - %s", + span.Type, span.Name, + time.Duration(span.Duration*float64(time.Millisecond)).String(), + ) + if span.Composite != nil { + spancontent = fmt.Sprintf("%d %s - %s", + span.Composite.Count, span.Name, + time.Duration(span.Duration*float64(time.Millisecond)).String(), + ) + } + writeSpan(buf, width, pos, spancontent) + } + + io.Copy(w, buf) +} + +func writeSpan(buf *bytes.Buffer, width, pos int, content string) { + spaceRune := ' ' + fillRune := '_' + startRune := '|' + endRune := '|' + + // Prevent the spans from going out of bounds. + if pos == width { + pos = pos - 2 + } else if pos >= width { + pos = pos - 1 + } + + for i := 0; i < int(pos); i++ { + buf.WriteRune(spaceRune) + } + + if width <= 1 { + width = 1 + // Write the first letter of the span type when the width is too small. + startRune, _ = utf8.DecodeRuneInString(content) + } + + var written int + written, _ = buf.WriteRune(startRune) + if len(content) >= int(width)-1 { + content = content[:int(width)-1] + } + + spacing := (width - len(content) - 2) / 2 + for i := 0; i < spacing; i++ { + n, _ := buf.WriteRune(fillRune) + written += n + } + + n, _ := buf.WriteString(content) + written += n + for i := 0; i < spacing; i++ { + n, _ := buf.WriteRune(fillRune) + written += n + } + + if written < width { + buf.WriteRune(fillRune) + } + if width > 1 { + buf.WriteRune(endRune) + } + + buf.WriteString("\n") +} diff --git a/config.go b/config.go index 1881d370c..5419ff799 100644 --- a/config.go +++ b/config.go @@ -63,6 +63,14 @@ const ( envUseElasticTraceparentHeader = "ELASTIC_APM_USE_ELASTIC_TRACEPARENT_HEADER" envCloudProvider = "ELASTIC_APM_CLOUD_PROVIDER" + // NOTE(marclop) Experimental settings + // span_compression (default `false`) + envSpanCompressionEnabled = "ELASTIC_APM_SPAN_COMPRESSION_ENABLED" + // span_compression_exact_match_max_duration (default `50ms`) + envSpanCompressionExactMatchMaxDuration = 
"ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION" + // span_compression_same_kind_max_duration (default `5ms`) + envSpanCompressionSameKindMaxDuration = "ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION" + // NOTE(axw) profiling environment variables are experimental. // They may be removed in a future minor version without being // considered a breaking change. @@ -87,6 +95,11 @@ const ( maxAPIRequestSize = 5 * configutil.MByte minMetricsBufferSize = 10 * configutil.KByte maxMetricsBufferSize = 100 * configutil.MByte + + // Experimental Span Compressions default setting values + defaultSpanCompressionEnabled = false + defaultSpanCompressionExactMatchMaxDuration = 50 * time.Millisecond + defaultSpanCompressionSameKindMaxDuration = 5 * time.Millisecond ) var ( @@ -298,6 +311,26 @@ func initialUseElasticTraceparentHeader() (bool, error) { return configutil.ParseBoolEnv(envUseElasticTraceparentHeader, true) } +func initialSpanCompressionEnabled() (bool, error) { + return configutil.ParseBoolEnv(envSpanCompressionEnabled, + defaultSpanCompressionEnabled, + ) +} + +func initialSpanCompressionExactMatchMaxDuration() (time.Duration, error) { + return configutil.ParseDurationEnv( + envSpanCompressionExactMatchMaxDuration, + defaultSpanCompressionExactMatchMaxDuration, + ) +} + +func initialSpanCompressionSameKindMaxDuration() (time.Duration, error) { + return configutil.ParseDurationEnv( + envSpanCompressionSameKindMaxDuration, + defaultSpanCompressionSameKindMaxDuration, + ) +} + func initialCPUProfileIntervalDuration() (time.Duration, time.Duration, error) { interval, err := configutil.ParseDurationEnv(envCPUProfileInterval, 0) if err != nil || interval <= 0 { @@ -430,6 +463,36 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string] delete(attrs, k) continue } + case envSpanCompressionEnabled: + val, err := strconv.ParseBool(v) + if err != nil { + errorf("central config failure: failed to parse %s: %s", k, err) + delete(attrs, k) + 
continue + } + updates = append(updates, func(cfg *instrumentationConfig) { + cfg.compressionOptions.enabled = val + }) + case envSpanCompressionExactMatchMaxDuration: + duration, err := configutil.ParseDuration(v) + if err != nil { + errorf("central config failure: failed to parse %s: %s", k, err) + delete(attrs, k) + continue + } + updates = append(updates, func(cfg *instrumentationConfig) { + cfg.compressionOptions.exactMatchMaxDuration = duration + }) + case envSpanCompressionSameKindMaxDuration: + duration, err := configutil.ParseDuration(v) + if err != nil { + errorf("central config failure: failed to parse %s: %s", k, err) + delete(attrs, k) + continue + } + updates = append(updates, func(cfg *instrumentationConfig) { + cfg.compressionOptions.sameKindMaxDuration = duration + }) default: warningf("central config failure: unsupported config: %s", k) delete(attrs, k) @@ -532,4 +595,5 @@ type instrumentationConfigValues struct { propagateLegacyHeader bool sanitizedFieldNames wildcard.Matchers ignoreTransactionURLs wildcard.Matchers + compressionOptions compressionOptions } diff --git a/config_test.go b/config_test.go index 490a453b1..e828652fb 100644 --- a/config_test.go +++ b/config_test.go @@ -20,6 +20,7 @@ package apm_test import ( "context" "encoding/json" + "fmt" "io/ioutil" "net/http" "net/http/httptest" @@ -127,6 +128,57 @@ func TestTracerCentralConfigUpdate(t *testing.T) { require.NoError(t, err) return tracer.IgnoredTransactionURL(u) }) + run("span_compression_enabled", "true", func(tracer *apmtest.RecordingTracer) bool { + tracer.ResetPayloads() + tx := tracer.StartTransaction("name", "type") + exitSpanOpts := apm.SpanOptions{ExitSpan: true} + for i := 0; i < 2; i++ { + span := tx.StartSpanOptions("name", "request", exitSpanOpts) + span.Duration = 50 * time.Millisecond + span.End() + } + tx.End() + tracer.Flush(nil) + return len(tracer.Payloads().Spans) == 1 + }) + run("span_compression_exact_match_max_duration", "100ms", func(tracer 
*apmtest.RecordingTracer) bool { + tracer.ResetPayloads() + tracer.SetSpanCompressionEnabled(true) + defer tracer.SetSpanCompressionEnabled(false) + tx := tracer.StartTransaction("name", "type") + exitSpanOpts := apm.SpanOptions{ExitSpan: true} + for i := 0; i < 2; i++ { + span := tx.StartSpanOptions("name", "request", exitSpanOpts) + span.Duration = 100 * time.Millisecond + span.End() + } + // Third span does not get compressed since it exceeds the max duration. + span := tx.StartSpanOptions("name", "request", exitSpanOpts) + span.Duration = 101 * time.Millisecond + span.End() + tx.End() + tracer.Flush(nil) + return len(tracer.Payloads().Spans) == 2 + }) + run("span_compression_same_kind_max_duration", "10ms", func(tracer *apmtest.RecordingTracer) bool { + tracer.ResetPayloads() + tracer.SetSpanCompressionEnabled(true) + defer tracer.SetSpanCompressionEnabled(false) + tx := tracer.StartTransaction("name", "type") + exitSpanOpts := apm.SpanOptions{ExitSpan: true} + for i := 0; i < 2; i++ { + span := tx.StartSpanOptions(fmt.Sprint(i), "request", exitSpanOpts) + span.Duration = 10 * time.Millisecond + span.End() + } + // Third span does not get compressed since it exceeds the max duration. + span := tx.StartSpanOptions("name", "request", exitSpanOpts) + span.Duration = 11 * time.Millisecond + span.End() + tx.End() + tracer.Flush(nil) + return len(tracer.Payloads().Spans) == 2 + }) } func testTracerCentralConfigUpdate(t *testing.T, logger apm.Logger, serverResponse string, isRemote func(*apmtest.RecordingTracer) bool) { diff --git a/docs/configuration.asciidoc b/docs/configuration.asciidoc index ed9ae728a..770db7de0 100644 --- a/docs/configuration.asciidoc +++ b/docs/configuration.asciidoc @@ -606,3 +606,68 @@ automatically collect the cloud metadata. Valid options are `"none"`, `"auto"`, `"aws"`, `"gcp"`, and `"azure"` If this config value is set to `"none"`, then no cloud metadata will be collected. 
+ +[float] +[[config-span-compression-enabled]] +=== `ELASTIC_APM_SPAN_COMPRESSION_ENABLED` + +<<dynamic-configuration, image:./images/dynamic-config.svg[] >> + +[options="header"] +|============ +| Environment | Default +| `ELASTIC_APM_SPAN_COMPRESSION_ENABLED` | `false` +|============ + +When enabled, the agent will attempt to compress _short_ exit spans that share the +same parent into a composite span. The exact duration for what is considered +_short_ depends on the compression strategy used (`same_kind` or `exact_match`). + +In order for a span to be compressible, these conditions need to be met: + +* Spans are exit spans. +* Spans are siblings (share the same parent). +* Spans have not propagated their context downstream. +* Each span's duration is lower than or equal to the compression strategy's maximum duration. +* Spans are compressed with the `same_kind` strategy when these attributes are equal: +** `span.type`. +** `span.subtype`. +** `span.context.destination.service.resource`. +* Spans are compressed with the `exact_match` strategy when all the previous conditions +are met and the `span.name` is equal. + +Compressing short exit spans should provide some storage savings for services that +create a lot of consecutive short exit spans to, for example, databases or cache +services, which are generally uninteresting when viewing a trace. + +experimental::["This feature is experimental and requires APM Server v7.15 or later."] + +[float] +[[config-span-compression-exact-match-duration]] +=== `ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION` + +<<dynamic-configuration, image:./images/dynamic-config.svg[] >> + +[options="header"] +|============ +| Environment | Default +| `ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION` | `50ms` +|============ + +The maximum duration to consider for compressing sibling exit spans that are an +exact match. 
+ +[float] +[[config-span-compression-same-kind-duration]] +=== `ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION` + +<<dynamic-configuration, image:./images/dynamic-config.svg[] >> + +[options="header"] +|============ +| Environment | Default +| `ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION` | `5ms` +|============ + +The maximum duration to consider for compressing sibling exit spans that are of +the same kind. diff --git a/utils_go10.go b/internal/apmmath/round_go10.go similarity index 88% rename from utils_go10.go rename to internal/apmmath/round_go10.go index 1027442fa..9cacc1803 100644 --- a/utils_go10.go +++ b/internal/apmmath/round_go10.go @@ -18,10 +18,11 @@ //go:build go1.10 // +build go1.10 -package apm // import "go.elastic.co/apm" +package apmmath import "math" -func round(x float64) float64 { +// Round is the current math.Round implementation for >= Go1.10. +func Round(x float64) float64 { return math.Round(x) } diff --git a/utils_go9.go b/internal/apmmath/round_go9.go similarity index 89% rename from utils_go9.go rename to internal/apmmath/round_go9.go index b5a109e24..111317d7f 100644 --- a/utils_go9.go +++ b/internal/apmmath/round_go9.go @@ -18,14 +18,14 @@ //go:build !go1.10 // +build !go1.10 -package apm // import "go.elastic.co/apm" +package apmmath import "math" -// Implementation of math.Round for Go < 1.10. +// Round is an implementation of math.Round for Go < 1.10. // // Code shamelessly copied from pkg/math. -func round(x float64) float64 { +func Round(x float64) float64 { t := math.Trunc(x) if math.Abs(x-t) >= 0.5 { return t + math.Copysign(1, x) diff --git a/internal/apmstrings/concat10.go b/internal/apmstrings/concat10.go new file mode 100644 index 000000000..8f6105402 --- /dev/null +++ b/internal/apmstrings/concat10.go @@ -0,0 +1,38 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build go1.10 +// +build go1.10 + +package apmstrings + +import "strings" + +// Concat concatenates all the string arguments efficiently. +func Concat(elements ...string) string { + var builder strings.Builder + var length int + for i := range elements { + length += len(elements[i]) + } + builder.Grow(length) + + for i := range elements { + builder.WriteString(elements[i]) + } + return builder.String() +} diff --git a/internal/apmstrings/concat10_test.go b/internal/apmstrings/concat10_test.go new file mode 100644 index 000000000..ebb719830 --- /dev/null +++ b/internal/apmstrings/concat10_test.go @@ -0,0 +1,60 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build go1.10 +// +build go1.10 + +package apmstrings + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConcat(t *testing.T) { + type args struct { + elements []string + } + tests := []struct { + expect string + elements []string + }{ + {elements: []string{"a", "b", "c"}, expect: "abc"}, + {elements: []string{"Calls to ", "redis"}, expect: "Calls to redis"}, + } + for i, test := range tests { + t.Run(fmt.Sprint(i), func(t *testing.T) { + assert.Equal(t, test.expect, Concat(test.elements...), i) + }) + } +} + +var benchRes string + +func BenchmarkConcat(b *testing.B) { + elements := map[bool][]string{ + true: {"Calls to ", "redis"}, + false: {"Calls to ", "verylongservicenamewegothere"}, + } + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + benchRes = Concat(elements[i%2 == 0]...) + } +} diff --git a/internal/apmstrings/concat9.go b/internal/apmstrings/concat9.go new file mode 100644 index 000000000..74f288657 --- /dev/null +++ b/internal/apmstrings/concat9.go @@ -0,0 +1,41 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +//go:build !go1.10 +// +build !go1.10 + +package apmstrings + +import ( + "bytes" +) + +// Concat concatenates all the string arguments efficiently. +func Concat(elements ...string) string { + var buf bytes.Buffer + var length int + for i := range elements { + length += len(elements[i]) + } + buf.Grow(length) + + for i := range elements { + buf.WriteString(elements[i]) + } + return buf.String() + +} diff --git a/model/marshal_fastjson.go b/model/marshal_fastjson.go index c7996faad..c1c734a6a 100644 --- a/model/marshal_fastjson.go +++ b/model/marshal_fastjson.go @@ -622,6 +622,12 @@ func (v *Span) MarshalFastJSON(w *fastjson.Writer) error { w.RawString(",\"action\":") w.String(v.Action) } + if v.Composite != nil { + w.RawString(",\"composite\":") + if err := v.Composite.MarshalFastJSON(w); err != nil && firstErr == nil { + firstErr = err + } + } if v.Context != nil { w.RawString(",\"context\":") if err := v.Context.MarshalFastJSON(w); err != nil && firstErr == nil { @@ -895,6 +901,18 @@ func (v *DatabaseSpanContext) MarshalFastJSON(w *fastjson.Writer) error { return nil } +func (v *CompositeSpan) MarshalFastJSON(w *fastjson.Writer) error { + w.RawByte('{') + w.RawString("\"compression_strategy\":") + w.String(v.CompressionStrategy) + w.RawString(",\"count\":") + w.Int64(int64(v.Count)) + w.RawString(",\"sum\":") + w.Float64(v.Sum) + w.RawByte('}') + return nil +} + func (v *Context) MarshalFastJSON(w *fastjson.Writer) error { var firstErr error w.RawByte('{') diff --git a/model/model.go b/model/model.go index 66079f496..821a2b9eb 100644 --- a/model/model.go +++ b/model/model.go @@ -362,6 +362,10 @@ type Span struct { // Outcome holds the span outcome: success, failure, or unknown. Outcome string `json:"outcome,omitempty"` + + // Composite is set when the span is a composite span and represents an + // aggregated set of spans as defined by `composite.compression_strategy`. 
+ Composite *CompositeSpan `json:"composite,omitempty"` } // SpanContext holds contextual information relating to the span. @@ -464,6 +468,19 @@ type HTTPSpanContext struct { StatusCode int `json:"status_code,omitempty"` } +// CompositeSpan holds details on a group of spans represented by a single one. +type CompositeSpan struct { + // Count is the number of compressed spans the composite span represents. + // The minimum count is 2, as a composite span represents at least two spans. + Count int `json:"count"` + // Sum is the sum of the durations of all compressed spans this composite span + // represents, in milliseconds. + Sum float64 `json:"sum"` + // CompressionStrategy indicates which compression strategy was used. The valid + // values are `exact_match` and `same_kind`. + CompressionStrategy string `json:"compression_strategy"` +} + // Context holds contextual information relating to a transaction or error. type Context struct { // Custom holds custom context relating to the transaction or error. diff --git a/modelwriter.go b/modelwriter.go index db3f38c58..91073fec2 100644 --- a/modelwriter.go +++ b/modelwriter.go @@ -151,6 +151,9 @@ func (w *modelWriter) buildModelSpan(out *model.Span, span *Span, sd *SpanData) out.Duration = sd.Duration.Seconds() * 1000 out.Outcome = normalizeOutcome(sd.Outcome) out.Context = sd.Context.build() + if sd.composite.count > 1 { + out.Composite = sd.composite.build() + } // Copy the span type to context.destination.service.type. if out.Context != nil && out.Context.Destination != nil && out.Context.Destination.Service != nil { diff --git a/sampler.go b/sampler.go index 4a3d789d6..25f02c84e 100644 --- a/sampler.go +++ b/sampler.go @@ -23,6 +23,8 @@ import ( "math/big" "github.com/pkg/errors" + + "go.elastic.co/apm/internal/apmmath" ) // Sampler provides a means of sampling transactions. 
@@ -126,5 +128,5 @@ func roundSampleRate(r float64) float64 { if r > 0 && r < 0.0001 { r = 0.0001 } - return round(r*10000) / 10000 + return apmmath.Round(r*10000) / 10000 } diff --git a/span.go b/span.go index dc7acadbe..0d2011ae9 100644 --- a/span.go +++ b/span.go @@ -22,6 +22,7 @@ import ( "encoding/binary" "strings" "sync" + "sync/atomic" "time" "go.elastic.co/apm/stacktrace" @@ -124,6 +125,7 @@ func (tx *Transaction) StartSpanOptions(name, spanType string, opts SpanOptions) } span.stackFramesMinDuration = tx.spanFramesMinDuration span.stackTraceLimit = tx.stackTraceLimit + span.compressedSpan.options = tx.compressedSpan.options tx.spansCreated++ } @@ -175,6 +177,7 @@ func (t *Tracer) StartSpan(name, spanType string, transactionID SpanID, opts Spa instrumentationConfig := t.instrumentationConfig() span.stackFramesMinDuration = instrumentationConfig.spanFramesMinDuration span.stackTraceLimit = instrumentationConfig.stackTraceLimit + span.compressedSpan.options = instrumentationConfig.compressionOptions if opts.ExitSpan { span.exit = true } @@ -257,6 +260,9 @@ type Span struct { parentID SpanID exit bool + // ctxPropagated is set to 1 when the traceContext is propagated downstream. + ctxPropagated uint32 + mu sync.RWMutex // SpanData holds the span data. This field is set to nil when @@ -269,6 +275,7 @@ func (s *Span) TraceContext() TraceContext { if s == nil { return TraceContext{} } + atomic.StoreUint32(&s.ctxPropagated, 1) return s.traceContext } @@ -352,9 +359,29 @@ func (s *Span) End() { if len(s.stacktrace) == 0 && s.Duration >= s.stackFramesMinDuration { s.setStacktrace(1) } + if s.tx != nil { s.reportSelfTime() } + + s.end() +} + +// end represents a subset of the public `s.End()` API and will only attempt +// to compress the span if it's compressible, or enqueue it in case it's not. +// +// end must only be called from `s.End()` and `tx.End()`. 
+func (s *Span) end() { + evictedSpan, cached := s.attemptCompress() + if evictedSpan != nil { + evictedSpan.end() + } + if cached { + // s has been cached for potential compression, and will be enqueued + // by a future call to attemptCompress on a sibling span, or when the + // parent is ended. + return + } s.enqueue() s.SpanData = nil } @@ -461,6 +488,8 @@ type SpanData struct { stackTraceLimit int timestamp time.Time childrenTimer childrenTimer + composite compositeSpan + compressedSpan compressedSpan // Name holds the span name, initialized with the value passed to StartSpan. Name string diff --git a/span_compressed.go b/span_compressed.go new file mode 100644 index 000000000..90f2ece1c --- /dev/null +++ b/span_compressed.go @@ -0,0 +1,319 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package apm // import "go.elastic.co/apm" + +import ( + "sync/atomic" + "time" + + "go.elastic.co/apm/internal/apmstrings" + "go.elastic.co/apm/model" +) + +const ( + _ int = iota + compressedStrategyExactMatch + compressedStrategySameKind +) + +const ( + compressedSpanSameKindName = "Calls to " +) + +type compositeSpan struct { + lastSiblingEndTime time.Time + // this internal representation should be set in Nanoseconds, although + // the model unit is set in Milliseconds. + sum time.Duration + count int + compressionStrategy int +} + +func (cs compositeSpan) build() *model.CompositeSpan { + var out model.CompositeSpan + switch cs.compressionStrategy { + case compressedStrategyExactMatch: + out.CompressionStrategy = "exact_match" + case compressedStrategySameKind: + out.CompressionStrategy = "same_kind" + } + out.Count = cs.count + out.Sum = float64(cs.sum) / float64(time.Millisecond) + return &out +} + +func (cs compositeSpan) empty() bool { + return cs.count < 1 +} + +// A span is eligible for compression if all the following conditions are met +// 1. It's an exit span +// 2. The trace context has not been propagated to a downstream service +// 3. If the span has outcome (i.e., outcome is present and it's not null) then +// it should be success. It means spans with outcome indicating an issue of +// potential interest should not be compressed. +// The second condition is important so that we don't remove (compress) a span +// that may be the parent of a downstream service. This would orphan the sub- +// graph started by the downstream service and cause it to not appear in the +// waterfall view. +func (s *Span) compress(sibling *Span) bool { + // If the spans aren't siblings, we cannot compress them. + if s.parentID != sibling.parentID { + return false + } + + strategy := s.canCompressComposite(sibling) + if strategy == 0 { + strategy = s.canCompressStandard(sibling) + } + + // If the span cannot be compressed using any strategy. 
+ if strategy == 0 { + return false + } + + if s.composite.empty() { + s.composite = compositeSpan{ + count: 1, + sum: s.Duration, + compressionStrategy: strategy, + } + } + + s.composite.count++ + s.composite.sum += sibling.Duration + siblingTimestamp := sibling.timestamp.Add(sibling.Duration) + if siblingTimestamp.After(s.composite.lastSiblingEndTime) { + s.composite.lastSiblingEndTime = siblingTimestamp + } + return true +} + +// +// Span // +// + +// attemptCompress tries to compress a span into a "composite span" when: +// * Compression is enabled on the agent. +// * The cached span and the incoming span: +// * Share the same parent (are siblings). +// * Are consecutive spans. +// * Are both exit spans, outcome == success and are short enough (See +// `ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION` and +// `ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION` for more info). +// * Represent the same exact operation or the same kind of operation: +// * Are an exact match (same name, kind and destination service). +// OR +// * Are a same kind match (same kind and destination service). +// When a span has already been compressed using a particular strategy, it +// CANNOT continue to compress spans using a different strategy. +// The compression algorithm is fairly simple and only compresses spans into a +// composite span when the conditions listed above are met for all consecutive +// spans. At any point, any span that doesn't meet the conditions will cause +// the cache to be evicted and the cached span to be returned. +// * When the incoming span is compressible, it will replace the cached span. +// * When the incoming span is not compressible, it will be enqueued as well. +// +// Returns `true` when the span has been cached, thus the caller should not +// enqueue the span. When `false` is returned, the cache is evicted and the +// caller should enqueue the span. 
+// +// It needs to be called with s.mu held, and will attempt to hold s.parent.mu +// when not nil or s.tx.mu and s.tx.TransactionData.mu when s.parent is nil. +func (s *Span) attemptCompress() (*Span, bool) { + // If compression is disabled, or the span has already been evicted from the + // cache (which disables compression on it), ask the caller to enqueue it. + if !s.compressedSpan.options.enabled { + return nil, false + } + + // When a parent span ends, flush its cache. + if cache := s.compressedSpan.evict(); cache != nil { + return cache, false + } + + // There are two distinct places where the span can be cached: the parent + // span and the transaction. The algorithm prefers storing the cached spans + // in its parent, and if nil, it will use the transaction's cache. + if s.parent != nil { + s.parent.mu.Lock() + defer s.parent.mu.Unlock() + if !s.parent.ended() { + return s.parent.compressedSpan.compressOrEvictCache(s) + } + return nil, false + } + + if s.tx != nil { + s.tx.mu.RLock() + defer s.tx.mu.RUnlock() + if !s.tx.ended() { + s.tx.TransactionData.mu.Lock() + defer s.tx.TransactionData.mu.Unlock() + return s.tx.compressedSpan.compressOrEvictCache(s) + } + } + return nil, false +} + +func (s *Span) isCompressionEligible() bool { + if s == nil { + return false + } + ctxPropagated := atomic.LoadUint32(&s.ctxPropagated) == 1 + return s.exit && !ctxPropagated && + (s.Outcome == "" || s.Outcome == "success") +} + +func (s *Span) canCompressStandard(sibling *Span) int { + if !s.isSameKind(sibling) { + return 0 + } + + // We've already established the spans are the same kind. + strategy := compressedStrategySameKind + maxDuration := s.compressedSpan.options.sameKindMaxDuration + + // If it's an exact match, we then switch the settings. + if s.isExactMatch(sibling) { + maxDuration = s.compressedSpan.options.exactMatchMaxDuration + strategy = compressedStrategyExactMatch + } + + // Any spans that go over the maximum duration cannot be compressed. 
+ if !s.durationLowerOrEq(sibling, maxDuration) { + return 0 + } + + // If the composite span already has a compression strategy it differs from + // the chosen strategy, the spans cannot be compressed. + if !s.composite.empty() && s.composite.compressionStrategy != strategy { + return 0 + } + + // Return whichever strategy was chosen. + return strategy +} + +func (s *Span) canCompressComposite(sibling *Span) int { + if s.composite.empty() { + return 0 + } + switch s.composite.compressionStrategy { + case compressedStrategyExactMatch: + if s.isExactMatch(sibling) && s.durationLowerOrEq(sibling, + s.compressedSpan.options.exactMatchMaxDuration, + ) { + return compressedStrategyExactMatch + } + case compressedStrategySameKind: + if s.isSameKind(sibling) && s.durationLowerOrEq(sibling, + s.compressedSpan.options.sameKindMaxDuration, + ) { + return compressedStrategySameKind + } + } + return 0 +} + +func (s *Span) durationLowerOrEq(sibling *Span, max time.Duration) bool { + return s.Duration <= max && sibling.Duration <= max +} + +// +// SpanData // +// + +// isExactMatch is used for compression purposes, two spans are considered an +// exact match if the have the same name and are of the same kind (see +// isSameKind for more details). +func (s *SpanData) isExactMatch(span *Span) bool { + return s.Name == span.Name && s.isSameKind(span) +} + +// isSameKind is used for compression purposes, two spans are considered to be +// of the same kind if they have the same values for type, subtype, and +// `destination.service.resource`. 
+func (s *SpanData) isSameKind(span *Span) bool { + sameType := s.Type == span.Type + sameSubType := s.Subtype == span.Subtype + dstService := s.Context.destination.Service + otherDstService := span.Context.destination.Service + sameService := dstService != nil && otherDstService != nil && + dstService.Resource == otherDstService.Resource + + return sameType && sameSubType && sameService +} + +// setCompressedSpanName changes the span name to "Calls to " +// for composite spans that are compressed with the `"same_kind"` strategy. +func (s *SpanData) setCompressedSpanName() { + if s.composite.compressionStrategy != compressedStrategySameKind { + return + } + service := s.Context.destinationService.Resource + s.Name = apmstrings.Concat(compressedSpanSameKindName, service) +} + +type compressedSpan struct { + cache *Span + options compressionOptions +} + +// evict resets the cache to nil and returns the cached span after adjusting +// its Name, Duration, and timers. +// +// Should be only be called from Transaction.End() and Span.End(). +func (cs *compressedSpan) evict() *Span { + if cs.cache == nil { + return nil + } + cached := cs.cache + // Disable compression on the evicted span to avoid the span from ending up + // swapping the cache and causing an infinite loop. + cached.compressedSpan.options.enabled = false + cs.cache = nil + // When the span composite is not empty, we need to adjust the duration just + // before it is reported and no more spans will be compressed into the + // composite. If this is done before ending the span, the duration of the span + // could potentially grow over the compressable threshold and result in + // compressable span not being compressed and reported separately. 
+ if !cached.composite.empty() { + cached.Duration = cached.composite.lastSiblingEndTime.Sub(cached.timestamp) + cached.setCompressedSpanName() + } + return cached +} + +func (cs *compressedSpan) compressOrEvictCache(s *Span) (*Span, bool) { + if !s.isCompressionEligible() { + return cs.evict(), false + } + + if cs.cache == nil { + cs.cache = s + return nil, true + } + + var evictedSpan *Span + if !cs.cache.compress(s) { + evictedSpan = cs.evict() + cs.cache = s + } + return evictedSpan, true +} diff --git a/span_test.go b/span_test.go index d1c8da7ee..0396c4f93 100644 --- a/span_test.go +++ b/span_test.go @@ -19,6 +19,10 @@ package apm_test import ( "context" + "fmt" + "os" + "sort" + "sync" "testing" "time" @@ -190,6 +194,743 @@ func TestStartExitSpan(t *testing.T) { assert.True(t, span.IsExitSpan()) } +func TestCompressSpanNonSiblings(t *testing.T) { + // Asserts that non sibling spans are not compressed. + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + + tracer.SetSpanCompressionEnabled(true) + + tx := tracer.StartTransaction("name", "type") + parent := tx.StartSpan("parent", "parent", nil) + + createSpans := []struct { + name, typ string + parent apm.TraceContext + }{ + {name: "not compressed", typ: "internal", parent: parent.TraceContext()}, + {name: "not compressed", typ: "internal", parent: tx.TraceContext()}, + {name: "compressed", typ: "internal", parent: parent.TraceContext()}, + {name: "compressed", typ: "internal", parent: parent.TraceContext()}, + {name: "compressed", typ: "different", parent: tx.TraceContext()}, + {name: "compressed", typ: "different", parent: tx.TraceContext()}, + } + for _, span := range createSpans { + span := tx.StartSpanOptions(span.name, span.typ, apm.SpanOptions{ + ExitSpan: true, Parent: span.parent, + }) + span.Duration = time.Millisecond + span.End() + } + + parent.End() + tx.End() + tracer.Flush(nil) + + spans := tracer.Payloads().Spans + require.Len(t, spans, 5) + + // First two spans should not have been 
compressed together. + require.Nil(t, spans[0].Composite) + require.Nil(t, spans[1].Composite) + + require.NotNil(t, spans[2].Composite) + require.Equal(t, 2, spans[2].Composite.Count) + require.Equal(t, float64(2), spans[2].Composite.Sum) + require.Equal(t, "exact_match", spans[2].Composite.CompressionStrategy) + + require.NotNil(t, spans[3].Composite) + require.Equal(t, 2, spans[3].Composite.Count) + require.Equal(t, float64(2), spans[3].Composite.Sum) + require.Equal(t, "exact_match", spans[3].Composite.CompressionStrategy) +} + +func TestCompressSpanExactMatch(t *testing.T) { + // Aserts that that span compression works on compressable spans with + // "exact_match" strategy. + tests := []struct { + setup func(t *testing.T) func() + assertFunc func(t *testing.T, tx model.Transaction, spans []model.Span) + name string + compressionEnabled bool + }{ + // |______________transaction (095b51e1b6ca784c) - 2.0013ms_______________| + // m + // m + // m + // m + // m + // m + // m + // m + // m + // m + // |___________________mysql SELECT * FROM users - 2ms____________________| + // r + // i + // r + { + name: "CompressFalse", + compressionEnabled: false, + assertFunc: func(t *testing.T, tx model.Transaction, spans []model.Span) { + require.NotEmpty(t, tx) + require.Equal(t, 14, len(spans)) + for _, span := range spans { + require.Nil(t, span.Composite) + } + }, + }, + // |______________transaction (7d3254511f02b26b) - 2.0013ms_______________| + // 10 + // |___________________mysql SELECT * FROM users - 2ms___________________| + // r + // i + // r + { + name: "CompressTrueSettingTweak", + compressionEnabled: true, + setup: func(*testing.T) func() { + // This setting + envVarName := "ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION" + og := os.Getenv(envVarName) + os.Setenv(envVarName, "1ms") + return func() { os.Setenv(envVarName, og) } + }, + assertFunc: func(t *testing.T, tx model.Transaction, spans []model.Span) { + require.NotNil(t, tx) + require.Equal(t, 5, 
len(spans)) + composite := spans[0] + require.NotNil(t, composite.Composite) + assert.Equal(t, "exact_match", composite.Composite.CompressionStrategy) + assert.Equal(t, composite.Composite.Count, 10) + assert.Equal(t, 0.001, composite.Composite.Sum) + assert.Equal(t, 0.001, composite.Duration) + + for _, span := range spans[1:] { + require.Nil(t, span.Composite) + } + }, + }, + // |______________transaction (5797fe58c6ccce29) - 2.0013ms_______________| + // |_____________________11 Calls to mysql - 2.001ms______________________| + // r + // i + // r + { + name: "CompressSpanCount4", + compressionEnabled: true, + assertFunc: func(t *testing.T, tx model.Transaction, spans []model.Span) { + require.NotEmpty(t, tx) + var composite = spans[0] + assert.Equal(t, composite.Context.Destination.Service.Resource, "mysql") + + require.NotNil(t, composite.Composite) + assert.Equal(t, composite.Composite.Count, 11) + assert.Equal(t, "exact_match", composite.Composite.CompressionStrategy) + // Sum should be at least the time that each span ran for. 
The + // model time is in Milliseconds and the span duration should be + // at least 2 Milliseconds + assert.Equal(t, int(composite.Composite.Sum), 2) + assert.Equal(t, int(composite.Duration), 2) + + for _, span := range spans { + if span.Type == "mysql" { + continue + } + assert.Nil(t, span.Composite) + } + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.setup != nil { + defer test.setup(t)() + } + + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + tracer.SetSpanCompressionEnabled(test.compressionEnabled) + + // When compression is enabled: + // Compress 10 spans into 1 and add another span with a different type + // [ Transaction ] + // [ mysql (11) ] [ request ] [ internal ] [ request ] + // + txStart := time.Now() + tx := tracer.StartTransactionOptions("name", "type", + apm.TransactionOptions{Start: txStart}, + ) + currentTime := txStart + for i := 0; i < 10; i++ { + span := tx.StartSpanOptions("SELECT * FROM users", "mysql", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + // Compressed when the exact_match threshold is >= 2ms. + { + span := tx.StartSpanOptions("SELECT * FROM users", "mysql", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 2 * time.Millisecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + + // None of these should be added to the composite. 
+ { + span := tx.StartSpanOptions("GET /", "request", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + { + // Not an exit span, should not be compressed + span := tx.StartSpanOptions("calculate complex", "internal", apm.SpanOptions{ + Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + { + // Exit span, this is a good candidate to be compressed, but + // since it can't be compressed with the last request type ("internal") + span := tx.StartSpanOptions("GET /", "request", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + tx.Duration = currentTime.Sub(txStart) + tx.End() + tracer.Flush(nil) + + transaction := tracer.Payloads().Transactions[0] + spans := tracer.Payloads().Spans + defer func() { + if t.Failed() { + apmtest.WriteTraceWaterfall(os.Stdout, transaction, spans) + apmtest.WriteTraceTable(os.Stdout, transaction, spans) + } + }() + + if test.assertFunc != nil { + test.assertFunc(t, transaction, spans) + } + }) + } +} + +func TestCompressSpanSameKind(t *testing.T) { + // Aserts that that span compression works on compressable spans with + // "same_kind" strategy, and that different span types are not compressed. 
+ tests := []struct { + setup func(t *testing.T) func() + assertFunc func(t *testing.T, spans []model.Span) + name string + compressionEnabled bool + }{ + { + name: "DefaultThreshold", + assertFunc: func(t *testing.T, spans []model.Span) { + require.Equal(t, 3, len(spans)) + mysqlSpan := spans[0] + assert.Equal(t, "mysql", mysqlSpan.Context.Destination.Service.Resource) + assert.Nil(t, mysqlSpan.Composite) + + requestSpan := spans[1] + assert.Equal(t, "request", requestSpan.Context.Destination.Service.Resource) + require.NotNil(t, requestSpan.Composite) + assert.Equal(t, 5, requestSpan.Composite.Count) + assert.Equal(t, "same_kind", requestSpan.Composite.CompressionStrategy) + assert.Equal(t, "Calls to request", requestSpan.Name) + // Check that the sum and span duration is at least the duration of the time set. + assert.Equal(t, 0.0005, requestSpan.Composite.Sum, requestSpan.Composite.Sum) + assert.Equal(t, 0.0005, requestSpan.Duration, requestSpan.Duration) + }, + }, + { + name: "10msThreshold", + setup: func(*testing.T) func() { + os.Setenv("ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION", "10ms") + return func() { os.Unsetenv("ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION") } + }, + assertFunc: func(t *testing.T, spans []model.Span) { + require.Equal(t, 2, len(spans)) + + mysqlSpan := spans[0] + assert.Equal(t, mysqlSpan.Context.Destination.Service.Resource, "mysql") + assert.Nil(t, mysqlSpan.Composite) + + requestSpan := spans[1] + assert.Equal(t, requestSpan.Context.Destination.Service.Resource, "request") + assert.NotNil(t, requestSpan.Composite) + assert.Equal(t, 6, requestSpan.Composite.Count) + assert.Equal(t, "Calls to request", requestSpan.Name) + assert.Equal(t, "same_kind", requestSpan.Composite.CompressionStrategy) + // Check that the aggregate sum is at least the duration of the time we + // we waited for. 
+ assert.Greater(t, requestSpan.Composite.Sum, float64(5*100/time.Millisecond)) + + // Check that the total composite span duration is at least 5 milliseconds. + assert.Greater(t, requestSpan.Duration, float64(5*100/time.Millisecond)) + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.setup != nil { + defer test.setup(t)() + } + + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + tracer.SetSpanCompressionEnabled(true) + + // Compress 5 spans into 1 and add another span with a different type + // |______________transaction (572da67c206e9996) - 6.0006ms_______________| + // m + // 5 + // |________________________request GET /f - 6ms_________________________| + // + txStart := time.Now() + tx := tracer.StartTransactionOptions("name", "type", + apm.TransactionOptions{Start: txStart}, + ) + currentTime := txStart + + // Span is compressable, but cannot be compressed since the next span + // is not the same kind. It's published. + { + span := tx.StartSpanOptions("SELECT * FROM users", "mysql", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + + // These spans should be compressed into 1. + path := []string{"/a", "/b", "/c", "/d", "/e"} + for i := 0; i < 5; i++ { + span := tx.StartSpanOptions(fmt.Sprint("GET ", path[i]), "request", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + // This span exceeds the default threshold (5ms) and won't be compressed. 
+ { + span := tx.StartSpanOptions("GET /f", "request", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 6 * time.Millisecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + tx.Duration = currentTime.Sub(txStart) + tx.End() + tracer.Flush(nil) + + transaction := tracer.Payloads().Transactions[0] + spans := tracer.Payloads().Spans + defer func() { + if t.Failed() { + apmtest.WriteTraceWaterfall(os.Stdout, transaction, spans) + apmtest.WriteTraceTable(os.Stdout, transaction, spans) + } + }() + + require.NotNil(t, transaction) + if test.assertFunc != nil { + test.assertFunc(t, spans) + } + }) + } +} + +func TestCompressSpanSameKindParentSpan(t *testing.T) { + // This test asserts the span compression works when the spans are children + // of another span. + tracer := apmtest.NewRecordingTracer() + tracer.SetSpanCompressionEnabled(true) + + // This test case covers spans that have other spans as parents. + // |_______________transaction (6b1e4866252dea6f) - 1.45ms________________| + // |__internal internal op - 700µs___| + // |request GET /r| + // |request G| + // |___internal another op - 750µs____| + // |2 Calls to re| + txStart := time.Now() + tx := tracer.StartTransactionOptions("name", "type", + apm.TransactionOptions{Start: txStart}, + ) + + ctx := apm.ContextWithTransaction(context.Background(), tx) + currentTime := txStart + { + // Doesn't compress any spans since none meet the necessary conditions + // the "request" type are both the same type but the parent + parent, ctx := apm.StartSpanOptions(ctx, "internal op", "internal", apm.SpanOptions{ + Start: currentTime, + }) + // Have span propagate context downstream, this should not allow for + // compression + child, ctx := apm.StartSpanOptions(ctx, "GET /resource", "request", apm.SpanOptions{ + Start: currentTime.Add(100 * time.Microsecond), + }) + + grandChild, _ := apm.StartSpanOptions(ctx, "GET /different", "request", apm.SpanOptions{ + ExitSpan: true, + Start: 
currentTime.Add(120 * time.Microsecond), + }) + + grandChild.Duration = 200 * time.Microsecond + grandChild.End() + + child.Duration = 300 * time.Microsecond + child.End() + + parent.Duration = 700 * time.Microsecond + currentTime = currentTime.Add(parent.Duration) + parent.End() + } + { + // Compresses the last two spans together since they are both exit + // spans, same "request" type, don't propagate ctx and succeed. + parent, ctx := apm.StartSpanOptions(ctx, "another op", "internal", apm.SpanOptions{ + Start: currentTime.Add(50 * time.Microsecond), + }) + child, _ := apm.StartSpanOptions(ctx, "GET /res", "request", apm.SpanOptions{ + ExitSpan: true, + Start: currentTime.Add(120 * time.Microsecond), + }) + + otherChild, _ := apm.StartSpanOptions(ctx, "GET /diff", "request", apm.SpanOptions{ + ExitSpan: true, + Start: currentTime.Add(150 * time.Microsecond), + }) + + otherChild.Duration = 250 * time.Microsecond + otherChild.End() + + child.Duration = 300 * time.Microsecond + child.End() + + parent.Duration = 750 * time.Microsecond + currentTime = currentTime.Add(parent.Duration) + parent.End() + } + + tx.Duration = currentTime.Sub(txStart) + tx.End() + tracer.Flush(nil) + + transaction := tracer.Payloads().Transactions[0] + spans := tracer.Payloads().Spans + + defer func() { + if t.Failed() { + apmtest.WriteTraceTable(os.Stdout, transaction, spans) + apmtest.WriteTraceWaterfall(os.Stdout, transaction, spans) + } + }() + require.NotNil(t, transaction) + assert.Equal(t, 5, len(spans)) + + compositeSpan := spans[3] + compositeParent := spans[4] + require.NotNil(t, compositeSpan) + require.NotNil(t, compositeSpan.Composite) + assert.Equal(t, "Calls to request", compositeSpan.Name) + assert.Equal(t, "request", compositeSpan.Type) + assert.Equal(t, "internal", compositeParent.Type) + assert.Equal(t, compositeSpan.Composite.Count, 2) + assert.Equal(t, compositeSpan.ParentID, compositeParent.ID) + assert.GreaterOrEqual(t, compositeParent.Duration, compositeSpan.Duration) 
+} + +func TestCompressSpanSameKindParentSpanContext(t *testing.T) { + // This test ensures that the compression also works when the s.Parent is + // set (via the context.Context). + // |________________transaction (ab51fc698fef307a) - 15ms_________________| + // |___________________internal parent - 13ms____________________| + // |3 Calls to re| + // |internal algorithm - 5m| + // |2 Calls t| + // |inte| + tracer := apmtest.NewRecordingTracer() + tracer.SetSpanCompressionEnabled(true) + + txStart := time.Now() + tx := tracer.StartTransactionOptions("name", "type", + apm.TransactionOptions{Start: txStart}, + ) + + ctx := apm.ContextWithTransaction(context.Background(), tx) + parentStart := txStart.Add(time.Millisecond) + parent, ctx := apm.StartSpanOptions(ctx, "parent", "internal", apm.SpanOptions{ + Start: parentStart, + }) + + childrenStart := parentStart.Add(2 * time.Millisecond) + for i := 0; i < 3; i++ { + span, _ := apm.StartSpanOptions(ctx, "db", "redis", apm.SpanOptions{ + ExitSpan: true, + Start: childrenStart, + }) + childrenStart = childrenStart.Add(time.Millisecond) + span.Duration = time.Millisecond + span.End() + } + + testSpans := []struct { + name string + typ string + duration time.Duration + }{ + {name: "GET /some", typ: "client", duration: time.Millisecond}, + {name: "GET /resource", typ: "client", duration: 2 * time.Millisecond}, + {name: "compute something", typ: "internal", duration: time.Millisecond}, + } + + subParent, ctx := apm.StartSpanOptions(ctx, "algorithm", "internal", apm.SpanOptions{ + Start: childrenStart.Add(time.Millisecond), + }) + childrenStart = childrenStart.Add(time.Millisecond) + for _, cfg := range testSpans { + child, _ := apm.StartSpanOptions(ctx, cfg.name, cfg.typ, apm.SpanOptions{ + ExitSpan: true, Start: childrenStart.Add(cfg.duration), + }) + childrenStart = childrenStart.Add(cfg.duration) + child.Duration = cfg.duration + child.End() + } + childrenStart = childrenStart.Add(time.Millisecond) + subParent.Duration = 6 
* time.Millisecond + subParent.End() + + parent.Duration = childrenStart.Add(2 * time.Millisecond).Sub(txStart) + parent.End() + tx.Duration = 15 * time.Millisecond + tx.End() + + tracer.Flush(nil) + + transaction := tracer.Payloads().Transactions[0] + spans := tracer.Payloads().Spans + + defer func() { + if t.Failed() { + apmtest.WriteTraceTable(os.Stdout, transaction, spans) + apmtest.WriteTraceWaterfall(os.Stdout, transaction, spans) + } + }() + require.NotNil(t, transaction) + assert.Equal(t, 5, len(spans)) + + sort.SliceStable(spans, func(i, j int) bool { + return time.Time(spans[i].Timestamp).Before(time.Time(spans[j].Timestamp)) + }) + + redisSpan := spans[1] + require.NotNil(t, redisSpan.Composite) + assert.Equal(t, 3, redisSpan.Composite.Count) + assert.Equal(t, float64(3), redisSpan.Composite.Sum) + assert.Equal(t, "exact_match", redisSpan.Composite.CompressionStrategy) + + clientSpan := spans[3] + require.NotNil(t, clientSpan.Composite) + assert.Equal(t, clientSpan.ParentID, spans[2].ID) + assert.Equal(t, 2, clientSpan.Composite.Count) + assert.Equal(t, float64(3), clientSpan.Composite.Sum) + assert.Equal(t, "same_kind", clientSpan.Composite.CompressionStrategy) +} + +func TestCompressSpanSameKindConcurrent(t *testing.T) { + // This test verifies there aren't any deadlocks. 
+ tracer := apmtest.NewRecordingTracer() + tracer.SetSpanCompressionEnabled(true) + + tx := tracer.StartTransaction("name", "type") + var wg sync.WaitGroup + count := 100 + wg.Add(count) + exitSpanOpts := apm.SpanOptions{ExitSpan: true} + for i := 0; i < count; i++ { + go func(i int) { + span := tx.StartSpanOptions(fmt.Sprint(i), "request", exitSpanOpts) + span.End() + wg.Done() + }(i) + } + + ctx := apm.ContextWithTransaction(context.Background(), tx) + parent, ctx := apm.StartSpan(ctx, "parent", "internal") + wg.Add(count) + + compressedSome := make(chan struct{}) + for i := 0; i < count; i++ { + go func(i int) { + select { + case compressedSome <- struct{}{}: + default: + } + span, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), "request", apm.SpanOptions{ + ExitSpan: true, + }) + span.End() + wg.Done() + }(i) + } + + // Wait until at least a goroutine has been scheduled + <-compressedSome + tx.End() + parent.End() + wg.Wait() + close(compressedSome) +} + +func TestCompressSpanPrematureEnd(t *testing.T) { + // This test cases assert that the cached spans are sent when the span or + // tx that holds their cache is ended and the cache isn't lost. + type expect struct { + compressionStrategy string + compositeSum float64 + spanCount int + compositeCount int + } + assertResult := func(t *testing.T, spans []model.Span, expect expect) { + assert.Equal(t, expect.spanCount, len(spans)) + var composite *model.CompositeSpan + for _, span := range spans { + if span.Composite != nil { + assert.Equal(t, expect.compositeCount, span.Composite.Count) + assert.Equal(t, expect.compressionStrategy, span.Composite.CompressionStrategy) + assert.Equal(t, expect.compositeSum, span.Composite.Sum) + composite = span.Composite + } + } + require.NotNil(t, composite) + } + + // 1. The parent ends before they do. + // The parent holds the compression cache in this test case. + // | tx | + // | parent | <--- The parent ends before the children ends. 
+ // | child | <--- compressed + // | child | <--- compressed + // | child | <--- NOT compressed + // The expected result are 3 spans, the cache is invalidated and the span + // ended after the parent ends. + t.Run("ParentContext", func(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + tracer.SetSpanCompressionEnabled(true) + + tx := tracer.StartTransaction("name", "type") + ctx := apm.ContextWithTransaction(context.Background(), tx) + parent, ctx := apm.StartSpan(ctx, "parent", "internal") + for i := 0; i < 4; i++ { + child, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), "type", apm.SpanOptions{ + Parent: parent.TraceContext(), + ExitSpan: true, + }) + child.Duration = time.Microsecond + child.End() + if i == 2 { + parent.End() + } + } + tx.End() + tracer.Flush(nil) + assertResult(t, tracer.Payloads().Spans, expect{ + spanCount: 3, + compositeCount: 3, + compressionStrategy: "same_kind", + compositeSum: 0.003, + }) + }) + + // 2. The tx ends before the parent ends. + // The tx holds the compression cache in this test case. + // | tx | <--- The TX ends before parent. + // | parent | + // | child | <--- compressed + // | child | <--- compressed + // The expected result are 3 spans, the cache is invalidated and the span + // ended after the parent ends. + t.Run("TxEndBefore", func(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + tracer.SetSpanCompressionEnabled(true) + + tx := tracer.StartTransaction("name", "type") + ctx := apm.ContextWithTransaction(context.Background(), tx) + + parent, ctx := apm.StartSpan(ctx, "parent", "internal") + for i := 0; i < 2; i++ { + child, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), "type", apm.SpanOptions{ + ExitSpan: true, + }) + child.Duration = time.Microsecond + child.End() + } + tx.End() + parent.End() + tracer.Flush(nil) + assertResult(t, tracer.Payloads().Spans, expect{ + spanCount: 2, + compositeCount: 2, + compressionStrategy: "same_kind", + compositeSum: 0.002, + }) + }) + + // 2. 
The parent ends before the last of the children span are finished. + // The tx holds the compression cache in this test case. + // | tx | + // | parent | <--- The parent ends before the last child ends. + // | child | <--- compressed + // | child | <--- compressed + // | child | <--- NOT compressed + t.Run("ParentFromTx", func(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + tracer.SetSpanCompressionEnabled(true) + + tx := tracer.StartTransaction("name", "type") + parent := tx.StartSpan("parent", "internal", nil) + for i := 0; i < 3; i++ { + child := tx.StartSpanOptions(fmt.Sprint(i), "type", apm.SpanOptions{ + Parent: parent.TraceContext(), + ExitSpan: true, + }) + child.Duration = time.Microsecond + child.End() + if i == 1 { + parent.End() + } + } + tx.End() + tracer.Flush(nil) + assertResult(t, tracer.Payloads().Spans, expect{ + spanCount: 3, + compositeCount: 2, + compressionStrategy: "same_kind", + compositeSum: 0.002, + }) + }) +} + func TestExitSpanDoesNotOverwriteDestinationServiceResource(t *testing.T) { _, spans, _ := apmtest.WithTransaction(func(ctx context.Context) { span, _ := apm.StartSpanOptions(ctx, "name", "type", apm.SpanOptions{ExitSpan: true}) diff --git a/tracer.go b/tracer.go index e21c432ae..f65d2faca 100644 --- a/tracer.go +++ b/tracer.go @@ -117,6 +117,8 @@ type TracerOptions struct { cpuProfileInterval time.Duration cpuProfileDuration time.Duration heapProfileInterval time.Duration + + compressionOptions compressionOptions } // initDefaults updates opts with default values. 
@@ -164,6 +166,21 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { maxSpans = defaultMaxSpans } + spanCompressionEnabled, err := initialSpanCompressionEnabled() + if failed(err) { + spanCompressionEnabled = defaultSpanCompressionEnabled + } + + spanCompressionExactMatchMaxDuration, err := initialSpanCompressionExactMatchMaxDuration() + if failed(err) { + spanCompressionExactMatchMaxDuration = defaultSpanCompressionExactMatchMaxDuration + } + + spanCompressionSameKindMaxDuration, err := initialSpanCompressionSameKindMaxDuration() + if failed(err) { + spanCompressionSameKindMaxDuration = defaultSpanCompressionSameKindMaxDuration + } + sampler, err := initialSampler() if failed(err) { sampler = nil @@ -244,6 +261,11 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { opts.bufferSize = bufferSize opts.metricsBufferSize = metricsBufferSize opts.maxSpans = maxSpans + opts.compressionOptions = compressionOptions{ + enabled: spanCompressionEnabled, + exactMatchMaxDuration: spanCompressionExactMatchMaxDuration, + sameKindMaxDuration: spanCompressionSameKindMaxDuration, + } opts.sampler = sampler opts.sanitizedFieldNames = initialSanitizedFieldNames() opts.disabledMetrics = initialDisabledMetrics() @@ -284,6 +306,12 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { return nil } +type compressionOptions struct { + enabled bool + exactMatchMaxDuration time.Duration + sameKindMaxDuration time.Duration +} + // Tracer manages the sampling and sending of transactions to // Elastic APM. 
// @@ -413,6 +441,15 @@ func newTracer(opts TracerOptions) *Tracer { t.setLocalInstrumentationConfig(envMaxSpans, func(cfg *instrumentationConfigValues) { cfg.maxSpans = opts.maxSpans }) + t.setLocalInstrumentationConfig(envSpanCompressionEnabled, func(cfg *instrumentationConfigValues) { + cfg.compressionOptions.enabled = opts.compressionOptions.enabled + }) + t.setLocalInstrumentationConfig(envSpanCompressionExactMatchMaxDuration, func(cfg *instrumentationConfigValues) { + cfg.compressionOptions.exactMatchMaxDuration = opts.compressionOptions.exactMatchMaxDuration + }) + t.setLocalInstrumentationConfig(envSpanCompressionSameKindMaxDuration, func(cfg *instrumentationConfigValues) { + cfg.compressionOptions.sameKindMaxDuration = opts.compressionOptions.sameKindMaxDuration + }) t.setLocalInstrumentationConfig(envTransactionSampleRate, func(cfg *instrumentationConfigValues) { cfg.sampler = opts.sampler cfg.extendedSampler, _ = opts.sampler.(ExtendedSampler) @@ -708,6 +745,29 @@ func (t *Tracer) SetMaxSpans(n int) { }) } +// SetSpanCompressionEnabled enables/disables the span compression feature. +func (t *Tracer) SetSpanCompressionEnabled(v bool) { + t.setLocalInstrumentationConfig(envSpanCompressionEnabled, func(cfg *instrumentationConfigValues) { + cfg.compressionOptions.enabled = v + }) +} + +// SetSpanCompressionExactMatchMaxDuration sets the maximum duration for a span +// to be compressed with `compression_strategy` == `exact_match`. +func (t *Tracer) SetSpanCompressionExactMatchMaxDuration(v time.Duration) { + t.setLocalInstrumentationConfig(envSpanCompressionExactMatchMaxDuration, func(cfg *instrumentationConfigValues) { + cfg.compressionOptions.exactMatchMaxDuration = v + }) +} + +// SetSpanCompressionSameKindMaxDuration sets the maximum duration for a span +// to be compressed with `compression_strategy` == `same_kind`. 
+func (t *Tracer) SetSpanCompressionSameKindMaxDuration(v time.Duration) { + t.setLocalInstrumentationConfig(envSpanCompressionSameKindMaxDuration, func(cfg *instrumentationConfigValues) { + cfg.compressionOptions.sameKindMaxDuration = v + }) +} + // SetSpanFramesMinDuration sets the minimum duration for a span after which // we will capture its stack frames. func (t *Tracer) SetSpanFramesMinDuration(d time.Duration) { diff --git a/transaction.go b/transaction.go index 04aa7cb0b..5c1e054ca 100644 --- a/transaction.go +++ b/transaction.go @@ -69,6 +69,7 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran } tx.maxSpans = instrumentationConfig.maxSpans + tx.compressedSpan.options = instrumentationConfig.compressionOptions tx.spanFramesMinDuration = instrumentationConfig.spanFramesMinDuration tx.stackTraceLimit = instrumentationConfig.stackTraceLimit tx.Context.captureHeaders = instrumentationConfig.captureHeaders @@ -286,6 +287,13 @@ func (tx *Transaction) End() { if tx.Outcome == "" { tx.Outcome = tx.Context.outcome() } + // Hold the transaction data lock to check if the transaction has any + // compressed spans in its cache, if so, evict cache and end the span. + tx.TransactionData.mu.Lock() + if evictedSpan := tx.compressedSpan.evict(); evictedSpan != nil { + evictedSpan.end() + } + tx.TransactionData.mu.Unlock() tx.enqueue() } else { tx.reset(tx.tracer) @@ -366,6 +374,8 @@ type TransactionData struct { spanTimings spanTimingsMap droppedSpansStats droppedSpanTimingsMap rand *rand.Rand // for ID generation + + compressedSpan compressedSpan } // reset resets the TransactionData back to its zero state and places it back