From 27abca23207b669b6761c307915141026b68d8d8 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Wed, 13 Oct 2021 18:37:36 +0800 Subject: [PATCH] span (experimental): Compress short exit spans (#1134) Implements the span compression algorithm described in elastic/apm#432. The implementation pretty close to what the spec describes, with a few modifications due to the differences in the Go agent implementation. The span compression is experimental and is disabled by default with the default thresholds for the different compression strategies set: - `ELASTIC_APM_SPAN_COMPRESSION_ENABLED=false` - `ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION=50ms` - `ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION=5ms` The implementation uses a new field (cache `*Span`) in `TransactionData` and `SpanData` which is used to cache compressed or compression eligible spans in that field. An additional `composite` field has also been added to the `SpanData` and `TransactionData`. The algorithm states that any exit span that is lower or equal duration than the set `ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION` or `ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION` with a destination service set is compressable into a composite span using the appropriate strategy. Compressable spans need to be "exit" and siblings which **have not** propagated their context downstream. Sibling spans are spans that share the same parent span (or transaction). Spans can be compressed with `same_kind` or `exact_match` as a strategy, and their sum is the aggregated duration of of all When a compressed span has been compressed into a composite using the `same_kind` strategy, its name is mutated to `Calls to `. Spans will be compressed into a composite only when the siblings are consecutive and any compression attempt that doesn't meet that, will cause the cache to be evicted. Additionally, two helper functions have been added under the `apmtest`: package: `WriteTraceTable` and `WriteTraceWaterfall`, which help with understanding why a test is failing if they are and debugging those. Signed-off-by: Marc Lopez Rubio --- CHANGELOG.asciidoc | 1 + apmtest/debug.go | 161 ++++ config.go | 64 ++ config_test.go | 52 ++ docs/configuration.asciidoc | 65 ++ .../apmmath/round_go10.go | 5 +- utils_go9.go => internal/apmmath/round_go9.go | 6 +- internal/apmstrings/concat10.go | 38 + internal/apmstrings/concat10_test.go | 60 ++ internal/apmstrings/concat9.go | 41 + model/marshal_fastjson.go | 18 + model/model.go | 17 + modelwriter.go | 3 + sampler.go | 4 +- span.go | 29 + span_compressed.go | 319 ++++++++ span_test.go | 741 ++++++++++++++++++ tracer.go | 60 ++ transaction.go | 10 + 19 files changed, 1688 insertions(+), 6 deletions(-) create mode 100644 apmtest/debug.go rename utils_go10.go => internal/apmmath/round_go10.go (88%) rename utils_go9.go => internal/apmmath/round_go9.go (89%) create mode 100644 internal/apmstrings/concat10.go create mode 100644 internal/apmstrings/concat10_test.go create mode 100644 internal/apmstrings/concat9.go create mode 100644 span_compressed.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 879f22b3f..3e2984aea 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -25,6 +25,7 @@ https://github.com/elastic/apm-agent-go/compare/v1.14.0...master[View commits] - Deprecate `http.request.socket.encrypted` and stop recording it in `module/apmhttp`, `module/apmgrpc` and `module/apmfiber`. {pull}1129[#(1129)] - Collect and send span destination service timing statistics about the dropped spans to the apm-server. {pull}1132[#(1132)] +- Experimental support to compress short exit spans into a composite span. Disabled by default. {pull}1134[#(1134)] [[release-notes-1.x]] === Go Agent version 1.x diff --git a/apmtest/debug.go b/apmtest/debug.go new file mode 100644 index 000000000..cf9723397 --- /dev/null +++ b/apmtest/debug.go @@ -0,0 +1,161 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package apmtest // import "go.elastic.co/apm/apmtest" + +import ( + "bytes" + "fmt" + "io" + "sort" + "text/tabwriter" + "time" + "unicode/utf8" + + "go.elastic.co/apm/internal/apmmath" + "go.elastic.co/apm/model" +) + +// WriteTraceTable displays the trace as a table which can be used on tests to aid +// debugging. +func WriteTraceTable(writer io.Writer, tx model.Transaction, spans []model.Span) { + w := tabwriter.NewWriter(writer, 2, 4, 2, ' ', tabwriter.TabIndent) + fmt.Fprintln(w, "#\tNAME\tTYPE\tCOMP\tN\tDURATION\tOFFSET\tSPAN ID\tPARENT ID\tTRACE ID") + + fmt.Fprintf(w, "TX\t%s\t%s\t-\t-\t%f\t%d\t%x\t%x\t%x\n", tx.Name, + tx.Type, tx.Duration, + 0, + tx.ID, tx.ParentID, tx.TraceID, + ) + + sort.SliceStable(spans, func(i, j int) bool { + return time.Time(spans[i].Timestamp).Before(time.Time(spans[j].Timestamp)) + }) + for i, span := range spans { + count := 1 + if span.Composite != nil { + count = span.Composite.Count + } + + fmt.Fprintf(w, "%d\t%s\t%s\t%v\t%d\t%f\t+%d\t%x\t%x\t%x\n", i, span.Name, + span.Type, span.Composite != nil, count, span.Duration, + time.Time(span.Timestamp).Sub(time.Time(tx.Timestamp))/1e3, + span.ID, span.ParentID, span.TraceID, + ) + } + w.Flush() +} + +// WriteTraceWaterfall the trace waterfall "console output" to the specified +// writer sorted by timestamp. +func WriteTraceWaterfall(w io.Writer, tx model.Transaction, spans []model.Span) { + maxDuration := time.Duration(tx.Duration * float64(time.Millisecond)) + if maxDuration == 0 { + for _, span := range spans { + maxDuration += time.Duration(span.Duration * float64(time.Millisecond)) + } + } + + maxWidth := int64(72) + buf := new(bytes.Buffer) + if tx.Duration > 0.0 { + writeSpan(buf, int(maxWidth), 0, fmt.Sprintf("transaction (%x) - %s", tx.ID, maxDuration.String())) + } + + sort.SliceStable(spans, func(i, j int) bool { + return time.Time(spans[i].Timestamp).Before(time.Time(spans[j].Timestamp)) + }) + + for _, span := range spans { + pos := int(apmmath.Round( + float64(time.Time(span.Timestamp).Sub(time.Time(tx.Timestamp))) / + float64(maxDuration) * float64(maxWidth), + )) + tDur := time.Duration(span.Duration * float64(time.Millisecond)) + dur := float64(tDur) / float64(maxDuration) + width := int(apmmath.Round(dur * float64(maxWidth))) + if width == int(maxWidth) { + width = int(maxWidth) - 1 + } + + spancontent := fmt.Sprintf("%s %s - %s", + span.Type, span.Name, + time.Duration(span.Duration*float64(time.Millisecond)).String(), + ) + if span.Composite != nil { + spancontent = fmt.Sprintf("%d %s - %s", + span.Composite.Count, span.Name, + time.Duration(span.Duration*float64(time.Millisecond)).String(), + ) + } + writeSpan(buf, width, pos, spancontent) + } + + io.Copy(w, buf) +} + +func writeSpan(buf *bytes.Buffer, width, pos int, content string) { + spaceRune := ' ' + fillRune := '_' + startRune := '|' + endRune := '|' + + // Prevent the spans from going out of bounds. + if pos == width { + pos = pos - 2 + } else if pos >= width { + pos = pos - 1 + } + + for i := 0; i < int(pos); i++ { + buf.WriteRune(spaceRune) + } + + if width <= 1 { + width = 1 + // Write the first letter of the span type when the width is too small. + startRune, _ = utf8.DecodeRuneInString(content) + } + + var written int + written, _ = buf.WriteRune(startRune) + if len(content) >= int(width)-1 { + content = content[:int(width)-1] + } + + spacing := (width - len(content) - 2) / 2 + for i := 0; i < spacing; i++ { + n, _ := buf.WriteRune(fillRune) + written += n + } + + n, _ := buf.WriteString(content) + written += n + for i := 0; i < spacing; i++ { + n, _ := buf.WriteRune(fillRune) + written += n + } + + if written < width { + buf.WriteRune(fillRune) + } + if width > 1 { + buf.WriteRune(endRune) + } + + buf.WriteString("\n") +} diff --git a/config.go b/config.go index 1881d370c..5419ff799 100644 --- a/config.go +++ b/config.go @@ -63,6 +63,14 @@ const ( envUseElasticTraceparentHeader = "ELASTIC_APM_USE_ELASTIC_TRACEPARENT_HEADER" envCloudProvider = "ELASTIC_APM_CLOUD_PROVIDER" + // NOTE(marclop) Experimental settings + // span_compression (default `false`) + envSpanCompressionEnabled = "ELASTIC_APM_SPAN_COMPRESSION_ENABLED" + // span_compression_exact_match_max_duration (default `50ms`) + envSpanCompressionExactMatchMaxDuration = "ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION" + // span_compression_same_kind_max_duration (default `5ms`) + envSpanCompressionSameKindMaxDuration = "ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION" + // NOTE(axw) profiling environment variables are experimental. // They may be removed in a future minor version without being // considered a breaking change. @@ -87,6 +95,11 @@ const ( maxAPIRequestSize = 5 * configutil.MByte minMetricsBufferSize = 10 * configutil.KByte maxMetricsBufferSize = 100 * configutil.MByte + + // Experimental Span Compressions default setting values + defaultSpanCompressionEnabled = false + defaultSpanCompressionExactMatchMaxDuration = 50 * time.Millisecond + defaultSpanCompressionSameKindMaxDuration = 5 * time.Millisecond ) var ( @@ -298,6 +311,26 @@ func initialUseElasticTraceparentHeader() (bool, error) { return configutil.ParseBoolEnv(envUseElasticTraceparentHeader, true) } +func initialSpanCompressionEnabled() (bool, error) { + return configutil.ParseBoolEnv(envSpanCompressionEnabled, + defaultSpanCompressionEnabled, + ) +} + +func initialSpanCompressionExactMatchMaxDuration() (time.Duration, error) { + return configutil.ParseDurationEnv( + envSpanCompressionExactMatchMaxDuration, + defaultSpanCompressionExactMatchMaxDuration, + ) +} + +func initialSpanCompressionSameKindMaxDuration() (time.Duration, error) { + return configutil.ParseDurationEnv( + envSpanCompressionSameKindMaxDuration, + defaultSpanCompressionSameKindMaxDuration, + ) +} + func initialCPUProfileIntervalDuration() (time.Duration, time.Duration, error) { interval, err := configutil.ParseDurationEnv(envCPUProfileInterval, 0) if err != nil || interval <= 0 { @@ -430,6 +463,36 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string] delete(attrs, k) continue } + case envSpanCompressionEnabled: + val, err := strconv.ParseBool(v) + if err != nil { + errorf("central config failure: failed to parse %s: %s", k, err) + delete(attrs, k) + continue + } + updates = append(updates, func(cfg *instrumentationConfig) { + cfg.compressionOptions.enabled = val + }) + case envSpanCompressionExactMatchMaxDuration: + duration, err := configutil.ParseDuration(v) + if err != nil { + errorf("central config failure: failed to parse %s: %s", k, err) + delete(attrs, k) + continue + } + updates = append(updates, func(cfg *instrumentationConfig) { + cfg.compressionOptions.exactMatchMaxDuration = duration + }) + case envSpanCompressionSameKindMaxDuration: + duration, err := configutil.ParseDuration(v) + if err != nil { + errorf("central config failure: failed to parse %s: %s", k, err) + delete(attrs, k) + continue + } + updates = append(updates, func(cfg *instrumentationConfig) { + cfg.compressionOptions.sameKindMaxDuration = duration + }) default: warningf("central config failure: unsupported config: %s", k) delete(attrs, k) @@ -532,4 +595,5 @@ type instrumentationConfigValues struct { propagateLegacyHeader bool sanitizedFieldNames wildcard.Matchers ignoreTransactionURLs wildcard.Matchers + compressionOptions compressionOptions } diff --git a/config_test.go b/config_test.go index 490a453b1..e828652fb 100644 --- a/config_test.go +++ b/config_test.go @@ -20,6 +20,7 @@ package apm_test import ( "context" "encoding/json" + "fmt" "io/ioutil" "net/http" "net/http/httptest" @@ -127,6 +128,57 @@ func TestTracerCentralConfigUpdate(t *testing.T) { require.NoError(t, err) return tracer.IgnoredTransactionURL(u) }) + run("span_compression_enabled", "true", func(tracer *apmtest.RecordingTracer) bool { + tracer.ResetPayloads() + tx := tracer.StartTransaction("name", "type") + exitSpanOpts := apm.SpanOptions{ExitSpan: true} + for i := 0; i < 2; i++ { + span := tx.StartSpanOptions("name", "request", exitSpanOpts) + span.Duration = 50 * time.Millisecond + span.End() + } + tx.End() + tracer.Flush(nil) + return len(tracer.Payloads().Spans) == 1 + }) + run("span_compression_exact_match_max_duration", "100ms", func(tracer *apmtest.RecordingTracer) bool { + tracer.ResetPayloads() + tracer.SetSpanCompressionEnabled(true) + defer tracer.SetSpanCompressionEnabled(false) + tx := tracer.StartTransaction("name", "type") + exitSpanOpts := apm.SpanOptions{ExitSpan: true} + for i := 0; i < 2; i++ { + span := tx.StartSpanOptions("name", "request", exitSpanOpts) + span.Duration = 100 * time.Millisecond + span.End() + } + // Third span does not get compressed since it exceeds the max duration. + span := tx.StartSpanOptions("name", "request", exitSpanOpts) + span.Duration = 101 * time.Millisecond + span.End() + tx.End() + tracer.Flush(nil) + return len(tracer.Payloads().Spans) == 2 + }) + run("span_compression_same_kind_max_duration", "10ms", func(tracer *apmtest.RecordingTracer) bool { + tracer.ResetPayloads() + tracer.SetSpanCompressionEnabled(true) + defer tracer.SetSpanCompressionEnabled(false) + tx := tracer.StartTransaction("name", "type") + exitSpanOpts := apm.SpanOptions{ExitSpan: true} + for i := 0; i < 2; i++ { + span := tx.StartSpanOptions(fmt.Sprint(i), "request", exitSpanOpts) + span.Duration = 10 * time.Millisecond + span.End() + } + // Third span does not get compressed since it exceeds the max duration. + span := tx.StartSpanOptions("name", "request", exitSpanOpts) + span.Duration = 11 * time.Millisecond + span.End() + tx.End() + tracer.Flush(nil) + return len(tracer.Payloads().Spans) == 2 + }) } func testTracerCentralConfigUpdate(t *testing.T, logger apm.Logger, serverResponse string, isRemote func(*apmtest.RecordingTracer) bool) { diff --git a/docs/configuration.asciidoc b/docs/configuration.asciidoc index ed9ae728a..770db7de0 100644 --- a/docs/configuration.asciidoc +++ b/docs/configuration.asciidoc @@ -606,3 +606,68 @@ automatically collect the cloud metadata. Valid options are `"none"`, `"auto"`, `"aws"`, `"gcp"`, and `"azure"` If this config value is set to `"none"`, then no cloud metadata will be collected. + +[float] +[[config-span-compression-enabled]] +=== `ELASTIC_APM_SPAN_COMPRESSION_ENABLED` + +<> + +[options="header"] +|============ +| Environment | Default +| `ELASTIC_APM_SPAN_COMPRESSION_ENABLED` | `false` +|============ + +When enabled, the agent will attempt to compress _short_ exit spans that share the +same parent into a composite span. The exact duration for what is considered +_short_, depends on the compression strategy used (`same_kind` or `exact_match`). + +In order for a span to be compressible, these conditions need to be met: + +* Spans are exit spans. +* Spans are siblings (share the same parent). +* Spans have not propagated their context downstream. +* Each span duration is equal or lower to the compression strategy maximum duration. +* Spans are compressed with `same_kind` strategy when these attributes are equal: +** `span.type`. +** `span.subtype`. +** `span.context.destination.service.resource` +* Spans are compressed with `exact_match` strategy when all the previous conditions +are met and the `span.name` is equal. + +Compressing short exit spans should provide some storage savings for services that +create a lot of consecutive short exit spans to for example databases or cache +services which are generally uninteresting when viewing a trace. + +experimental::["This feature is experimental and requires APM Server v7.15 or later."] + +[float] +[[config-span-compression-exact-match-duration]] +=== `ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION` + +<> + +[options="header"] +|============ +| Environment | Default +| `ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION` | `50ms` +|============ + +The maximum duration to consider for compressing sibling exit spans that are an +exact match for compression. + +[float] +[[config-span-compression-same-kind-duration]] +=== `ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION` + +<> + +[options="header"] +|============ +| Environment | Default +| `ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION` | `5ms` +|============ + +The maximum duration to consider for compressing sibling exit spans that are of +the same kind for compression. diff --git a/utils_go10.go b/internal/apmmath/round_go10.go similarity index 88% rename from utils_go10.go rename to internal/apmmath/round_go10.go index 1027442fa..9cacc1803 100644 --- a/utils_go10.go +++ b/internal/apmmath/round_go10.go @@ -18,10 +18,11 @@ //go:build go1.10 // +build go1.10 -package apm // import "go.elastic.co/apm" +package apmmath import "math" -func round(x float64) float64 { +// Round is the current math.Round implementation for >= Go1.10. +func Round(x float64) float64 { return math.Round(x) } diff --git a/utils_go9.go b/internal/apmmath/round_go9.go similarity index 89% rename from utils_go9.go rename to internal/apmmath/round_go9.go index b5a109e24..111317d7f 100644 --- a/utils_go9.go +++ b/internal/apmmath/round_go9.go @@ -18,14 +18,14 @@ //go:build !go1.10 // +build !go1.10 -package apm // import "go.elastic.co/apm" +package apmmath import "math" -// Implementation of math.Round for Go < 1.10. +// Round Implementation of math.Round for Go < 1.10. // // Code shamelessly copied from pkg/math. -func round(x float64) float64 { +func Round(x float64) float64 { t := math.Trunc(x) if math.Abs(x-t) >= 0.5 { return t + math.Copysign(1, x) diff --git a/internal/apmstrings/concat10.go b/internal/apmstrings/concat10.go new file mode 100644 index 000000000..8f6105402 --- /dev/null +++ b/internal/apmstrings/concat10.go @@ -0,0 +1,38 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build go1.10 +// +build go1.10 + +package apmstrings + +import "strings" + +// Concat concatenates all the string arguments efficiently. +func Concat(elements ...string) string { + var builder strings.Builder + var length int + for i := range elements { + length += len(elements[i]) + } + builder.Grow(length) + + for i := range elements { + builder.WriteString(elements[i]) + } + return builder.String() +} diff --git a/internal/apmstrings/concat10_test.go b/internal/apmstrings/concat10_test.go new file mode 100644 index 000000000..ebb719830 --- /dev/null +++ b/internal/apmstrings/concat10_test.go @@ -0,0 +1,60 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build go1.10 +// +build go1.10 + +package apmstrings + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConcat(t *testing.T) { + type args struct { + elements []string + } + tests := []struct { + expect string + elements []string + }{ + {elements: []string{"a", "b", "c"}, expect: "abc"}, + {elements: []string{"Calls to ", "redis"}, expect: "Calls to redis"}, + } + for i, test := range tests { + t.Run(fmt.Sprint(i), func(t *testing.T) { + assert.Equal(t, test.expect, Concat(test.elements...), i) + }) + } +} + +var benchRes string + +func BenchmarkConcat(b *testing.B) { + elements := map[bool][]string{ + true: {"Calls to ", "redis"}, + false: {"Calls to ", "verylongservicenamewegothere"}, + } + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + benchRes = Concat(elements[i%2 == 0]...) + } +} diff --git a/internal/apmstrings/concat9.go b/internal/apmstrings/concat9.go new file mode 100644 index 000000000..74f288657 --- /dev/null +++ b/internal/apmstrings/concat9.go @@ -0,0 +1,41 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build !go1.10 +// +build !go1.10 + +package apmstrings + +import ( + "bytes" +) + +// Concat concatenates all the string arguments efficiently. +func Concat(elements ...string) string { + var buf bytes.Buffer + var length int + for i := range elements { + length += len(elements[i]) + } + buf.Grow(length) + + for i := range elements { + buf.WriteString(elements[i]) + } + return buf.String() + +} diff --git a/model/marshal_fastjson.go b/model/marshal_fastjson.go index c7996faad..c1c734a6a 100644 --- a/model/marshal_fastjson.go +++ b/model/marshal_fastjson.go @@ -622,6 +622,12 @@ func (v *Span) MarshalFastJSON(w *fastjson.Writer) error { w.RawString(",\"action\":") w.String(v.Action) } + if v.Composite != nil { + w.RawString(",\"composite\":") + if err := v.Composite.MarshalFastJSON(w); err != nil && firstErr == nil { + firstErr = err + } + } if v.Context != nil { w.RawString(",\"context\":") if err := v.Context.MarshalFastJSON(w); err != nil && firstErr == nil { @@ -895,6 +901,18 @@ func (v *DatabaseSpanContext) MarshalFastJSON(w *fastjson.Writer) error { return nil } +func (v *CompositeSpan) MarshalFastJSON(w *fastjson.Writer) error { + w.RawByte('{') + w.RawString("\"compression_strategy\":") + w.String(v.CompressionStrategy) + w.RawString(",\"count\":") + w.Int64(int64(v.Count)) + w.RawString(",\"sum\":") + w.Float64(v.Sum) + w.RawByte('}') + return nil +} + func (v *Context) MarshalFastJSON(w *fastjson.Writer) error { var firstErr error w.RawByte('{') diff --git a/model/model.go b/model/model.go index 66079f496..821a2b9eb 100644 --- a/model/model.go +++ b/model/model.go @@ -362,6 +362,10 @@ type Span struct { // Outcome holds the span outcome: success, failure, or unknown. Outcome string `json:"outcome,omitempty"` + + // Composite is set when the span is a composite span and represents an + // aggregated set of spans as defined by `composite.compression_strategy`. + Composite *CompositeSpan `json:"composite,omitempty"` } // SpanContext holds contextual information relating to the span. @@ -464,6 +468,19 @@ type HTTPSpanContext struct { StatusCode int `json:"status_code,omitempty"` } +// CompositeSpan holds details on a group of spans represented by a single one. +type CompositeSpan struct { + // Count is the number of compressed spans the composite span represents. + // The minimum count is 2, as a composite span represents at least two spans. + Count int `json:"count"` + // Sum is the durations of all compressed spans this composite span + // represents in milliseconds. + Sum float64 `json:"sum"` + // A string value indicating which compression strategy was used. The valid + // values are `exact_match` and `same_kind`. + CompressionStrategy string `json:"compression_strategy"` +} + // Context holds contextual information relating to a transaction or error. type Context struct { // Custom holds custom context relating to the transaction or error. diff --git a/modelwriter.go b/modelwriter.go index db3f38c58..91073fec2 100644 --- a/modelwriter.go +++ b/modelwriter.go @@ -151,6 +151,9 @@ func (w *modelWriter) buildModelSpan(out *model.Span, span *Span, sd *SpanData) out.Duration = sd.Duration.Seconds() * 1000 out.Outcome = normalizeOutcome(sd.Outcome) out.Context = sd.Context.build() + if sd.composite.count > 1 { + out.Composite = sd.composite.build() + } // Copy the span type to context.destination.service.type. if out.Context != nil && out.Context.Destination != nil && out.Context.Destination.Service != nil { diff --git a/sampler.go b/sampler.go index 4a3d789d6..25f02c84e 100644 --- a/sampler.go +++ b/sampler.go @@ -23,6 +23,8 @@ import ( "math/big" "github.com/pkg/errors" + + "go.elastic.co/apm/internal/apmmath" ) // Sampler provides a means of sampling transactions. @@ -126,5 +128,5 @@ func roundSampleRate(r float64) float64 { if r > 0 && r < 0.0001 { r = 0.0001 } - return round(r*10000) / 10000 + return apmmath.Round(r*10000) / 10000 } diff --git a/span.go b/span.go index dc7acadbe..0d2011ae9 100644 --- a/span.go +++ b/span.go @@ -22,6 +22,7 @@ import ( "encoding/binary" "strings" "sync" + "sync/atomic" "time" "go.elastic.co/apm/stacktrace" @@ -124,6 +125,7 @@ func (tx *Transaction) StartSpanOptions(name, spanType string, opts SpanOptions) } span.stackFramesMinDuration = tx.spanFramesMinDuration span.stackTraceLimit = tx.stackTraceLimit + span.compressedSpan.options = tx.compressedSpan.options tx.spansCreated++ } @@ -175,6 +177,7 @@ func (t *Tracer) StartSpan(name, spanType string, transactionID SpanID, opts Spa instrumentationConfig := t.instrumentationConfig() span.stackFramesMinDuration = instrumentationConfig.spanFramesMinDuration span.stackTraceLimit = instrumentationConfig.stackTraceLimit + span.compressedSpan.options = instrumentationConfig.compressionOptions if opts.ExitSpan { span.exit = true } @@ -257,6 +260,9 @@ type Span struct { parentID SpanID exit bool + // ctxPropagated is set to 1 when the traceContext is propagated downstream. + ctxPropagated uint32 + mu sync.RWMutex // SpanData holds the span data. This field is set to nil when @@ -269,6 +275,7 @@ func (s *Span) TraceContext() TraceContext { if s == nil { return TraceContext{} } + atomic.StoreUint32(&s.ctxPropagated, 1) return s.traceContext } @@ -352,9 +359,29 @@ func (s *Span) End() { if len(s.stacktrace) == 0 && s.Duration >= s.stackFramesMinDuration { s.setStacktrace(1) } + if s.tx != nil { s.reportSelfTime() } + + s.end() +} + +// end represents a subset of the public `s.End()` API and will only attempt +// to compress the span if it's compressable, or enqueue it in case it's not. +// +// end must only be called from `s.End()` and `tx.End()`. +func (s *Span) end() { + evictedSpan, cached := s.attemptCompress() + if evictedSpan != nil { + evictedSpan.end() + } + if cached { + // s has been cached for potential compression, and will be enqueued + // by a future call to attemptCompress on a sibling span, or when the + // parent is ended. + return + } s.enqueue() s.SpanData = nil } @@ -461,6 +488,8 @@ type SpanData struct { stackTraceLimit int timestamp time.Time childrenTimer childrenTimer + composite compositeSpan + compressedSpan compressedSpan // Name holds the span name, initialized with the value passed to StartSpan. Name string diff --git a/span_compressed.go b/span_compressed.go new file mode 100644 index 000000000..90f2ece1c --- /dev/null +++ b/span_compressed.go @@ -0,0 +1,319 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package apm // import "go.elastic.co/apm" + +import ( + "sync/atomic" + "time" + + "go.elastic.co/apm/internal/apmstrings" + "go.elastic.co/apm/model" +) + +const ( + _ int = iota + compressedStrategyExactMatch + compressedStrategySameKind +) + +const ( + compressedSpanSameKindName = "Calls to " +) + +type compositeSpan struct { + lastSiblingEndTime time.Time + // this internal representation should be set in Nanoseconds, although + // the model unit is set in Milliseconds. + sum time.Duration + count int + compressionStrategy int +} + +func (cs compositeSpan) build() *model.CompositeSpan { + var out model.CompositeSpan + switch cs.compressionStrategy { + case compressedStrategyExactMatch: + out.CompressionStrategy = "exact_match" + case compressedStrategySameKind: + out.CompressionStrategy = "same_kind" + } + out.Count = cs.count + out.Sum = float64(cs.sum) / float64(time.Millisecond) + return &out +} + +func (cs compositeSpan) empty() bool { + return cs.count < 1 +} + +// A span is eligible for compression if all the following conditions are met +// 1. It's an exit span +// 2. The trace context has not been propagated to a downstream service +// 3. If the span has outcome (i.e., outcome is present and it's not null) then +// it should be success. It means spans with outcome indicating an issue of +// potential interest should not be compressed. +// The second condition is important so that we don't remove (compress) a span +// that may be the parent of a downstream service. This would orphan the sub- +// graph started by the downstream service and cause it to not appear in the +// waterfall view. +func (s *Span) compress(sibling *Span) bool { + // If the spans aren't siblings, we cannot compress them. + if s.parentID != sibling.parentID { + return false + } + + strategy := s.canCompressComposite(sibling) + if strategy == 0 { + strategy = s.canCompressStandard(sibling) + } + + // If the span cannot be compressed using any strategy. + if strategy == 0 { + return false + } + + if s.composite.empty() { + s.composite = compositeSpan{ + count: 1, + sum: s.Duration, + compressionStrategy: strategy, + } + } + + s.composite.count++ + s.composite.sum += sibling.Duration + siblingTimestamp := sibling.timestamp.Add(sibling.Duration) + if siblingTimestamp.After(s.composite.lastSiblingEndTime) { + s.composite.lastSiblingEndTime = siblingTimestamp + } + return true +} + +// +// Span // +// + +// attemptCompress tries to compress a span into a "composite span" when: +// * Compression is enabled on agent. +// * The cached span and the incoming span: +// * Share the same parent (are siblings). +// * Are consecutive spans. +// * Are both exit spans, outcome == success and are short enough (See +// `ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION` and +// `ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION` for more info). +// * Represent the same exact operation or the same kind of operation: +// * Are an exact match (same name, kind and destination service). +// OR +// * Are the same kind match (same kind and destination service). +// When a span has already been compressed using a particular strategy, it +// CANNOT continue to compress spans using a different strategy. +// The compression algorithm is fairly simple and only compresses spans into a +// composite span when the conditions listed above are met for all consecutive +// spans, at any point any span that doesn't meet the conditions, will cause +// the cache be evicted and the cached span will be returned. +// * When the incoming span is compressible, it will replace the cached span. +// * When the incoming span is not compressible, it will be enqueued as well. +// +// Returns `true` when the span has been cached, thus the caller should not +// enqueue the span. When `false` is returned, the cache is evicted and the +// caller should enqueue the span. +// +// It needs to be called with s.mu held, and will attempt to hold s.parent.mu +// when not nil or s.tx.mu and s.tx.TransactionData.mu when s.parent is nil. +func (s *Span) attemptCompress() (*Span, bool) { + // If the span has already been evicted from the cache, ask the caller to + // end it. + if !s.compressedSpan.options.enabled { + return nil, false + } + + // When a parent span ends, flush its cache. + if cache := s.compressedSpan.evict(); cache != nil { + return cache, false + } + + // There are two distinct places where the span can be cached; the parent + // span and the transaction. The algorithm prefers storing the cached spans + // in its parent, and if nil, it will use the transaction's cache. + if s.parent != nil { + s.parent.mu.Lock() + defer s.parent.mu.Unlock() + if !s.parent.ended() { + return s.parent.compressedSpan.compressOrEvictCache(s) + } + return nil, false + } + + if s.tx != nil { + s.tx.mu.RLock() + defer s.tx.mu.RUnlock() + if !s.tx.ended() { + s.tx.TransactionData.mu.Lock() + defer s.tx.TransactionData.mu.Unlock() + return s.tx.compressedSpan.compressOrEvictCache(s) + } + } + return nil, false +} + +func (s *Span) isCompressionEligible() bool { + if s == nil { + return false + } + ctxPropagated := atomic.LoadUint32(&s.ctxPropagated) == 1 + return s.exit && !ctxPropagated && + (s.Outcome == "" || s.Outcome == "success") +} + +func (s *Span) canCompressStandard(sibling *Span) int { + if !s.isSameKind(sibling) { + return 0 + } + + // We've already established the spans are the same kind. + strategy := compressedStrategySameKind + maxDuration := s.compressedSpan.options.sameKindMaxDuration + + // If it's an exact match, we then switch the settings + if s.isExactMatch(sibling) { + maxDuration = s.compressedSpan.options.exactMatchMaxDuration + strategy = compressedStrategyExactMatch + } + + // Any spans that go over the maximum duration cannot be compressed. + if !s.durationLowerOrEq(sibling, maxDuration) { + return 0 + } + + // If the composite span already has a compression strategy it differs from + // the chosen strategy, the spans cannot be compressed. + if !s.composite.empty() && s.composite.compressionStrategy != strategy { + return 0 + } + + // Return whichever strategy was chosen. + return strategy +} + +func (s *Span) canCompressComposite(sibling *Span) int { + if s.composite.empty() { + return 0 + } + switch s.composite.compressionStrategy { + case compressedStrategyExactMatch: + if s.isExactMatch(sibling) && s.durationLowerOrEq(sibling, + s.compressedSpan.options.exactMatchMaxDuration, + ) { + return compressedStrategyExactMatch + } + case compressedStrategySameKind: + if s.isSameKind(sibling) && s.durationLowerOrEq(sibling, + s.compressedSpan.options.sameKindMaxDuration, + ) { + return compressedStrategySameKind + } + } + return 0 +} + +func (s *Span) durationLowerOrEq(sibling *Span, max time.Duration) bool { + return s.Duration <= max && sibling.Duration <= max +} + +// +// SpanData // +// + +// isExactMatch is used for compression purposes, two spans are considered an +// exact match if the have the same name and are of the same kind (see +// isSameKind for more details). +func (s *SpanData) isExactMatch(span *Span) bool { + return s.Name == span.Name && s.isSameKind(span) +} + +// isSameKind is used for compression purposes, two spans are considered to be +// of the same kind if they have the same values for type, subtype, and +// `destination.service.resource`. +func (s *SpanData) isSameKind(span *Span) bool { + sameType := s.Type == span.Type + sameSubType := s.Subtype == span.Subtype + dstService := s.Context.destination.Service + otherDstService := span.Context.destination.Service + sameService := dstService != nil && otherDstService != nil && + dstService.Resource == otherDstService.Resource + + return sameType && sameSubType && sameService +} + +// setCompressedSpanName changes the span name to "Calls to " +// for composite spans that are compressed with the `"same_kind"` strategy. +func (s *SpanData) setCompressedSpanName() { + if s.composite.compressionStrategy != compressedStrategySameKind { + return + } + service := s.Context.destinationService.Resource + s.Name = apmstrings.Concat(compressedSpanSameKindName, service) +} + +type compressedSpan struct { + cache *Span + options compressionOptions +} + +// evict resets the cache to nil and returns the cached span after adjusting +// its Name, Duration, and timers. +// +// Should be only be called from Transaction.End() and Span.End(). +func (cs *compressedSpan) evict() *Span { + if cs.cache == nil { + return nil + } + cached := cs.cache + // Disable compression on the evicted span to avoid the span from ending up + // swapping the cache and causing an infinite loop. + cached.compressedSpan.options.enabled = false + cs.cache = nil + // When the span composite is not empty, we need to adjust the duration just + // before it is reported and no more spans will be compressed into the + // composite. If this is done before ending the span, the duration of the span + // could potentially grow over the compressable threshold and result in + // compressable span not being compressed and reported separately. + if !cached.composite.empty() { + cached.Duration = cached.composite.lastSiblingEndTime.Sub(cached.timestamp) + cached.setCompressedSpanName() + } + return cached +} + +func (cs *compressedSpan) compressOrEvictCache(s *Span) (*Span, bool) { + if !s.isCompressionEligible() { + return cs.evict(), false + } + + if cs.cache == nil { + cs.cache = s + return nil, true + } + + var evictedSpan *Span + if !cs.cache.compress(s) { + evictedSpan = cs.evict() + cs.cache = s + } + return evictedSpan, true +} diff --git a/span_test.go b/span_test.go index d1c8da7ee..0396c4f93 100644 --- a/span_test.go +++ b/span_test.go @@ -19,6 +19,10 @@ package apm_test import ( "context" + "fmt" + "os" + "sort" + "sync" "testing" "time" @@ -190,6 +194,743 @@ func TestStartExitSpan(t *testing.T) { assert.True(t, span.IsExitSpan()) } +func TestCompressSpanNonSiblings(t *testing.T) { + // Asserts that non sibling spans are not compressed. + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + + tracer.SetSpanCompressionEnabled(true) + + tx := tracer.StartTransaction("name", "type") + parent := tx.StartSpan("parent", "parent", nil) + + createSpans := []struct { + name, typ string + parent apm.TraceContext + }{ + {name: "not compressed", typ: "internal", parent: parent.TraceContext()}, + {name: "not compressed", typ: "internal", parent: tx.TraceContext()}, + {name: "compressed", typ: "internal", parent: parent.TraceContext()}, + {name: "compressed", typ: "internal", parent: parent.TraceContext()}, + {name: "compressed", typ: "different", parent: tx.TraceContext()}, + {name: "compressed", typ: "different", parent: tx.TraceContext()}, + } + for _, span := range createSpans { + span := tx.StartSpanOptions(span.name, span.typ, apm.SpanOptions{ + ExitSpan: true, Parent: span.parent, + }) + span.Duration = time.Millisecond + span.End() + } + + parent.End() + tx.End() + tracer.Flush(nil) + + spans := tracer.Payloads().Spans + require.Len(t, spans, 5) + + // First two spans should not have been compressed together. + require.Nil(t, spans[0].Composite) + require.Nil(t, spans[1].Composite) + + require.NotNil(t, spans[2].Composite) + require.Equal(t, 2, spans[2].Composite.Count) + require.Equal(t, float64(2), spans[2].Composite.Sum) + require.Equal(t, "exact_match", spans[2].Composite.CompressionStrategy) + + require.NotNil(t, spans[3].Composite) + require.Equal(t, 2, spans[3].Composite.Count) + require.Equal(t, float64(2), spans[3].Composite.Sum) + require.Equal(t, "exact_match", spans[3].Composite.CompressionStrategy) +} + +func TestCompressSpanExactMatch(t *testing.T) { + // Aserts that that span compression works on compressable spans with + // "exact_match" strategy. + tests := []struct { + setup func(t *testing.T) func() + assertFunc func(t *testing.T, tx model.Transaction, spans []model.Span) + name string + compressionEnabled bool + }{ + // |______________transaction (095b51e1b6ca784c) - 2.0013ms_______________| + // m + // m + // m + // m + // m + // m + // m + // m + // m + // m + // |___________________mysql SELECT * FROM users - 2ms____________________| + // r + // i + // r + { + name: "CompressFalse", + compressionEnabled: false, + assertFunc: func(t *testing.T, tx model.Transaction, spans []model.Span) { + require.NotEmpty(t, tx) + require.Equal(t, 14, len(spans)) + for _, span := range spans { + require.Nil(t, span.Composite) + } + }, + }, + // |______________transaction (7d3254511f02b26b) - 2.0013ms_______________| + // 10 + // |___________________mysql SELECT * FROM users - 2ms___________________| + // r + // i + // r + { + name: "CompressTrueSettingTweak", + compressionEnabled: true, + setup: func(*testing.T) func() { + // This setting + envVarName := "ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION" + og := os.Getenv(envVarName) + os.Setenv(envVarName, "1ms") + return func() { os.Setenv(envVarName, og) } + }, + assertFunc: func(t *testing.T, tx model.Transaction, spans []model.Span) { + require.NotNil(t, tx) + require.Equal(t, 5, len(spans)) + composite := spans[0] + require.NotNil(t, composite.Composite) + assert.Equal(t, "exact_match", composite.Composite.CompressionStrategy) + assert.Equal(t, composite.Composite.Count, 10) + assert.Equal(t, 0.001, composite.Composite.Sum) + assert.Equal(t, 0.001, composite.Duration) + + for _, span := range spans[1:] { + require.Nil(t, span.Composite) + } + }, + }, + // |______________transaction (5797fe58c6ccce29) - 2.0013ms_______________| + // |_____________________11 Calls to mysql - 2.001ms______________________| + // r + // i + // r + { + name: "CompressSpanCount4", + compressionEnabled: true, + assertFunc: func(t *testing.T, tx model.Transaction, spans []model.Span) { + require.NotEmpty(t, tx) + var composite = spans[0] + assert.Equal(t, composite.Context.Destination.Service.Resource, "mysql") + + require.NotNil(t, composite.Composite) + assert.Equal(t, composite.Composite.Count, 11) + assert.Equal(t, "exact_match", composite.Composite.CompressionStrategy) + // Sum should be at least the time that each span ran for. The + // model time is in Milliseconds and the span duration should be + // at least 2 Milliseconds + assert.Equal(t, int(composite.Composite.Sum), 2) + assert.Equal(t, int(composite.Duration), 2) + + for _, span := range spans { + if span.Type == "mysql" { + continue + } + assert.Nil(t, span.Composite) + } + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.setup != nil { + defer test.setup(t)() + } + + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + tracer.SetSpanCompressionEnabled(test.compressionEnabled) + + // When compression is enabled: + // Compress 10 spans into 1 and add another span with a different type + // [ Transaction ] + // [ mysql (11) ] [ request ] [ internal ] [ request ] + // + txStart := time.Now() + tx := tracer.StartTransactionOptions("name", "type", + apm.TransactionOptions{Start: txStart}, + ) + currentTime := txStart + for i := 0; i < 10; i++ { + span := tx.StartSpanOptions("SELECT * FROM users", "mysql", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + // Compressed when the exact_match threshold is >= 2ms. + { + span := tx.StartSpanOptions("SELECT * FROM users", "mysql", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 2 * time.Millisecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + + // None of these should be added to the composite. + { + span := tx.StartSpanOptions("GET /", "request", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + { + // Not an exit span, should not be compressed + span := tx.StartSpanOptions("calculate complex", "internal", apm.SpanOptions{ + Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + { + // Exit span, this is a good candidate to be compressed, but + // since it can't be compressed with the last request type ("internal") + span := tx.StartSpanOptions("GET /", "request", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + tx.Duration = currentTime.Sub(txStart) + tx.End() + tracer.Flush(nil) + + transaction := tracer.Payloads().Transactions[0] + spans := tracer.Payloads().Spans + defer func() { + if t.Failed() { + apmtest.WriteTraceWaterfall(os.Stdout, transaction, spans) + apmtest.WriteTraceTable(os.Stdout, transaction, spans) + } + }() + + if test.assertFunc != nil { + test.assertFunc(t, transaction, spans) + } + }) + } +} + +func TestCompressSpanSameKind(t *testing.T) { + // Aserts that that span compression works on compressable spans with + // "same_kind" strategy, and that different span types are not compressed. + tests := []struct { + setup func(t *testing.T) func() + assertFunc func(t *testing.T, spans []model.Span) + name string + compressionEnabled bool + }{ + { + name: "DefaultThreshold", + assertFunc: func(t *testing.T, spans []model.Span) { + require.Equal(t, 3, len(spans)) + mysqlSpan := spans[0] + assert.Equal(t, "mysql", mysqlSpan.Context.Destination.Service.Resource) + assert.Nil(t, mysqlSpan.Composite) + + requestSpan := spans[1] + assert.Equal(t, "request", requestSpan.Context.Destination.Service.Resource) + require.NotNil(t, requestSpan.Composite) + assert.Equal(t, 5, requestSpan.Composite.Count) + assert.Equal(t, "same_kind", requestSpan.Composite.CompressionStrategy) + assert.Equal(t, "Calls to request", requestSpan.Name) + // Check that the sum and span duration is at least the duration of the time set. + assert.Equal(t, 0.0005, requestSpan.Composite.Sum, requestSpan.Composite.Sum) + assert.Equal(t, 0.0005, requestSpan.Duration, requestSpan.Duration) + }, + }, + { + name: "10msThreshold", + setup: func(*testing.T) func() { + os.Setenv("ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION", "10ms") + return func() { os.Unsetenv("ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION") } + }, + assertFunc: func(t *testing.T, spans []model.Span) { + require.Equal(t, 2, len(spans)) + + mysqlSpan := spans[0] + assert.Equal(t, mysqlSpan.Context.Destination.Service.Resource, "mysql") + assert.Nil(t, mysqlSpan.Composite) + + requestSpan := spans[1] + assert.Equal(t, requestSpan.Context.Destination.Service.Resource, "request") + assert.NotNil(t, requestSpan.Composite) + assert.Equal(t, 6, requestSpan.Composite.Count) + assert.Equal(t, "Calls to request", requestSpan.Name) + assert.Equal(t, "same_kind", requestSpan.Composite.CompressionStrategy) + // Check that the aggregate sum is at least the duration of the time we + // we waited for. + assert.Greater(t, requestSpan.Composite.Sum, float64(5*100/time.Millisecond)) + + // Check that the total composite span duration is at least 5 milliseconds. + assert.Greater(t, requestSpan.Duration, float64(5*100/time.Millisecond)) + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.setup != nil { + defer test.setup(t)() + } + + tracer := apmtest.NewRecordingTracer() + defer tracer.Close() + tracer.SetSpanCompressionEnabled(true) + + // Compress 5 spans into 1 and add another span with a different type + // |______________transaction (572da67c206e9996) - 6.0006ms_______________| + // m + // 5 + // |________________________request GET /f - 6ms_________________________| + // + txStart := time.Now() + tx := tracer.StartTransactionOptions("name", "type", + apm.TransactionOptions{Start: txStart}, + ) + currentTime := txStart + + // Span is compressable, but cannot be compressed since the next span + // is not the same kind. It's published. + { + span := tx.StartSpanOptions("SELECT * FROM users", "mysql", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + + // These spans should be compressed into 1. + path := []string{"/a", "/b", "/c", "/d", "/e"} + for i := 0; i < 5; i++ { + span := tx.StartSpanOptions(fmt.Sprint("GET ", path[i]), "request", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 100 * time.Nanosecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + // This span exceeds the default threshold (5ms) and won't be compressed. + { + span := tx.StartSpanOptions("GET /f", "request", apm.SpanOptions{ + ExitSpan: true, Start: currentTime, + }) + span.Duration = 6 * time.Millisecond + currentTime = currentTime.Add(span.Duration) + span.End() + } + tx.Duration = currentTime.Sub(txStart) + tx.End() + tracer.Flush(nil) + + transaction := tracer.Payloads().Transactions[0] + spans := tracer.Payloads().Spans + defer func() { + if t.Failed() { + apmtest.WriteTraceWaterfall(os.Stdout, transaction, spans) + apmtest.WriteTraceTable(os.Stdout, transaction, spans) + } + }() + + require.NotNil(t, transaction) + if test.assertFunc != nil { + test.assertFunc(t, spans) + } + }) + } +} + +func TestCompressSpanSameKindParentSpan(t *testing.T) { + // This test asserts the span compression works when the spans are children + // of another span. + tracer := apmtest.NewRecordingTracer() + tracer.SetSpanCompressionEnabled(true) + + // This test case covers spans that have other spans as parents. + // |_______________transaction (6b1e4866252dea6f) - 1.45ms________________| + // |__internal internal op - 700µs___| + // |request GET /r| + // |request G| + // |___internal another op - 750µs____| + // |2 Calls to re| + txStart := time.Now() + tx := tracer.StartTransactionOptions("name", "type", + apm.TransactionOptions{Start: txStart}, + ) + + ctx := apm.ContextWithTransaction(context.Background(), tx) + currentTime := txStart + { + // Doesn't compress any spans since none meet the necessary conditions + // the "request" type are both the same type but the parent + parent, ctx := apm.StartSpanOptions(ctx, "internal op", "internal", apm.SpanOptions{ + Start: currentTime, + }) + // Have span propagate context downstream, this should not allow for + // compression + child, ctx := apm.StartSpanOptions(ctx, "GET /resource", "request", apm.SpanOptions{ + Start: currentTime.Add(100 * time.Microsecond), + }) + + grandChild, _ := apm.StartSpanOptions(ctx, "GET /different", "request", apm.SpanOptions{ + ExitSpan: true, + Start: currentTime.Add(120 * time.Microsecond), + }) + + grandChild.Duration = 200 * time.Microsecond + grandChild.End() + + child.Duration = 300 * time.Microsecond + child.End() + + parent.Duration = 700 * time.Microsecond + currentTime = currentTime.Add(parent.Duration) + parent.End() + } + { + // Compresses the last two spans together since they are both exit + // spans, same "request" type, don't propagate ctx and succeed. + parent, ctx := apm.StartSpanOptions(ctx, "another op", "internal", apm.SpanOptions{ + Start: currentTime.Add(50 * time.Microsecond), + }) + child, _ := apm.StartSpanOptions(ctx, "GET /res", "request", apm.SpanOptions{ + ExitSpan: true, + Start: currentTime.Add(120 * time.Microsecond), + }) + + otherChild, _ := apm.StartSpanOptions(ctx, "GET /diff", "request", apm.SpanOptions{ + ExitSpan: true, + Start: currentTime.Add(150 * time.Microsecond), + }) + + otherChild.Duration = 250 * time.Microsecond + otherChild.End() + + child.Duration = 300 * time.Microsecond + child.End() + + parent.Duration = 750 * time.Microsecond + currentTime = currentTime.Add(parent.Duration) + parent.End() + } + + tx.Duration = currentTime.Sub(txStart) + tx.End() + tracer.Flush(nil) + + transaction := tracer.Payloads().Transactions[0] + spans := tracer.Payloads().Spans + + defer func() { + if t.Failed() { + apmtest.WriteTraceTable(os.Stdout, transaction, spans) + apmtest.WriteTraceWaterfall(os.Stdout, transaction, spans) + } + }() + require.NotNil(t, transaction) + assert.Equal(t, 5, len(spans)) + + compositeSpan := spans[3] + compositeParent := spans[4] + require.NotNil(t, compositeSpan) + require.NotNil(t, compositeSpan.Composite) + assert.Equal(t, "Calls to request", compositeSpan.Name) + assert.Equal(t, "request", compositeSpan.Type) + assert.Equal(t, "internal", compositeParent.Type) + assert.Equal(t, compositeSpan.Composite.Count, 2) + assert.Equal(t, compositeSpan.ParentID, compositeParent.ID) + assert.GreaterOrEqual(t, compositeParent.Duration, compositeSpan.Duration) +} + +func TestCompressSpanSameKindParentSpanContext(t *testing.T) { + // This test ensures that the compression also works when the s.Parent is + // set (via the context.Context). + // |________________transaction (ab51fc698fef307a) - 15ms_________________| + // |___________________internal parent - 13ms____________________| + // |3 Calls to re| + // |internal algorithm - 5m| + // |2 Calls t| + // |inte| + tracer := apmtest.NewRecordingTracer() + tracer.SetSpanCompressionEnabled(true) + + txStart := time.Now() + tx := tracer.StartTransactionOptions("name", "type", + apm.TransactionOptions{Start: txStart}, + ) + + ctx := apm.ContextWithTransaction(context.Background(), tx) + parentStart := txStart.Add(time.Millisecond) + parent, ctx := apm.StartSpanOptions(ctx, "parent", "internal", apm.SpanOptions{ + Start: parentStart, + }) + + childrenStart := parentStart.Add(2 * time.Millisecond) + for i := 0; i < 3; i++ { + span, _ := apm.StartSpanOptions(ctx, "db", "redis", apm.SpanOptions{ + ExitSpan: true, + Start: childrenStart, + }) + childrenStart = childrenStart.Add(time.Millisecond) + span.Duration = time.Millisecond + span.End() + } + + testSpans := []struct { + name string + typ string + duration time.Duration + }{ + {name: "GET /some", typ: "client", duration: time.Millisecond}, + {name: "GET /resource", typ: "client", duration: 2 * time.Millisecond}, + {name: "compute something", typ: "internal", duration: time.Millisecond}, + } + + subParent, ctx := apm.StartSpanOptions(ctx, "algorithm", "internal", apm.SpanOptions{ + Start: childrenStart.Add(time.Millisecond), + }) + childrenStart = childrenStart.Add(time.Millisecond) + for _, cfg := range testSpans { + child, _ := apm.StartSpanOptions(ctx, cfg.name, cfg.typ, apm.SpanOptions{ + ExitSpan: true, Start: childrenStart.Add(cfg.duration), + }) + childrenStart = childrenStart.Add(cfg.duration) + child.Duration = cfg.duration + child.End() + } + childrenStart = childrenStart.Add(time.Millisecond) + subParent.Duration = 6 * time.Millisecond + subParent.End() + + parent.Duration = childrenStart.Add(2 * time.Millisecond).Sub(txStart) + parent.End() + tx.Duration = 15 * time.Millisecond + tx.End() + + tracer.Flush(nil) + + transaction := tracer.Payloads().Transactions[0] + spans := tracer.Payloads().Spans + + defer func() { + if t.Failed() { + apmtest.WriteTraceTable(os.Stdout, transaction, spans) + apmtest.WriteTraceWaterfall(os.Stdout, transaction, spans) + } + }() + require.NotNil(t, transaction) + assert.Equal(t, 5, len(spans)) + + sort.SliceStable(spans, func(i, j int) bool { + return time.Time(spans[i].Timestamp).Before(time.Time(spans[j].Timestamp)) + }) + + redisSpan := spans[1] + require.NotNil(t, redisSpan.Composite) + assert.Equal(t, 3, redisSpan.Composite.Count) + assert.Equal(t, float64(3), redisSpan.Composite.Sum) + assert.Equal(t, "exact_match", redisSpan.Composite.CompressionStrategy) + + clientSpan := spans[3] + require.NotNil(t, clientSpan.Composite) + assert.Equal(t, clientSpan.ParentID, spans[2].ID) + assert.Equal(t, 2, clientSpan.Composite.Count) + assert.Equal(t, float64(3), clientSpan.Composite.Sum) + assert.Equal(t, "same_kind", clientSpan.Composite.CompressionStrategy) +} + +func TestCompressSpanSameKindConcurrent(t *testing.T) { + // This test verifies there aren't any deadlocks. + tracer := apmtest.NewRecordingTracer() + tracer.SetSpanCompressionEnabled(true) + + tx := tracer.StartTransaction("name", "type") + var wg sync.WaitGroup + count := 100 + wg.Add(count) + exitSpanOpts := apm.SpanOptions{ExitSpan: true} + for i := 0; i < count; i++ { + go func(i int) { + span := tx.StartSpanOptions(fmt.Sprint(i), "request", exitSpanOpts) + span.End() + wg.Done() + }(i) + } + + ctx := apm.ContextWithTransaction(context.Background(), tx) + parent, ctx := apm.StartSpan(ctx, "parent", "internal") + wg.Add(count) + + compressedSome := make(chan struct{}) + for i := 0; i < count; i++ { + go func(i int) { + select { + case compressedSome <- struct{}{}: + default: + } + span, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), "request", apm.SpanOptions{ + ExitSpan: true, + }) + span.End() + wg.Done() + }(i) + } + + // Wait until at least a goroutine has been scheduled + <-compressedSome + tx.End() + parent.End() + wg.Wait() + close(compressedSome) +} + +func TestCompressSpanPrematureEnd(t *testing.T) { + // This test cases assert that the cached spans are sent when the span or + // tx that holds their cache is ended and the cache isn't lost. + type expect struct { + compressionStrategy string + compositeSum float64 + spanCount int + compositeCount int + } + assertResult := func(t *testing.T, spans []model.Span, expect expect) { + assert.Equal(t, expect.spanCount, len(spans)) + var composite *model.CompositeSpan + for _, span := range spans { + if span.Composite != nil { + assert.Equal(t, expect.compositeCount, span.Composite.Count) + assert.Equal(t, expect.compressionStrategy, span.Composite.CompressionStrategy) + assert.Equal(t, expect.compositeSum, span.Composite.Sum) + composite = span.Composite + } + } + require.NotNil(t, composite) + } + + // 1. The parent ends before they do. + // The parent holds the compression cache in this test case. + // | tx | + // | parent | <--- The parent ends before the children ends. + // | child | <--- compressed + // | child | <--- compressed + // | child | <--- NOT compressed + // The expected result are 3 spans, the cache is invalidated and the span + // ended after the parent ends. + t.Run("ParentContext", func(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + tracer.SetSpanCompressionEnabled(true) + + tx := tracer.StartTransaction("name", "type") + ctx := apm.ContextWithTransaction(context.Background(), tx) + parent, ctx := apm.StartSpan(ctx, "parent", "internal") + for i := 0; i < 4; i++ { + child, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), "type", apm.SpanOptions{ + Parent: parent.TraceContext(), + ExitSpan: true, + }) + child.Duration = time.Microsecond + child.End() + if i == 2 { + parent.End() + } + } + tx.End() + tracer.Flush(nil) + assertResult(t, tracer.Payloads().Spans, expect{ + spanCount: 3, + compositeCount: 3, + compressionStrategy: "same_kind", + compositeSum: 0.003, + }) + }) + + // 2. The tx ends before the parent ends. + // The tx holds the compression cache in this test case. + // | tx | <--- The TX ends before parent. + // | parent | + // | child | <--- compressed + // | child | <--- compressed + // The expected result are 3 spans, the cache is invalidated and the span + // ended after the parent ends. + t.Run("TxEndBefore", func(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + tracer.SetSpanCompressionEnabled(true) + + tx := tracer.StartTransaction("name", "type") + ctx := apm.ContextWithTransaction(context.Background(), tx) + + parent, ctx := apm.StartSpan(ctx, "parent", "internal") + for i := 0; i < 2; i++ { + child, _ := apm.StartSpanOptions(ctx, fmt.Sprint(i), "type", apm.SpanOptions{ + ExitSpan: true, + }) + child.Duration = time.Microsecond + child.End() + } + tx.End() + parent.End() + tracer.Flush(nil) + assertResult(t, tracer.Payloads().Spans, expect{ + spanCount: 2, + compositeCount: 2, + compressionStrategy: "same_kind", + compositeSum: 0.002, + }) + }) + + // 2. The parent ends before the last of the children span are finished. + // The tx holds the compression cache in this test case. + // | tx | + // | parent | <--- The parent ends before the last child ends. + // | child | <--- compressed + // | child | <--- compressed + // | child | <--- NOT compressed + t.Run("ParentFromTx", func(t *testing.T) { + tracer := apmtest.NewRecordingTracer() + tracer.SetSpanCompressionEnabled(true) + + tx := tracer.StartTransaction("name", "type") + parent := tx.StartSpan("parent", "internal", nil) + for i := 0; i < 3; i++ { + child := tx.StartSpanOptions(fmt.Sprint(i), "type", apm.SpanOptions{ + Parent: parent.TraceContext(), + ExitSpan: true, + }) + child.Duration = time.Microsecond + child.End() + if i == 1 { + parent.End() + } + } + tx.End() + tracer.Flush(nil) + assertResult(t, tracer.Payloads().Spans, expect{ + spanCount: 3, + compositeCount: 2, + compressionStrategy: "same_kind", + compositeSum: 0.002, + }) + }) +} + func TestExitSpanDoesNotOverwriteDestinationServiceResource(t *testing.T) { _, spans, _ := apmtest.WithTransaction(func(ctx context.Context) { span, _ := apm.StartSpanOptions(ctx, "name", "type", apm.SpanOptions{ExitSpan: true}) diff --git a/tracer.go b/tracer.go index e21c432ae..f65d2faca 100644 --- a/tracer.go +++ b/tracer.go @@ -117,6 +117,8 @@ type TracerOptions struct { cpuProfileInterval time.Duration cpuProfileDuration time.Duration heapProfileInterval time.Duration + + compressionOptions compressionOptions } // initDefaults updates opts with default values. @@ -164,6 +166,21 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { maxSpans = defaultMaxSpans } + spanCompressionEnabled, err := initialSpanCompressionEnabled() + if failed(err) { + spanCompressionEnabled = defaultSpanCompressionEnabled + } + + spanCompressionExactMatchMaxDuration, err := initialSpanCompressionExactMatchMaxDuration() + if failed(err) { + spanCompressionExactMatchMaxDuration = defaultSpanCompressionExactMatchMaxDuration + } + + spanCompressionSameKindMaxDuration, err := initialSpanCompressionSameKindMaxDuration() + if failed(err) { + spanCompressionSameKindMaxDuration = defaultSpanCompressionSameKindMaxDuration + } + sampler, err := initialSampler() if failed(err) { sampler = nil @@ -244,6 +261,11 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { opts.bufferSize = bufferSize opts.metricsBufferSize = metricsBufferSize opts.maxSpans = maxSpans + opts.compressionOptions = compressionOptions{ + enabled: spanCompressionEnabled, + exactMatchMaxDuration: spanCompressionExactMatchMaxDuration, + sameKindMaxDuration: spanCompressionSameKindMaxDuration, + } opts.sampler = sampler opts.sanitizedFieldNames = initialSanitizedFieldNames() opts.disabledMetrics = initialDisabledMetrics() @@ -284,6 +306,12 @@ func (opts *TracerOptions) initDefaults(continueOnError bool) error { return nil } +type compressionOptions struct { + enabled bool + exactMatchMaxDuration time.Duration + sameKindMaxDuration time.Duration +} + // Tracer manages the sampling and sending of transactions to // Elastic APM. // @@ -413,6 +441,15 @@ func newTracer(opts TracerOptions) *Tracer { t.setLocalInstrumentationConfig(envMaxSpans, func(cfg *instrumentationConfigValues) { cfg.maxSpans = opts.maxSpans }) + t.setLocalInstrumentationConfig(envSpanCompressionEnabled, func(cfg *instrumentationConfigValues) { + cfg.compressionOptions.enabled = opts.compressionOptions.enabled + }) + t.setLocalInstrumentationConfig(envSpanCompressionExactMatchMaxDuration, func(cfg *instrumentationConfigValues) { + cfg.compressionOptions.exactMatchMaxDuration = opts.compressionOptions.exactMatchMaxDuration + }) + t.setLocalInstrumentationConfig(envSpanCompressionSameKindMaxDuration, func(cfg *instrumentationConfigValues) { + cfg.compressionOptions.sameKindMaxDuration = opts.compressionOptions.sameKindMaxDuration + }) t.setLocalInstrumentationConfig(envTransactionSampleRate, func(cfg *instrumentationConfigValues) { cfg.sampler = opts.sampler cfg.extendedSampler, _ = opts.sampler.(ExtendedSampler) @@ -708,6 +745,29 @@ func (t *Tracer) SetMaxSpans(n int) { }) } +// SetSpanCompressionEnabled enables/disables the span compression feature. +func (t *Tracer) SetSpanCompressionEnabled(v bool) { + t.setLocalInstrumentationConfig(envSpanCompressionEnabled, func(cfg *instrumentationConfigValues) { + cfg.compressionOptions.enabled = v + }) +} + +// SetSpanCompressionExactMatchMaxDuration sets the maximum duration for a span +// to be compressed with `compression_strategy` == `exact_match`. +func (t *Tracer) SetSpanCompressionExactMatchMaxDuration(v time.Duration) { + t.setLocalInstrumentationConfig(envSpanCompressionExactMatchMaxDuration, func(cfg *instrumentationConfigValues) { + cfg.compressionOptions.exactMatchMaxDuration = v + }) +} + +// SetSpanCompressionSameKindMaxDuration sets the maximum duration for a span +// to be compressed with `compression_strategy` == `same_kind`. +func (t *Tracer) SetSpanCompressionSameKindMaxDuration(v time.Duration) { + t.setLocalInstrumentationConfig(envSpanCompressionSameKindMaxDuration, func(cfg *instrumentationConfigValues) { + cfg.compressionOptions.sameKindMaxDuration = v + }) +} + // SetSpanFramesMinDuration sets the minimum duration for a span after which // we will capture its stack frames. func (t *Tracer) SetSpanFramesMinDuration(d time.Duration) { diff --git a/transaction.go b/transaction.go index 04aa7cb0b..5c1e054ca 100644 --- a/transaction.go +++ b/transaction.go @@ -69,6 +69,7 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran } tx.maxSpans = instrumentationConfig.maxSpans + tx.compressedSpan.options = instrumentationConfig.compressionOptions tx.spanFramesMinDuration = instrumentationConfig.spanFramesMinDuration tx.stackTraceLimit = instrumentationConfig.stackTraceLimit tx.Context.captureHeaders = instrumentationConfig.captureHeaders @@ -286,6 +287,13 @@ func (tx *Transaction) End() { if tx.Outcome == "" { tx.Outcome = tx.Context.outcome() } + // Hold the transaction data lock to check if the transaction has any + // compressed spans in its cache, if so, evict cache and end the span. + tx.TransactionData.mu.Lock() + if evictedSpan := tx.compressedSpan.evict(); evictedSpan != nil { + evictedSpan.end() + } + tx.TransactionData.mu.Unlock() tx.enqueue() } else { tx.reset(tx.tracer) @@ -366,6 +374,8 @@ type TransactionData struct { spanTimings spanTimingsMap droppedSpansStats droppedSpanTimingsMap rand *rand.Rand // for ID generation + + compressedSpan compressedSpan } // reset resets the TransactionData back to its zero state and places it back