Skip to content

Commit

Permalink
Merge pull request #132 from palazzem/distributed-tracing
Browse files Browse the repository at this point in the history
[opentracing] Inject and Extract method for Distributed Tracing
  • Loading branch information
Emanuele Palazzetti authored Dec 6, 2017
2 parents bdc55a7 + 6b79fc3 commit 66d30ec
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 6 deletions.
86 changes: 86 additions & 0 deletions opentracing/propagators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package opentracing

import (
"strconv"
"strings"

ot "github.com/opentracing/opentracing-go"
)

type textMapPropagator struct{}

const (
prefixBaggage = "ot-baggage-"
prefixTracerState = "x-datadog-"

fieldNameTraceID = prefixTracerState + "trace-id"
fieldNameParentID = prefixTracerState + "parent-id"
)

// Inject defines the textMapPropagator to propagate SpanContext data
// out of the current process. The implementation propagates the
// TraceID and the current active SpanID, as well as the Span baggage.
func (p *textMapPropagator) Inject(context ot.SpanContext, carrier interface{}) error {
ctx, ok := context.(SpanContext)
if !ok {
return ot.ErrInvalidSpanContext
}
writer, ok := carrier.(ot.TextMapWriter)
if !ok {
return ot.ErrInvalidCarrier
}

// propagate the TraceID and the current active SpanID
writer.Set(fieldNameTraceID, strconv.FormatUint(ctx.traceID, 16))
writer.Set(fieldNameParentID, strconv.FormatUint(ctx.spanID, 16))

// propagate OpenTracing baggage
for k, v := range ctx.baggage {
writer.Set(prefixBaggage+k, v)
}
return nil
}

// Extract does
func (p *textMapPropagator) Extract(carrier interface{}) (ot.SpanContext, error) {
reader, ok := carrier.(ot.TextMapReader)
if !ok {
return nil, ot.ErrInvalidCarrier
}
var err error
var traceID, parentID uint64
decodedBaggage := make(map[string]string)

// extract SpanContext fields
err = reader.ForeachKey(func(k, v string) error {
switch strings.ToLower(k) {
case fieldNameTraceID:
traceID, err = strconv.ParseUint(v, 16, 64)
if err != nil {
return ot.ErrSpanContextCorrupted
}
case fieldNameParentID:
parentID, err = strconv.ParseUint(v, 16, 64)
if err != nil {
return ot.ErrSpanContextCorrupted
}
default:
lowercaseK := strings.ToLower(k)
if strings.HasPrefix(lowercaseK, prefixBaggage) {
decodedBaggage[strings.TrimPrefix(lowercaseK, prefixBaggage)] = v
}
}

return nil
})

if err != nil {
return nil, err
}

return SpanContext{
traceID: traceID,
spanID: parentID,
baggage: decodedBaggage,
}, nil
}
37 changes: 31 additions & 6 deletions opentracing/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
// propagation. In the current state, this Tracer is a compatibility layer
// that wraps the Datadog Tracer implementation.
type Tracer struct {
impl *datadog.Tracer // a Datadog Tracer implementation
serviceName string // default Service Name defined in the configuration
impl *datadog.Tracer // a Datadog Tracer implementation
serviceName string // default Service Name defined in the configuration
textPropagator *textMapPropagator // injector for Context propagation
}

// StartSpan creates, starts, and returns a new Span with the given `operationName`
Expand All @@ -34,6 +35,8 @@ func (t *Tracer) startSpanWithOptions(operationName string, options ot.StartSpan
options.StartTime = time.Now().UTC()
}

var context SpanContext
var hasParent bool
var parent *Span
var span *datadog.Span

Expand All @@ -46,13 +49,23 @@ func (t *Tracer) startSpanWithOptions(operationName string, options ot.StartSpan

// if we have parenting define it
if ref.Type == ot.ChildOfRef {
hasParent = true
context = ctx
parent = ctx.span
}
}

if parent == nil {
// create a root Span with the default service name and resource
span = t.impl.NewRootSpan(operationName, t.serviceName, operationName)

if hasParent {
// the Context doesn't have a Span reference because it
// has been propagated from another process, so we set these
// values manually
span.TraceID = context.traceID
span.ParentID = context.spanID
}
} else {
// create a child Span that inherits from a parent
span = t.impl.NewChildSpan(operationName, parent.Span)
Expand Down Expand Up @@ -96,14 +109,26 @@ func (t *Tracer) startSpanWithOptions(operationName string, options ot.StartSpan

// Inject takes the `sm` SpanContext instance and injects it for
// propagation within `carrier`. The actual type of `carrier` depends on
// the value of `format`.
func (t *Tracer) Inject(sp ot.SpanContext, format interface{}, carrier interface{}) error {
return nil
// the value of `format`. Currently supported Injectors are:
// * `TextMap`
// * `HTTPHeaders`
func (t *Tracer) Inject(ctx ot.SpanContext, format interface{}, carrier interface{}) error {
switch format {
case ot.TextMap, ot.HTTPHeaders:
return t.textPropagator.Inject(ctx, carrier)
}

return ot.ErrUnsupportedFormat
}

// Extract returns a SpanContext instance given `format` and `carrier`.
func (t *Tracer) Extract(format interface{}, carrier interface{}) (ot.SpanContext, error) {
return nil, nil
switch format {
case ot.TextMap, ot.HTTPHeaders:
return t.textPropagator.Extract(carrier)
}

return nil, ot.ErrUnsupportedFormat
}

// Close method implements `io.Closer` interface to graceful shutdown the Datadog
Expand Down
42 changes: 42 additions & 0 deletions opentracing/tracer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package opentracing

import (
"net/http"
"testing"
"time"

Expand Down Expand Up @@ -115,3 +116,44 @@ func TestTracerSpanStartTime(t *testing.T) {

assert.Equal(startTime.UnixNano(), span.Span.Start)
}

func TestTracerPropagation(t *testing.T) {
assert := assert.New(t)

config := NewConfiguration()
tracer, _, _ := NewTracer(config)

root := tracer.StartSpan("web.request")
ctx := root.Context()
headers := http.Header{}

// inject the SpanContext
carrier := opentracing.HTTPHeadersCarrier(headers)
err := tracer.Inject(ctx, opentracing.HTTPHeaders, carrier)
assert.Nil(err)

// retrieve the SpanContext
propagated, err := tracer.Extract(opentracing.HTTPHeaders, carrier)
assert.Nil(err)

tCtx, ok := ctx.(SpanContext)
assert.True(ok)
tPropagated, ok := propagated.(SpanContext)
assert.True(ok)

// compare if there is a Context match
assert.Equal(tCtx.traceID, tPropagated.traceID)
assert.Equal(tCtx.spanID, tPropagated.spanID)

// ensure a child can be created
child := tracer.StartSpan("db.query", opentracing.ChildOf(propagated))
tRoot, ok := root.(*Span)
assert.True(ok)
tChild, ok := child.(*Span)
assert.True(ok)

assert.NotEqual(uint64(0), tChild.Span.TraceID)
assert.NotEqual(uint64(0), tChild.Span.SpanID)
assert.Equal(tRoot.Span.SpanID, tChild.Span.ParentID)
assert.Equal(tRoot.Span.TraceID, tChild.Span.ParentID)
}

0 comments on commit 66d30ec

Please sign in to comment.