Skip to content

Commit

Permalink
[distributed sampling] transition from is_sampled to sampling_priority
Browse files Browse the repository at this point in the history
  • Loading branch information
ufoot committed Sep 27, 2017
1 parent e1e1825 commit 442f96e
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 57 deletions.
58 changes: 19 additions & 39 deletions tracer/contrib/tracegrpc/grpc.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package tracegrpc

import (
"fmt"
"strconv"

"github.com/DataDog/dd-trace-go/tracer"
Expand All @@ -14,9 +13,9 @@ import (

// pass trace ids with these headers
const (
traceIDKey = "x-datadog-trace-id"
parentIDKey = "x-datadog-parent-id"
isSampledKey = "x-datadog-is-sampled"
traceIDKey = "x-datadog-trace-id"
parentIDKey = "x-datadog-parent-id"
samplingPriorityKey = "x-datadog-sampling-priority"
)

// UnaryServerInterceptor will trace requests to the given grpc server.
Expand Down Expand Up @@ -69,64 +68,48 @@ func serverSpan(t *tracer.Tracer, ctx context.Context, method, service string) *
span.SetMeta("gprc.method", method)
span.Type = "go"

traceID, parentID := getIDs(ctx)
traceID, parentID, samplingPriority := getCtxMeta(ctx)
if traceID != 0 && parentID != 0 {
span.TraceID = traceID
t.Sample(span) // depends on trace ID so needs to be updated to maximize the chances we get complete traces
span.ParentID = parentID
if isSampled, ok := getIsSampled(ctx); ok {
span.DistributedSampled = isSampled
}
span.SetSamplingPriority(samplingPriority)
}

return span
}

// setCtxMeta will set the trace ids and the IsSampled attribute on the context.
// setCtxMeta will set the trace ids and the sampling priority on the context.
func setCtxMeta(span *tracer.Span, ctx context.Context) context.Context {
if span == nil || span.TraceID == 0 {
return ctx
}

isSampled := "0"
if span.DistributedSampled {
isSampled = "1"
}
md := metadata.New(map[string]string{
traceIDKey: fmt.Sprint(span.TraceID),
parentIDKey: fmt.Sprint(span.ParentID),
isSampledKey: isSampled,
traceIDKey: strconv.FormatUint(span.TraceID, 10),
parentIDKey: strconv.FormatUint(span.ParentID, 10),
samplingPriorityKey: strconv.Itoa(span.GetSamplingPriority()),
})
if existing, ok := metadata.FromContext(ctx); ok {
md = metadata.Join(existing, md)
}
return metadata.NewContext(ctx, md)
}

// getIDs will return ids embedded in a context.
func getIDs(ctx context.Context) (traceID, parentID uint64) {
// getCtxMeta will return ids and sampling priority embedded in a context.
func getCtxMeta(ctx context.Context) (traceID, parentID uint64, samplingPriority int) {
if md, ok := metadata.FromContext(ctx); ok {
if id := getID(md, traceIDKey); id > 0 {
traceID = id
}
if id := getID(md, parentIDKey); id > 0 {
parentID = id
}
}
return traceID, parentID
}

// getIsSampled will return the isSampled embedded in a context.
func getIsSampled(ctx context.Context) (isSampled, ok bool) {
if md, ok := metadata.FromContext(ctx); ok {
if b, found := getBool(md, isSampledKey); found {
ok = true
isSampled = b
} else {
isSampled = true
if v := getInt(md, samplingPriorityKey); v > 0 {
samplingPriority = v
}
}
return isSampled, ok
return traceID, parentID, samplingPriority
}

// getID parses an id from the metadata.
Expand All @@ -141,15 +124,12 @@ func getID(md metadata.MD, name string) uint64 {
}

// getInt returns the first value stored under name in the metadata that parses as an integer, or 0 if there is none.
func getBool(md metadata.MD, name string) (bool, bool) {
func getInt(md metadata.MD, name string) int {
for _, str := range md[name] {
if str == "0" {
return false, true
}
n, err := strconv.Atoi(str)
if err == nil && n > 0 {
return true, true
v, err := strconv.Atoi(str)
if err == nil {
return int(v)
}
}
return true, false
return 0
}
46 changes: 30 additions & 16 deletions tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -14,6 +15,8 @@ const (
errorMsgKey = "error.msg"
errorTypeKey = "error.type"
errorStackKey = "error.stack"

samplingPriorityKey = "sampling.priority"
)

// Span represents a computation. Callers must call Finish when a span is
Expand Down Expand Up @@ -56,11 +59,6 @@ type Span struct {
ParentID uint64 `json:"parent_id"` // identifier of the span's direct parent
Error int32 `json:"error"` // error status of the span; 0 means no errors
Sampled bool `json:"-"` // if this span is sampled (and should be kept/recorded) or not
// Ideally we would put distributed_sampled on root spans only. But this requires either
// writing the marshaller manually or use hacky "hiding" of the field,
// see http://attilaolah.eu/2014/09/10/json-and-struct-composition-in-go/ on how to do this.
// For now, keep it as is, send it all the time, it's minor network overhead.
DistributedSampled bool `json:"distributed_sampled"` // if this span is sampled (as in distributed tracing, all parts must be kept) or not

sync.RWMutex
tracer *Tracer // the tracer that generated this span
Expand All @@ -77,17 +75,16 @@ type Span struct {
// Most of the time one should prefer the Tracer NewRootSpan or NewChildSpan methods.
func NewSpan(name, service, resource string, spanID, traceID, parentID uint64, tracer *Tracer) *Span {
return &Span{
Name: name,
Service: service,
Resource: resource,
Meta: tracer.getAllMeta(),
SpanID: spanID,
TraceID: traceID,
ParentID: parentID,
Start: now(),
Sampled: true,
DistributedSampled: true,
tracer: tracer,
Name: name,
Service: service,
Resource: resource,
Meta: tracer.getAllMeta(),
SpanID: spanID,
TraceID: traceID,
ParentID: parentID,
Start: now(),
Sampled: true,
tracer: tracer,
}
}

Expand Down Expand Up @@ -309,6 +306,23 @@ func (s *Span) Tracer() *Tracer {
return s.tracer
}

// SetSamplingPriority sets the sampling priority of the span.
//
// A priority greater than 0 is stored, in decimal form, under the
// "sampling.priority" meta key; higher values are a hint on how interesting
// this span is and that it should be kept by the backend. A priority of 0 or
// less removes the key altogether, so an absent key stands for priority 0.
//
// Calling it on a nil span with a non-positive priority is a no-op.
func (s *Span) SetSamplingPriority(priority int) {
	if priority > 0 {
		s.SetMeta(samplingPriorityKey, strconv.Itoa(priority))
		return
	}
	if s == nil {
		return
	}
	// Meta is a plain map reachable from other goroutines; hold the span's
	// embedded RWMutex while mutating it so the delete cannot race with
	// concurrent readers or writers of the map.
	// NOTE(review): SetMeta appears to be a no-op on finished spans; this
	// removal is not guarded the same way — confirm whether it should be.
	s.Lock()
	defer s.Unlock()
	delete(s.Meta, samplingPriorityKey)
}

// GetSamplingPriority returns the sampling priority read back from the
// "sampling.priority" meta of the span. When the meta is empty or is not a
// valid integer, the value produced by strconv.Atoi alongside its error is
// returned as is (0 for an empty or non-numeric string).
func (s *Span) GetSamplingPriority() int {
	raw := s.GetMeta(samplingPriorityKey)
	// The conversion error is intentionally dropped: an unset priority is
	// reported through Atoi's fallback value rather than as a failure.
	value, _ := strconv.Atoi(raw)
	return value
}

// NextSpanID returns a new random span id.
func NextSpanID() uint64 {
return uint64(randGen.Int63())
Expand Down
57 changes: 55 additions & 2 deletions tracer/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,42 @@ func TestSpanSetMetas(t *testing.T) {
assert := assert.New(t)
tracer := NewTracer()
span := tracer.NewRootSpan("pylons.request", "pylons", "/")
span.SetSamplingPriority(0) // avoid interferences with "sampling.priority" meta
metas := map[string]string{
"error.msg": "Something wrong",
"error.type": "*errors.errorString",
"status.code": "200",
"system.pid": "29176",
}
extraMetas := map[string]string{
"custom.1": "something custom",
"custom.2": "something even more special",
}
nopMetas := map[string]string{
"nopKey1": "nopValue1",
"nopKey2": "nopValue2",
}

// check the map is properly initialized
span.SetMetas(metas)
assert.Equal(len(span.Meta), len(metas))
assert.Equal(len(metas), len(span.Meta))
for k := range metas {
assert.Equal(metas[k], span.Meta[k])
}

// check a second call adds the new metas, but does not remove old ones
span.SetMetas(extraMetas)
assert.Equal(len(metas)+len(extraMetas), len(span.Meta))
for k := range extraMetas {
assert.Equal(extraMetas[k], span.Meta[k])
}

assert.Equal(span.Meta["status.code"], "200")

// operating on a finished span is a no-op
span.Finish()
span.SetMetas(nopMetas)
assert.Equal(len(span.Meta), len(metas))
assert.Equal(len(metas)+len(extraMetas), len(span.Meta))
for k := range nopMetas {
assert.Equal("", span.Meta[k])
}
Expand Down Expand Up @@ -238,6 +251,46 @@ func TestSpanModifyWhileFlushing(t *testing.T) {
}
}

// TestSpanSamplingPriority checks that the sampling priority set on a span is
// readable both through GetSamplingPriority and through the
// "sampling.priority" meta, that non-positive priorities remove the meta, and
// that a child span created afterwards carries the same priority as its parent.
func TestSpanSamplingPriority(t *testing.T) {
	assert := assert.New(t)
	tracer := NewTracer()

	span := tracer.NewRootSpan("my.name", "my.service", "my.resource")

	// assertChildInherits creates a fresh child of span and verifies that it
	// exposes the same sampling priority as span does at that moment.
	assertChildInherits := func() {
		childSpan := tracer.NewChildSpan("my.child", span)
		assert.Equal(span.GetMeta("sampling.priority"), childSpan.GetMeta("sampling.priority"))
		assert.Equal(span.GetSamplingPriority(), childSpan.GetSamplingPriority())
	}

	// Root spans start with priority 1.
	assert.Equal("1", span.GetMeta("sampling.priority"), "default sampling priority for root spans is 1")
	assert.Equal(1, span.GetSamplingPriority(), "default sampling priority for root spans is 1")
	assertChildInherits()

	// Priority 0 removes the meta key.
	span.SetSamplingPriority(0)
	assert.Equal("", span.GetMeta("sampling.priority"), "key has been deleted")
	assert.Equal(0, span.GetSamplingPriority(), "by default, sampling priority is 0")
	assertChildInherits()

	// Negative priorities are treated like 0.
	span.SetSamplingPriority(-1)
	assert.Equal("", span.GetMeta("sampling.priority"), "key has been deleted")
	assert.Equal(0, span.GetSamplingPriority(), "by default, sampling priority can't be negative")
	assertChildInherits()

	span.SetSamplingPriority(1)
	assert.Equal("1", span.GetMeta("sampling.priority"), "sampling priority is now 1")
	assert.Equal(1, span.GetSamplingPriority(), "sampling priority is now 1")
	assertChildInherits()

	span.SetSamplingPriority(42)
	assert.Equal("42", span.GetMeta("sampling.priority"), "sampling priority works for values above 1")
	assert.Equal(42, span.GetSamplingPriority(), "sampling priority works for values above 1")
	assertChildInherits()
}

// boomError is a field-less error type whose message is always "boom".
type boomError struct{}

// Error implements the error interface.
func (b *boomError) Error() string {
	return "boom"
}
5 changes: 5 additions & 0 deletions tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ func (t *Tracer) NewRootSpan(name, service, resource string) *Span {

span.buffer = newSpanBuffer(t.channels, 0, 0)
t.Sample(span)
span.SetSamplingPriority(1) // [TODO:christian] introduce distributed sampling here

span.buffer.Push(span)

// Add the process id to all root spans
Expand Down Expand Up @@ -207,8 +209,11 @@ func (t *Tracer) NewChildSpan(name string, parent *Span) *Span {
parent.RLock()
// child that is correctly configured
span := NewSpan(name, parent.Service, name, spanID, parent.TraceID, parent.SpanID, parent.tracer)

// child sampling same as the parent
span.Sampled = parent.Sampled
span.SetSamplingPriority(parent.GetSamplingPriority())

span.parent = parent
span.buffer = parent.buffer
parent.RUnlock()
Expand Down

0 comments on commit 442f96e

Please sign in to comment.