Skip to content

Commit

Permalink
Distributed tracing updates (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgillum authored Nov 1, 2023
1 parent 6644796 commit ba82393
Show file tree
Hide file tree
Showing 7 changed files with 3,065 additions and 956 deletions.
5 changes: 2 additions & 3 deletions internal/helpers/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,8 @@ func NewTimerFiredEvent(
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_TimerFired{
TimerFired: &protos.TimerFiredEvent{
TimerId: timerID,
FireAt: fireAt,
ParentTraceContext: parentTraceContext,
TimerId: timerID,
FireAt: fireAt,
},
},
}
Expand Down
54 changes: 32 additions & 22 deletions internal/helpers/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package helpers

import (
"context"
"encoding/hex"
"reflect"
"strings"
"time"
"unsafe"

Expand Down Expand Up @@ -54,22 +56,13 @@ func StartNewActivitySpan(
}

func StartAndEndNewTimerSpan(ctx context.Context, tf *protos.TimerFiredEvent, createdTime time.Time, instanceID string) error {
if tf.ParentTraceContext == nil {
return nil
}

attributes := []attribute.KeyValue{
{Key: "durabletask.type", Value: attribute.StringValue("timer")},
{Key: "durabletask.fire_at", Value: attribute.StringValue(tf.FireAt.AsTime().Format(time.RFC3339))}, // time.RFC3339 most closely maps to ISO 8601
{Key: "durabletask.task.task_id", Value: attribute.Int64Value(int64(tf.TimerId))},
{Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)},
}

ctx, err := ContextFromTraceContext(ctx, tf.ParentTraceContext)
if err != nil {
return err
}

_, span := startNewSpan(ctx, "timer", "", "", attributes, trace.SpanKindInternal, createdTime)
span.End()
return nil
Expand Down Expand Up @@ -142,27 +135,45 @@ func ContextFromTraceContext(ctx context.Context, tc *protos.TraceContext) (cont
}

func SpanContextFromTraceContext(tc *protos.TraceContext) (trace.SpanContext, error) {
var traceID trace.TraceID
var decodedTraceID trace.TraceID
var err error
traceID, err = trace.TraceIDFromHex(tc.TraceID)
var traceID string
var spanID string
var traceFlags string

parts := strings.Split(tc.TraceParent, "-")
if len(parts) == 4 {
traceID = parts[1]
spanID = parts[2]
traceFlags = parts[3]
} else {
// backwards compatibility with older versions of the protobuf
traceID = tc.GetTraceParent()
spanID = tc.GetSpanID()
traceFlags = "01" // sampled
}

decodedTraceID, err = trace.TraceIDFromHex(traceID)
if err != nil {
return trace.SpanContext{}, err
}

var decodedSpanID trace.SpanID
decodedSpanID, err = trace.SpanIDFromHex(spanID)
if err != nil {
return trace.SpanContext{}, err
}

var spanID trace.SpanID
spanID, err = trace.SpanIDFromHex(tc.SpanID)
var decodedTraceFlags []byte
decodedTraceFlags, err = hex.DecodeString(traceFlags)
if err != nil {
return trace.SpanContext{}, err
}

// Note that we assume that trace context objects are created only for sampled spans.
// This assumption allows us to set TraceFlags to trace.FlagsSampled. Without this
// assumption, we'd need to add a property to the trace context indicating whether
// the span is being sampled.
spanContextConfig := trace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
TraceFlags: trace.FlagsSampled,
TraceID: decodedTraceID,
SpanID: decodedSpanID,
TraceFlags: trace.TraceFlags(decodedTraceFlags[0]),
}

// Trace state is optional
Expand Down Expand Up @@ -192,8 +203,7 @@ func TraceContextFromSpan(span trace.Span) *protos.TraceContext {
spanContext := span.SpanContext()
if spanContext.IsValid() {
tc = &protos.TraceContext{
TraceID: spanContext.TraceID().String(),
SpanID: spanContext.SpanID().String(),
TraceParent: "00-" + spanContext.TraceID().String() + "-" + spanContext.SpanID().String() + "-" + spanContext.TraceFlags().String(),
}
if ts := spanContext.TraceState().String(); ts != "" {
tc.TraceState = wrapperspb.String(ts)
Expand Down
Loading

0 comments on commit ba82393

Please sign in to comment.