Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

span (experimental): Compress short exit spans #1134

Merged
merged 24 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ https://github.com/elastic/apm-agent-go/compare/v1.14.0...master[View commits]

- Deprecate `http.request.socket.encrypted` and stop recording it in `module/apmhttp`, `module/apmgrpc` and `module/apmfiber`. {pull}1129[#(1129)]
- Collect and send span destination service timing statistics about the dropped spans to the apm-server. {pull}1132[#(1132)]
- Experimental support to compress short exit spans into a composite span. Disabled by default. {pull}1134[#(1134)]

[[release-notes-1.x]]
=== Go Agent version 1.x
Expand Down
161 changes: 161 additions & 0 deletions apmtest/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apmtest // import "go.elastic.co/apm/apmtest"

import (
"bytes"
"fmt"
"io"
"sort"
"text/tabwriter"
"time"
"unicode/utf8"

"go.elastic.co/apm/internal/apmmath"
"go.elastic.co/apm/model"
)

// WriteTraceTable displays the trace as a table which can be used on tests to aid
// debugging.
func WriteTraceTable(writer io.Writer, tx model.Transaction, spans []model.Span) {
w := tabwriter.NewWriter(writer, 2, 4, 2, ' ', tabwriter.TabIndent)
fmt.Fprintln(w, "#\tNAME\tTYPE\tCOMP\tN\tDURATION\tOFFSET\tSPAN ID\tPARENT ID\tTRACE ID")

fmt.Fprintf(w, "TX\t%s\t%s\t-\t-\t%f\t%d\t%x\t%x\t%x\n", tx.Name,
tx.Type, tx.Duration,
0,
tx.ID, tx.ParentID, tx.TraceID,
)

sort.SliceStable(spans, func(i, j int) bool {
return time.Time(spans[i].Timestamp).Before(time.Time(spans[j].Timestamp))
})
for i, span := range spans {
count := 1
if span.Composite != nil {
count = span.Composite.Count
}

fmt.Fprintf(w, "%d\t%s\t%s\t%v\t%d\t%f\t+%d\t%x\t%x\t%x\n", i, span.Name,
span.Type, span.Composite != nil, count, span.Duration,
time.Time(span.Timestamp).Sub(time.Time(tx.Timestamp))/1e3,
span.ID, span.ParentID, span.TraceID,
)
}
w.Flush()
}

// WriteTraceWaterfall the trace waterfall "console output" to the specified
// writer sorted by timestamp.
func WriteTraceWaterfall(w io.Writer, tx model.Transaction, spans []model.Span) {
maxDuration := time.Duration(tx.Duration * float64(time.Millisecond))
if maxDuration == 0 {
for _, span := range spans {
maxDuration += time.Duration(span.Duration * float64(time.Millisecond))
}
}

maxWidth := int64(72)
buf := new(bytes.Buffer)
if tx.Duration > 0.0 {
writeSpan(buf, int(maxWidth), 0, fmt.Sprintf("transaction (%x) - %s", tx.ID, maxDuration.String()))
}

sort.SliceStable(spans, func(i, j int) bool {
return time.Time(spans[i].Timestamp).Before(time.Time(spans[j].Timestamp))
})

for _, span := range spans {
pos := int(apmmath.Round(
float64(time.Time(span.Timestamp).Sub(time.Time(tx.Timestamp))) /
float64(maxDuration) * float64(maxWidth),
))
tDur := time.Duration(span.Duration * float64(time.Millisecond))
dur := float64(tDur) / float64(maxDuration)
width := int(apmmath.Round(dur * float64(maxWidth)))
if width == int(maxWidth) {
width = int(maxWidth) - 1
}

spancontent := fmt.Sprintf("%s %s - %s",
span.Type, span.Name,
time.Duration(span.Duration*float64(time.Millisecond)).String(),
)
if span.Composite != nil {
spancontent = fmt.Sprintf("%d %s - %s",
span.Composite.Count, span.Name,
time.Duration(span.Duration*float64(time.Millisecond)).String(),
)
}
writeSpan(buf, width, pos, spancontent)
}

io.Copy(w, buf)
}

func writeSpan(buf *bytes.Buffer, width, pos int, content string) {
spaceRune := ' '
fillRune := '_'
startRune := '|'
endRune := '|'

// Prevent the spans from going out of bounds.
if pos == width {
pos = pos - 2
} else if pos >= width {
pos = pos - 1
}

for i := 0; i < int(pos); i++ {
buf.WriteRune(spaceRune)
}

if width <= 1 {
width = 1
// Write the first letter of the span type when the width is too small.
startRune, _ = utf8.DecodeRuneInString(content)
}

var written int
written, _ = buf.WriteRune(startRune)
if len(content) >= int(width)-1 {
content = content[:int(width)-1]
}

spacing := (width - len(content) - 2) / 2
for i := 0; i < spacing; i++ {
n, _ := buf.WriteRune(fillRune)
written += n
}

n, _ := buf.WriteString(content)
written += n
for i := 0; i < spacing; i++ {
n, _ := buf.WriteRune(fillRune)
written += n
}

if written < width {
buf.WriteRune(fillRune)
}
if width > 1 {
buf.WriteRune(endRune)
}

buf.WriteString("\n")
}
64 changes: 64 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ const (
envUseElasticTraceparentHeader = "ELASTIC_APM_USE_ELASTIC_TRACEPARENT_HEADER"
envCloudProvider = "ELASTIC_APM_CLOUD_PROVIDER"

// NOTE(marclop) Experimental settings
// span_compression (default `false`)
envSpanCompressionEnabled = "ELASTIC_APM_SPAN_COMPRESSION_ENABLED"
// span_compression_exact_match_max_duration (default `50ms`)
envSpanCompressionExactMatchMaxDuration = "ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION"
// span_compression_same_kind_max_duration (default `5ms`)
envSpanCompressionSameKindMaxDuration = "ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION"

// NOTE(axw) profiling environment variables are experimental.
// They may be removed in a future minor version without being
// considered a breaking change.
Expand All @@ -87,6 +95,11 @@ const (
maxAPIRequestSize = 5 * configutil.MByte
minMetricsBufferSize = 10 * configutil.KByte
maxMetricsBufferSize = 100 * configutil.MByte

// Experimental Span Compressions default setting values
defaultSpanCompressionEnabled = false
defaultSpanCompressionExactMatchMaxDuration = 50 * time.Millisecond
defaultSpanCompressionSameKindMaxDuration = 5 * time.Millisecond
)

var (
Expand Down Expand Up @@ -298,6 +311,26 @@ func initialUseElasticTraceparentHeader() (bool, error) {
return configutil.ParseBoolEnv(envUseElasticTraceparentHeader, true)
}

func initialSpanCompressionEnabled() (bool, error) {
return configutil.ParseBoolEnv(envSpanCompressionEnabled,
defaultSpanCompressionEnabled,
)
}

func initialSpanCompressionExactMatchMaxDuration() (time.Duration, error) {
return configutil.ParseDurationEnv(
envSpanCompressionExactMatchMaxDuration,
defaultSpanCompressionExactMatchMaxDuration,
)
}

func initialSpanCompressionSameKindMaxDuration() (time.Duration, error) {
return configutil.ParseDurationEnv(
envSpanCompressionSameKindMaxDuration,
defaultSpanCompressionSameKindMaxDuration,
)
}

func initialCPUProfileIntervalDuration() (time.Duration, time.Duration, error) {
interval, err := configutil.ParseDurationEnv(envCPUProfileInterval, 0)
if err != nil || interval <= 0 {
Expand Down Expand Up @@ -430,6 +463,36 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string]
delete(attrs, k)
continue
}
case envSpanCompressionEnabled:
val, err := strconv.ParseBool(v)
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
}
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.compressionOptions.enabled = val
})
case envSpanCompressionExactMatchMaxDuration:
duration, err := configutil.ParseDuration(v)
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
}
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.compressionOptions.exactMatchMaxDuration = duration
})
case envSpanCompressionSameKindMaxDuration:
duration, err := configutil.ParseDuration(v)
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
}
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.compressionOptions.sameKindMaxDuration = duration
})
default:
warningf("central config failure: unsupported config: %s", k)
delete(attrs, k)
Expand Down Expand Up @@ -532,4 +595,5 @@ type instrumentationConfigValues struct {
propagateLegacyHeader bool
sanitizedFieldNames wildcard.Matchers
ignoreTransactionURLs wildcard.Matchers
compressionOptions compressionOptions
}
52 changes: 52 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package apm_test
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -127,6 +128,57 @@ func TestTracerCentralConfigUpdate(t *testing.T) {
require.NoError(t, err)
return tracer.IgnoredTransactionURL(u)
})
run("span_compression_enabled", "true", func(tracer *apmtest.RecordingTracer) bool {
tracer.ResetPayloads()
tx := tracer.StartTransaction("name", "type")
exitSpanOpts := apm.SpanOptions{ExitSpan: true}
for i := 0; i < 2; i++ {
span := tx.StartSpanOptions("name", "request", exitSpanOpts)
span.Duration = 50 * time.Millisecond
span.End()
}
tx.End()
tracer.Flush(nil)
return len(tracer.Payloads().Spans) == 1
})
run("span_compression_exact_match_max_duration", "100ms", func(tracer *apmtest.RecordingTracer) bool {
tracer.ResetPayloads()
tracer.SetSpanCompressionEnabled(true)
defer tracer.SetSpanCompressionEnabled(false)
tx := tracer.StartTransaction("name", "type")
exitSpanOpts := apm.SpanOptions{ExitSpan: true}
for i := 0; i < 2; i++ {
span := tx.StartSpanOptions("name", "request", exitSpanOpts)
span.Duration = 100 * time.Millisecond
span.End()
}
// Third span does not get compressed since it exceeds the max duration.
span := tx.StartSpanOptions("name", "request", exitSpanOpts)
span.Duration = 101 * time.Millisecond
span.End()
tx.End()
tracer.Flush(nil)
return len(tracer.Payloads().Spans) == 2
})
run("span_compression_same_kind_max_duration", "10ms", func(tracer *apmtest.RecordingTracer) bool {
tracer.ResetPayloads()
tracer.SetSpanCompressionEnabled(true)
defer tracer.SetSpanCompressionEnabled(false)
tx := tracer.StartTransaction("name", "type")
exitSpanOpts := apm.SpanOptions{ExitSpan: true}
for i := 0; i < 2; i++ {
span := tx.StartSpanOptions(fmt.Sprint(i), "request", exitSpanOpts)
span.Duration = 10 * time.Millisecond
span.End()
}
// Third span does not get compressed since it exceeds the max duration.
span := tx.StartSpanOptions("name", "request", exitSpanOpts)
span.Duration = 11 * time.Millisecond
span.End()
tx.End()
tracer.Flush(nil)
return len(tracer.Payloads().Spans) == 2
})
}

func testTracerCentralConfigUpdate(t *testing.T, logger apm.Logger, serverResponse string, isRemote func(*apmtest.RecordingTracer) bool) {
Expand Down
Loading