Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

span (experimental): Compress short exit spans #1134

Merged
merged 24 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions apmtest/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apmtest // import "go.elastic.co/apm/apmtest"

import (
"bytes"
"fmt"
"io"
"math"
"sort"
"text/tabwriter"
"time"
"unicode/utf8"

"go.elastic.co/apm/model"
)

// WriteTraceTable displays the trace as a table which can be used on tests to aid
// debugging.
func WriteTraceTable(writer io.Writer, tx model.Transaction, spans []model.Span) {
w := tabwriter.NewWriter(writer, 2, 4, 2, ' ', tabwriter.TabIndent)
fmt.Fprintln(w, "#\tNAME\tTYPE\tCOMP\tN\tDURATION\tOFFSET\tSPAN ID\tPARENT ID\tTRACE ID")

fmt.Fprintf(w, "TX\t%s\t%s\t-\t-\t%f\t%d\t%x\t%x\t%x\n", tx.Name,
tx.Type, tx.Duration,
0,
tx.ID, tx.ParentID, tx.TraceID,
)

sort.SliceStable(spans, func(i, j int) bool {
return time.Time(spans[i].Timestamp).Before(time.Time(spans[j].Timestamp))
})
for i, span := range spans {
count := 1
if span.Composite != nil {
count = span.Composite.Count
}

fmt.Fprintf(w, "%d\t%s\t%s\t%v\t%d\t%f\t+%d\t%x\t%x\t%x\n", i, span.Name,
span.Type, span.Composite != nil, count, span.Duration,
time.Time(span.Timestamp).Sub(time.Time(tx.Timestamp))/1e3,
span.ID, span.ParentID, span.TraceID,
)
}
w.Flush()
}

// WriteTraceWaterfall the trace waterfall "console output" to the specified
// writer sorted by timestamp.
func WriteTraceWaterfall(w io.Writer, tx model.Transaction, spans []model.Span) {
maxDuration := time.Duration(tx.Duration * float64(time.Millisecond))
if maxDuration == 0 {
for _, span := range spans {
maxDuration += time.Duration(span.Duration * float64(time.Millisecond))
}
}

maxWidth := int64(220)
buf := new(bytes.Buffer)
if tx.Duration > 0.0 {
writeSpan(buf, int(maxWidth), 0, fmt.Sprintf("transaction (%x) - %s", tx.ID, maxDuration.String()))
}

sort.SliceStable(spans, func(i, j int) bool {
return time.Time(spans[i].Timestamp).Before(time.Time(spans[j].Timestamp))
})

for _, span := range spans {
pos := int(math.Round(
float64(time.Time(span.Timestamp).Sub(time.Time(tx.Timestamp))) /
float64(maxDuration) * float64(maxWidth),
))
tDur := time.Duration(span.Duration * float64(time.Millisecond))
dur := float64(tDur) / float64(maxDuration)
width := int(math.Round(dur * float64(maxWidth)))
if width == int(maxWidth) {
width = int(maxWidth) - 1
}

spancontent := fmt.Sprintf("%s %s - %s",
span.Type, span.Name,
time.Duration(span.Duration*float64(time.Millisecond)).String(),
)
if span.Composite != nil {
spancontent = fmt.Sprintf("%d*%s - %s",
span.Composite.Count, span.Name,
time.Duration(span.Duration*float64(time.Millisecond)).String(),
)
}
writeSpan(buf, width, pos, spancontent)
}

io.Copy(w, buf)
}

