Skip to content

Commit

Permalink
ddtrace/tracer: added priority sampler (#371)
Browse files Browse the repository at this point in the history
* ddtrace/tracer: added priority sampler

* ddtrace/tracer: more tests to ensure context is also updated
  • Loading branch information
gbbr authored Dec 7, 2018
1 parent 26227cf commit 5a81392
Show file tree
Hide file tree
Showing 9 changed files with 394 additions and 45 deletions.
14 changes: 7 additions & 7 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,22 @@ jobs:

- restore_cache:
keys:
- v1-librdkafka-v0.11.5
- v1-librdkafka-v1.0.0-RC4
- run:
name: Install rdkafka
command: |
if [ ! -d /tmp/librdkafka-v0.11.5 ] ; then
if [ ! -d /tmp/librdkafka-v1.0.0-RC4 ] ; then
echo "building librdkafka"
git clone --branch v0.11.5 https://github.com/edenhill/librdkafka.git /tmp/librdkafka-v0.11.5
(cd /tmp/librdkafka-v0.11.5 && ./configure && make)
git clone --branch v1.0.0-RC4 https://github.com/edenhill/librdkafka.git /tmp/librdkafka-v1.0.0-RC4
(cd /tmp/librdkafka-v1.0.0-RC4 && ./configure && make)
fi
echo "installing librdkafka"
(cd /tmp/librdkafka-v0.11.5 && sudo make install)
(cd /tmp/librdkafka-v1.0.0-RC4 && sudo make install)
sudo ldconfig
- save_cache:
key: v1-librdkafka-v0.11.5
key: v1-librdkafka-v1.0.0-RC4
paths:
- /tmp/librdkafka-v0.11.5
- /tmp/librdkafka-v1.0.0-RC4

- run:
name: Vendor gRPC v1.2.0
Expand Down
13 changes: 13 additions & 0 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ type config struct {
// sampler specifies the sampler that will be used for sampling traces.
sampler Sampler

// prioritySampling will be non-nil when priority sampling is enabled.
prioritySampling *prioritySampler

// agentAddr specifies the hostname and of the agent where the traces
// are sent to.
agentAddr string
Expand Down Expand Up @@ -49,6 +52,16 @@ func defaults(c *config) {
c.agentAddr = defaultAddress
}

// WithPrioritySampling enables priority sampling on the active tracer instance. When using
// distributed tracing, this option must be enabled in order to get all the parts of a distributed
// trace sampled. To learn more about priority sampling, please visit:
// https://docs.datadoghq.com/tracing/getting_further/trace_sampling_and_storage/#priority-sampling-for-distributed-tracing
func WithPrioritySampling() StartOption {
return func(c *config) {
c.prioritySampling = newPrioritySampler()
}
}

// WithDebugMode enables debug mode on the tracer, resulting in more verbose logging.
func WithDebugMode(enabled bool) StartOption {
return func(c *config) {
Expand Down
72 changes: 70 additions & 2 deletions ddtrace/tracer/sampler.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package tracer

import (
"encoding/json"
"io"
"math"
"sync"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
)

// Sampler is the generic interface of any sampler. It must be safe for concurrent use.
Expand Down Expand Up @@ -65,8 +68,73 @@ func (r *rateSampler) Sample(spn ddtrace.Span) bool {
}
r.RLock()
defer r.RUnlock()
if r.rate < 1 {
return s.TraceID*knuthFactor < uint64(r.rate*math.MaxUint64)
return sampledByRate(s.TraceID, r.rate)
}

// sampledByRate verifies if the number n should be sampled at the specified
// rate.
func sampledByRate(n uint64, rate float64) bool {
if rate < 1 {
return n*knuthFactor < uint64(rate*math.MaxUint64)
}
return true
}

// prioritySampler holds a set of per-service sampling rates and applies
// them to spans.
type prioritySampler struct {
mu sync.RWMutex
rates map[string]float64
defaultRate float64
}

func newPrioritySampler() *prioritySampler {
return &prioritySampler{
rates: make(map[string]float64),
defaultRate: 1.,
}
}

// readRatesJSON will try to read the rates as JSON from the given io.ReadCloser.
func (ps *prioritySampler) readRatesJSON(rc io.ReadCloser) error {
var payload struct {
Rates map[string]float64 `json:"rate_by_service"`
}
if err := json.NewDecoder(rc).Decode(&payload); err != nil {
return err
}
rc.Close()
const defaultRateKey = "service:,env:"
ps.mu.Lock()
defer ps.mu.Unlock()
ps.rates = payload.Rates
if v, ok := ps.rates[defaultRateKey]; ok {
ps.defaultRate = v
delete(ps.rates, defaultRateKey)
}
return nil
}

// getRate returns the sampling rate to be used for the given span. Callers must
// guard the span.
func (ps *prioritySampler) getRate(spn *span) float64 {
key := "service:" + spn.Service + ",env:" + spn.Meta[ext.Environment]
ps.mu.RLock()
defer ps.mu.RUnlock()
if rate, ok := ps.rates[key]; ok {
return rate
}
return ps.defaultRate
}

// apply applies sampling priority to the given span. Caller must ensure it is safe
// to modify the span.
func (ps *prioritySampler) apply(spn *span) {
rate := ps.getRate(spn)
if sampledByRate(spn.TraceID, rate) {
spn.SetTag(ext.SamplingPriority, ext.PriorityAutoKeep)
} else {
spn.SetTag(ext.SamplingPriority, ext.PriorityAutoReject)
}
spn.SetTag(samplingPriorityRateKey, rate)
}
151 changes: 151 additions & 0 deletions ddtrace/tracer/sampler_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,164 @@
package tracer

import (
"io/ioutil"
"math"
"strings"
"sync"
"testing"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"

"github.com/stretchr/testify/assert"
)

func TestPrioritySampler(t *testing.T) {
// create a new span with given service/env
mkSpan := func(svc, env string) *span {
s := &span{Service: svc, Meta: map[string]string{}}
if env != "" {
s.Meta["env"] = env
}
return s
}

t.Run("mkspan", func(t *testing.T) {
assert := assert.New(t)
s := mkSpan("my-service", "my-env")
assert.Equal("my-service", s.Service)
assert.Equal("my-env", s.Meta[ext.Environment])

s = mkSpan("my-service2", "")
assert.Equal("my-service2", s.Service)
_, ok := s.Meta[ext.Environment]
assert.False(ok)
})

t.Run("ops", func(t *testing.T) {
ps := newPrioritySampler()
assert := assert.New(t)

type key struct{ service, env string }
for _, tt := range []struct {
in string
out map[key]float64
}{
{
in: `{}`,
out: map[key]float64{
key{"some-service", ""}: 1,
key{"obfuscate.http", "none"}: 1,
},
},
{
in: `{
"rate_by_service":{
"service:,env:":0.8,
"service:obfuscate.http,env:":0.9,
"service:obfuscate.http,env:none":0.9
}
}`,
out: map[key]float64{
key{"obfuscate.http", ""}: 0.9,
key{"obfuscate.http", "none"}: 0.9,
key{"obfuscate.http", "other"}: 0.8,
key{"some-service", ""}: 0.8,
},
},
{
in: `{
"rate_by_service":{
"service:my-service,env:":0.2,
"service:my-service,env:none":0.2
}
}`,
out: map[key]float64{
key{"my-service", ""}: 0.2,
key{"my-service", "none"}: 0.2,
key{"obfuscate.http", ""}: 0.8,
key{"obfuscate.http", "none"}: 0.8,
key{"obfuscate.http", "other"}: 0.8,
key{"some-service", ""}: 0.8,
},
},
} {
assert.NoError(ps.readRatesJSON(ioutil.NopCloser(strings.NewReader(tt.in))))
for k, v := range tt.out {
assert.Equal(v, ps.getRate(mkSpan(k.service, k.env)), k)
}
}
})

t.Run("race", func(t *testing.T) {
ps := newPrioritySampler()
assert := assert.New(t)

var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 500; i++ {
assert.NoError(ps.readRatesJSON(
ioutil.NopCloser(strings.NewReader(
`{
"rate_by_service":{
"service:,env:":0.8,
"service:obfuscate.http,env:none":0.9
}
}`,
)),
))
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 500; i++ {
ps.getRate(mkSpan("obfuscate.http", "none"))
ps.getRate(mkSpan("other.service", "none"))
}
}()

wg.Wait()
})

t.Run("apply", func(t *testing.T) {
ps := newPrioritySampler()
assert := assert.New(t)
assert.NoError(ps.readRatesJSON(
ioutil.NopCloser(strings.NewReader(
`{
"rate_by_service":{
"service:obfuscate.http,env:":0.5,
"service:obfuscate.http,env:none":0.5
}
}`,
)),
))

testSpan1 := newBasicSpan("http.request")
testSpan1.Service = "obfuscate.http"
testSpan1.TraceID = math.MaxUint64 - (math.MaxUint64 / 4)

ps.apply(testSpan1)
assert.EqualValues(ext.PriorityAutoKeep, testSpan1.Metrics[samplingPriorityKey])
assert.EqualValues(0.5, testSpan1.Metrics[samplingPriorityRateKey])

testSpan1.TraceID = math.MaxUint64 - (math.MaxUint64 / 3)
ps.apply(testSpan1)
assert.EqualValues(ext.PriorityAutoReject, testSpan1.Metrics[samplingPriorityKey])
assert.EqualValues(0.5, testSpan1.Metrics[samplingPriorityRateKey])

testSpan1.Service = "other-service"
testSpan1.TraceID = 1
assert.EqualValues(ext.PriorityAutoReject, testSpan1.Metrics[samplingPriorityKey])
assert.EqualValues(0.5, testSpan1.Metrics[samplingPriorityRateKey])
})
}

func TestRateSampler(t *testing.T) {
assert := assert.New(t)
assert.True(NewRateSampler(1).Sample(newBasicSpan("test")))
Expand Down
5 changes: 4 additions & 1 deletion ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,4 +235,7 @@ func (s *span) String() string {
return strings.Join(lines, "\n")
}

const samplingPriorityKey = "_sampling_priority_v1"
const (
samplingPriorityKey = "_sampling_priority_v1"
samplingPriorityRateKey = "_sampling_priority_rate_v1"
)
16 changes: 13 additions & 3 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,16 +247,16 @@ func (t *tracer) StartSpan(operationName string, options ...ddtrace.StartSpanOpt
span.Metrics[samplingPriorityKey] = float64(context.samplingPriority())
}
if context.span != nil {
// it has a local parent, inherit the service
context.span.RLock()
span.Service = context.span.Service
context.span.RUnlock()
}
}
span.context = newSpanContext(span, context)
if context == nil || context.span == nil {
// this is either a global root span or a process-level root span
// this is either a root span or it has a remote parent, we should add the PID.
span.SetTag(ext.Pid, strconv.Itoa(os.Getpid()))
t.sample(span)
}
// add tags from options
for k, v := range opts.Tags {
Expand All @@ -266,6 +266,10 @@ func (t *tracer) StartSpan(operationName string, options ...ddtrace.StartSpanOpt
for k, v := range t.config.globalTags {
span.SetTag(k, v)
}
if context == nil {
// this is a brand new trace, sample it
t.sample(span)
}
return span
}

Expand Down Expand Up @@ -299,10 +303,13 @@ func (t *tracer) flushTraces() {
if t.config.debug {
log.Printf("Sending payload: size: %d traces: %d\n", size, count)
}
err := t.config.transport.send(t.payload)
rc, err := t.config.transport.send(t.payload)
if err != nil {
t.pushError(&dataLossError{context: err, count: count})
}
if err == nil && t.config.prioritySampling != nil {
t.config.prioritySampling.readRatesJSON(rc) // TODO: handle error?
}
t.payload.reset()
}

Expand Down Expand Up @@ -367,4 +374,7 @@ func (t *tracer) sample(span *span) {
}
span.Metrics[sampleRateMetricKey] = rs.Rate()
}
if t.config.prioritySampling != nil {
t.config.prioritySampling.apply(span)
}
}
Loading

0 comments on commit 5a81392

Please sign in to comment.