Skip to content

Commit

Permalink
span (experimental): Compress short exit spans (#1134)
Browse files Browse the repository at this point in the history
Implements the span compression algorithm described in elastic/apm#432.
The implementation pretty close to what the spec describes, with a few
modifications due to the differences in the Go agent implementation. The
span compression is experimental and is disabled by default with the
default thresholds for the different compression strategies set:

- `ELASTIC_APM_SPAN_COMPRESSION_ENABLED=false`
- `ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION=50ms`
- `ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION=5ms`

The implementation uses a new field (cache `*Span`) in `TransactionData`
and `SpanData` which is used to cache compressed or compression eligible
spans in that field. An additional `composite` field has also been added
to the `SpanData` and `TransactionData`.

The algorithm states that any exit span that is lower or equal duration
than the set `ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION` or
`ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION` with a destination
service set is compressable into a composite span using the appropriate
strategy. Compressable spans need to be "exit" and siblings which **have
not** propagated their context downstream. Sibling spans are spans that
share the same parent span (or transaction).

Spans can be compressed with `same_kind` or `exact_match` as a strategy,
and their sum is the aggregated duration of of all
When a compressed span has been compressed into a composite using the
`same_kind` strategy, its name is mutated to `Calls to <span.Type>`.

Spans will be compressed into a composite only when the siblings are
consecutive and any compression attempt that doesn't meet that,
will cause the cache to be evicted.

Additionally, two helper functions have been added under the `apmtest`:
package: `WriteTraceTable` and `WriteTraceWaterfall`, which help with
understanding why a test is failing if they are and debugging those.

Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop authored Oct 13, 2021
1 parent 36bdfef commit 27abca2
Show file tree
Hide file tree
Showing 19 changed files with 1,688 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ https://github.com/elastic/apm-agent-go/compare/v1.14.0...master[View commits]
- Deprecate `http.request.socket.encrypted` and stop recording it in `module/apmhttp`, `module/apmgrpc` and `module/apmfiber`. {pull}1129[#(1129)]
- Collect and send span destination service timing statistics about the dropped spans to the apm-server. {pull}1132[#(1132)]
- Experimental support to compress short exit spans into a composite span. Disabled by default. {pull}1134[#(1134)]
[[release-notes-1.x]]
=== Go Agent version 1.x
Expand Down
161 changes: 161 additions & 0 deletions apmtest/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apmtest // import "go.elastic.co/apm/apmtest"

import (
"bytes"
"fmt"
"io"
"sort"
"text/tabwriter"
"time"
"unicode/utf8"

"go.elastic.co/apm/internal/apmmath"
"go.elastic.co/apm/model"
)

// WriteTraceTable displays the trace as a table which can be used on tests to aid
// debugging.
func WriteTraceTable(writer io.Writer, tx model.Transaction, spans []model.Span) {
w := tabwriter.NewWriter(writer, 2, 4, 2, ' ', tabwriter.TabIndent)
fmt.Fprintln(w, "#\tNAME\tTYPE\tCOMP\tN\tDURATION\tOFFSET\tSPAN ID\tPARENT ID\tTRACE ID")

fmt.Fprintf(w, "TX\t%s\t%s\t-\t-\t%f\t%d\t%x\t%x\t%x\n", tx.Name,
tx.Type, tx.Duration,
0,
tx.ID, tx.ParentID, tx.TraceID,
)

sort.SliceStable(spans, func(i, j int) bool {
return time.Time(spans[i].Timestamp).Before(time.Time(spans[j].Timestamp))
})
for i, span := range spans {
count := 1
if span.Composite != nil {
count = span.Composite.Count
}

fmt.Fprintf(w, "%d\t%s\t%s\t%v\t%d\t%f\t+%d\t%x\t%x\t%x\n", i, span.Name,
span.Type, span.Composite != nil, count, span.Duration,
time.Time(span.Timestamp).Sub(time.Time(tx.Timestamp))/1e3,
span.ID, span.ParentID, span.TraceID,
)
}
w.Flush()
}

// WriteTraceWaterfall the trace waterfall "console output" to the specified
// writer sorted by timestamp.
func WriteTraceWaterfall(w io.Writer, tx model.Transaction, spans []model.Span) {
maxDuration := time.Duration(tx.Duration * float64(time.Millisecond))
if maxDuration == 0 {
for _, span := range spans {
maxDuration += time.Duration(span.Duration * float64(time.Millisecond))
}
}

maxWidth := int64(72)
buf := new(bytes.Buffer)
if tx.Duration > 0.0 {
writeSpan(buf, int(maxWidth), 0, fmt.Sprintf("transaction (%x) - %s", tx.ID, maxDuration.String()))
}

sort.SliceStable(spans, func(i, j int) bool {
return time.Time(spans[i].Timestamp).Before(time.Time(spans[j].Timestamp))
})

for _, span := range spans {
pos := int(apmmath.Round(
float64(time.Time(span.Timestamp).Sub(time.Time(tx.Timestamp))) /
float64(maxDuration) * float64(maxWidth),
))
tDur := time.Duration(span.Duration * float64(time.Millisecond))
dur := float64(tDur) / float64(maxDuration)
width := int(apmmath.Round(dur * float64(maxWidth)))
if width == int(maxWidth) {
width = int(maxWidth) - 1
}

spancontent := fmt.Sprintf("%s %s - %s",
span.Type, span.Name,
time.Duration(span.Duration*float64(time.Millisecond)).String(),
)
if span.Composite != nil {
spancontent = fmt.Sprintf("%d %s - %s",
span.Composite.Count, span.Name,
time.Duration(span.Duration*float64(time.Millisecond)).String(),
)
}
writeSpan(buf, width, pos, spancontent)
}

io.Copy(w, buf)
}

func writeSpan(buf *bytes.Buffer, width, pos int, content string) {
spaceRune := ' '
fillRune := '_'
startRune := '|'
endRune := '|'

// Prevent the spans from going out of bounds.
if pos == width {
pos = pos - 2
} else if pos >= width {
pos = pos - 1
}

for i := 0; i < int(pos); i++ {
buf.WriteRune(spaceRune)
}

if width <= 1 {
width = 1
// Write the first letter of the span type when the width is too small.
startRune, _ = utf8.DecodeRuneInString(content)
}

var written int
written, _ = buf.WriteRune(startRune)
if len(content) >= int(width)-1 {
content = content[:int(width)-1]
}

spacing := (width - len(content) - 2) / 2
for i := 0; i < spacing; i++ {
n, _ := buf.WriteRune(fillRune)
written += n
}

n, _ := buf.WriteString(content)
written += n
for i := 0; i < spacing; i++ {
n, _ := buf.WriteRune(fillRune)
written += n
}

if written < width {
buf.WriteRune(fillRune)
}
if width > 1 {
buf.WriteRune(endRune)
}

buf.WriteString("\n")
}
64 changes: 64 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ const (
envUseElasticTraceparentHeader = "ELASTIC_APM_USE_ELASTIC_TRACEPARENT_HEADER"
envCloudProvider = "ELASTIC_APM_CLOUD_PROVIDER"

// NOTE(marclop) Experimental settings
// span_compression (default `false`)
envSpanCompressionEnabled = "ELASTIC_APM_SPAN_COMPRESSION_ENABLED"
// span_compression_exact_match_max_duration (default `50ms`)
envSpanCompressionExactMatchMaxDuration = "ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION"
// span_compression_same_kind_max_duration (default `5ms`)
envSpanCompressionSameKindMaxDuration = "ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION"

// NOTE(axw) profiling environment variables are experimental.
// They may be removed in a future minor version without being
// considered a breaking change.
Expand All @@ -87,6 +95,11 @@ const (
maxAPIRequestSize = 5 * configutil.MByte
minMetricsBufferSize = 10 * configutil.KByte
maxMetricsBufferSize = 100 * configutil.MByte

// Experimental Span Compressions default setting values
defaultSpanCompressionEnabled = false
defaultSpanCompressionExactMatchMaxDuration = 50 * time.Millisecond
defaultSpanCompressionSameKindMaxDuration = 5 * time.Millisecond
)

var (
Expand Down Expand Up @@ -298,6 +311,26 @@ func initialUseElasticTraceparentHeader() (bool, error) {
return configutil.ParseBoolEnv(envUseElasticTraceparentHeader, true)
}

func initialSpanCompressionEnabled() (bool, error) {
return configutil.ParseBoolEnv(envSpanCompressionEnabled,
defaultSpanCompressionEnabled,
)
}

func initialSpanCompressionExactMatchMaxDuration() (time.Duration, error) {
return configutil.ParseDurationEnv(
envSpanCompressionExactMatchMaxDuration,
defaultSpanCompressionExactMatchMaxDuration,
)
}

func initialSpanCompressionSameKindMaxDuration() (time.Duration, error) {
return configutil.ParseDurationEnv(
envSpanCompressionSameKindMaxDuration,
defaultSpanCompressionSameKindMaxDuration,
)
}

func initialCPUProfileIntervalDuration() (time.Duration, time.Duration, error) {
interval, err := configutil.ParseDurationEnv(envCPUProfileInterval, 0)
if err != nil || interval <= 0 {
Expand Down Expand Up @@ -430,6 +463,36 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string]
delete(attrs, k)
continue
}
case envSpanCompressionEnabled:
val, err := strconv.ParseBool(v)
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
}
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.compressionOptions.enabled = val
})
case envSpanCompressionExactMatchMaxDuration:
duration, err := configutil.ParseDuration(v)
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
}
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.compressionOptions.exactMatchMaxDuration = duration
})
case envSpanCompressionSameKindMaxDuration:
duration, err := configutil.ParseDuration(v)
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
}
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.compressionOptions.sameKindMaxDuration = duration
})
default:
warningf("central config failure: unsupported config: %s", k)
delete(attrs, k)
Expand Down Expand Up @@ -532,4 +595,5 @@ type instrumentationConfigValues struct {
propagateLegacyHeader bool
sanitizedFieldNames wildcard.Matchers
ignoreTransactionURLs wildcard.Matchers
compressionOptions compressionOptions
}
52 changes: 52 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package apm_test
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -127,6 +128,57 @@ func TestTracerCentralConfigUpdate(t *testing.T) {
require.NoError(t, err)
return tracer.IgnoredTransactionURL(u)
})
run("span_compression_enabled", "true", func(tracer *apmtest.RecordingTracer) bool {
tracer.ResetPayloads()
tx := tracer.StartTransaction("name", "type")
exitSpanOpts := apm.SpanOptions{ExitSpan: true}
for i := 0; i < 2; i++ {
span := tx.StartSpanOptions("name", "request", exitSpanOpts)
span.Duration = 50 * time.Millisecond
span.End()
}
tx.End()
tracer.Flush(nil)
return len(tracer.Payloads().Spans) == 1
})
run("span_compression_exact_match_max_duration", "100ms", func(tracer *apmtest.RecordingTracer) bool {
tracer.ResetPayloads()
tracer.SetSpanCompressionEnabled(true)
defer tracer.SetSpanCompressionEnabled(false)
tx := tracer.StartTransaction("name", "type")
exitSpanOpts := apm.SpanOptions{ExitSpan: true}
for i := 0; i < 2; i++ {
span := tx.StartSpanOptions("name", "request", exitSpanOpts)
span.Duration = 100 * time.Millisecond
span.End()
}
// Third span does not get compressed since it exceeds the max duration.
span := tx.StartSpanOptions("name", "request", exitSpanOpts)
span.Duration = 101 * time.Millisecond
span.End()
tx.End()
tracer.Flush(nil)
return len(tracer.Payloads().Spans) == 2
})
run("span_compression_same_kind_max_duration", "10ms", func(tracer *apmtest.RecordingTracer) bool {
tracer.ResetPayloads()
tracer.SetSpanCompressionEnabled(true)
defer tracer.SetSpanCompressionEnabled(false)
tx := tracer.StartTransaction("name", "type")
exitSpanOpts := apm.SpanOptions{ExitSpan: true}
for i := 0; i < 2; i++ {
span := tx.StartSpanOptions(fmt.Sprint(i), "request", exitSpanOpts)
span.Duration = 10 * time.Millisecond
span.End()
}
// Third span does not get compressed since it exceeds the max duration.
span := tx.StartSpanOptions("name", "request", exitSpanOpts)
span.Duration = 11 * time.Millisecond
span.End()
tx.End()
tracer.Flush(nil)
return len(tracer.Payloads().Spans) == 2
})
}

func testTracerCentralConfigUpdate(t *testing.T, logger apm.Logger, serverResponse string, isRemote func(*apmtest.RecordingTracer) bool) {
Expand Down
Loading

0 comments on commit 27abca2

Please sign in to comment.