func writeSpan(buf *bytes.Buffer, width, pos int, content string) {
spaceRune := ' '
fillRune := ' '
startRune := '['
endRune := ']'

// Prevent the spans from going out of bounds.
if pos == width {
pos = pos - 2
} else if pos >= width {
pos = pos - 1
}

for i := 0; i < int(pos); i++ {
buf.WriteRune(spaceRune)
}

if width <= 1 {
width = 1
// Write the first letter of the span type when the width is too small.
startRune, _ = utf8.DecodeRuneInString(content)
}

var written int
written, _ = buf.WriteRune(startRune)
if len(content) >= int(width)-1 {
content = content[:int(width)-1]
}

spacing := (width - len(content) - 2) / 2
for i := 0; i < spacing; i++ {
n, _ := buf.WriteRune(fillRune)
written += n
}

n, _ := buf.WriteString(content)
written += n
for i := 0; i < spacing; i++ {
n, _ := buf.WriteRune(fillRune)
written += n
}

if written < width {
buf.WriteRune(fillRune)
}
if width > 1 {
buf.WriteRune(endRune)
}

buf.WriteString("\n")
}
38 changes: 38 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ const (
envUseElasticTraceparentHeader = "ELASTIC_APM_USE_ELASTIC_TRACEPARENT_HEADER"
envCloudProvider = "ELASTIC_APM_CLOUD_PROVIDER"

// NOTE(marclop) Experimental settings
// span_compression (default `false`)
envSpanCompressionEnabled = "ELASTIC_APM_SPAN_COMPRESSION_ENABLED"
// span_compression_exact_match_max_duration (default `50ms`)
envSpanCompressionExactMatchMaxDuration = "ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION"
// span_compression_same_kind_max_duration (default `5ms`)
envSpanCompressionSameKindMaxDuration = "ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION"

// NOTE(axw) profiling environment variables are experimental.
// They may be removed in a future minor version without being
// considered a breaking change.
Expand All @@ -87,6 +95,11 @@ const (
maxAPIRequestSize = 5 * configutil.MByte
minMetricsBufferSize = 10 * configutil.KByte
maxMetricsBufferSize = 100 * configutil.MByte

// Experinmental Span Compressions default setting values
defaultSpanCompressionEnabled = false
defaultSpanCompressionExactMatchMaxDuration = 50 * time.Millisecond
defaultSpanCompressionSameKindMaxDuration = 5 * time.Millisecond
)

var (
Expand Down Expand Up @@ -298,6 +311,26 @@ func initialUseElasticTraceparentHeader() (bool, error) {
return configutil.ParseBoolEnv(envUseElasticTraceparentHeader, true)
}

func initialSpanCompressionEnabled() (bool, error) {
return configutil.ParseBoolEnv(envSpanCompressionEnabled,
defaultSpanCompressionEnabled,
)
}

func initialSpanCompressionExactMatchMaxDuration() (time.Duration, error) {
return configutil.ParseDurationEnv(
envSpanCompressionExactMatchMaxDuration,
defaultSpanCompressionExactMatchMaxDuration,
)
}

func initialSpanCompressionSameKindMaxDuration() (time.Duration, error) {
return configutil.ParseDurationEnv(
envSpanCompressionSameKindMaxDuration,
defaultSpanCompressionSameKindMaxDuration,
)
}

func initialCPUProfileIntervalDuration() (time.Duration, time.Duration, error) {
interval, err := configutil.ParseDurationEnv(envCPUProfileInterval, 0)
if err != nil || interval <= 0 {
Expand Down Expand Up @@ -532,4 +565,9 @@ type instrumentationConfigValues struct {
propagateLegacyHeader bool
sanitizedFieldNames wildcard.Matchers
ignoreTransactionURLs wildcard.Matchers

// compressed spans.
spanCompressionEnabled bool
spanCompressionExactMatchMaxDuration time.Duration
spanCompressionSameKindMaxDuration time.Duration
}
18 changes: 18 additions & 0 deletions model/marshal_fastjson.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ type Span struct {

// Outcome holds the span outcome: success, failure, or unknown.
Outcome string `json:"outcome,omitempty"`

// Composite is set when the span is a composite span and represents an
// aggregated set of spans as defined by `composite.compression_strategy`.
Composite *CompositeSpan `json:"composite,omitempty"`
}

// SpanContext holds contextual information relating to the span.
Expand Down Expand Up @@ -432,6 +436,19 @@ type HTTPSpanContext struct {
StatusCode int `json:"status_code,omitempty"`
}

// CompositeSpan holds details on a group of spans represented by a single one.
type CompositeSpan struct {
// Count is the number of compressed spans the composite span represents.
// The minimum count is 2, as a composite span represents at least two spans.
Count int `json:"count"`
// Sum is the durations of all compressed spans this composite span
// represents in milliseconds.
Sum float64 `json:"sum"`
// A string value indicating which compression strategy was used. The valid
// values are `exact_match` and `same_kind`.
CompressionStrategy string `json:"compression_strategy"`
}

// Context holds contextual information relating to a transaction or error.
type Context struct {
// Custom holds custom context relating to the transaction or error.
Expand Down
3 changes: 3 additions & 0 deletions modelwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func (w *modelWriter) buildModelSpan(out *model.Span, span *Span, sd *SpanData)
out.Duration = sd.Duration.Seconds() * 1000
out.Outcome = normalizeOutcome(sd.Outcome)
out.Context = sd.Context.build()
if span.composite.count > 1 {
out.Composite = span.composite.build()
}

// Copy the span type to context.destination.service.type.
if out.Context != nil && out.Context.Destination != nil && out.Context.Destination.Service != nil {
Expand Down
20 changes: 20 additions & 0 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ type Span struct {
transactionID SpanID
parentID SpanID
exit bool
ctxPropagated bool
composite compositeSpan
buffer spanBuffer
marclop marked this conversation as resolved.
Show resolved Hide resolved

mu sync.RWMutex

Expand All @@ -266,6 +269,10 @@ func (s *Span) TraceContext() TraceContext {
if s == nil {
return TraceContext{}
}
// TODO(marclop) maybe use atomic
marclop marked this conversation as resolved.
Show resolved Hide resolved
s.mu.Lock()
defer s.mu.Unlock()
s.ctxPropagated = true
return s.traceContext
}

Expand Down Expand Up @@ -333,6 +340,12 @@ func (s *Span) End() {
if len(s.stacktrace) == 0 && s.Duration >= s.stackFramesMinDuration {
s.setStacktrace(1)
}

compressionEnabled := s.tracer.instrumentationConfig().spanCompressionEnabled
if s.attemptCompress(compressionEnabled) {
return
marclop marked this conversation as resolved.
Show resolved Hide resolved
}

if s.tx != nil {
s.reportSelfTime()
}
Expand Down Expand Up @@ -374,6 +387,13 @@ func (s *Span) reportSelfTime() {

s.tx.TransactionData.mu.Lock()
defer s.tx.TransactionData.mu.Unlock()
s.reportSelfTimeLockless(endTime)
}

// reportSelfTimeLockless is used by all spans (composite and regular spans) to
// report their timers. Since the composite end is triggered by a span of the
// same kind or its parent ending, the upstream locks are already acquired.
marclop marked this conversation as resolved.
Show resolved Hide resolved
func (s *Span) reportSelfTimeLockless(endTime time.Time) {
if s.parent != nil {
if !s.parent.ended() {
s.parent.childrenTimer.childEnded(endTime)
Expand Down
Loading