Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 46 additions & 8 deletions lib/datadog/tracing/trace_operation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ class TraceOperation
:sampled,
:service

# Creates a new TraceOperation.
#
# @param auto_finish [Boolean] when true, automatically finishes the trace when the local root span finishes.
# When false, the trace remains unfinished until {#finish!} is called.
# This is useful when this {TraceOperation} represents the continuation of a remote {TraceDigest},
# in which case local root spans in this {TraceOperation} are children of the {TraceDigest}'s last active span.
def initialize(
logger: Datadog.logger,
agent_sample_rate: nil,
Expand All @@ -80,7 +86,8 @@ def initialize(
trace_state_unknown_fields: nil,
remote_parent: false,
tracer: nil, # DEV-3.0: deprecated, remove in 3.0
baggage: nil
baggage: nil,
auto_finish: true
)
@logger = logger

Expand Down Expand Up @@ -119,6 +126,7 @@ def initialize(
@events = events || Events.new
@finished = false
@spans = []
@auto_finish = auto_finish
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@auto_finish = auto_finish
@auto_finish = !!auto_finish

end

def full?
Expand Down Expand Up @@ -318,6 +326,29 @@ def flush!
build_trace(spans, !finished)
end

# When automatic context management is disabled (@auto_finish is false),
# this method finishes the trace, marking it as completed.
#
# The trace will **not** automatically finish when its local root span
# when @auto_finish is false, thus calling this method is mandatory
# in such scenario.
#
# Unfinished spans are discarded.
#
# This method is idempotent and safe to call after the trace is finished.
# It is also a no-op when @auto_finish is true, to prevent misuse.
#
# @!visibility private
def finish!
return if @auto_finish || finished?

@finished = true
@active_span = nil
@active_span_count = 0

events.trace_finished.publish(self)
end

# Returns a set of trace headers used for continuing traces.
# Used for propagation across execution contexts.
# Data should reflect the active state of the trace.
Expand Down Expand Up @@ -349,7 +380,7 @@ def to_digest
trace_state: @trace_state,
trace_state_unknown_fields: @trace_state_unknown_fields,
span_remote: @remote_parent && @active_span.nil?,
baggage: (@baggage.nil? || @baggage.empty?) ? nil : @baggage
baggage: @baggage.nil? || @baggage.empty? ? nil : @baggage
).freeze
end

Expand Down Expand Up @@ -460,7 +491,7 @@ def activate_span!(span_op)

@active_span = span_op

set_root_span!(span_op) unless root_span
set_local_root_span!(span_op)
end

def deactivate_span!(span_op)
Expand All @@ -483,15 +514,22 @@ def start_span(span_op)
logger.debug { "Error starting span on trace: #{e} Backtrace: #{e.backtrace.first(3)}" }
end

# For traces with automatic context management (auto_finish),
# when the local root span finishes, the trace also finishes.
# The trace cannot receive new spans after finished.
#
# Without auto_finish, the trace can still receive spans
# until explicitly finished.
def finish_span(span, span_op, parent)
# Save finished span & root span
@spans << span unless span.nil?

# Deactivate the span, re-activate parent.
deactivate_span!(span_op)

# Set finished, to signal root span has completed.
@finished = true if span_op == root_span
# Finish if the local root span is finished and automatic
# context management is enabled.
@finished = true if span_op == root_span && @auto_finish

# Update active span count
@active_span_count -= 1
Expand All @@ -505,8 +543,8 @@ def finish_span(span, span_op, parent)
logger.debug { "Error finishing span on trace: #{e} Backtrace: #{e.backtrace.first(3)}" }
end

# Track the root span
def set_root_span!(span)
# Track the root {SpanOperation} object from the current execution context.
def set_local_root_span!(span)
return if span.nil? || root_span

@root_span = span
Expand All @@ -531,7 +569,7 @@ def build_trace(spans, partial = false)
service: service,
tags: meta,
metrics: metrics,
root_span_id: (!partial) ? root_span&.id : nil,
root_span_id: !partial ? root_span&.id : nil,
profiling_enabled: @profiling_enabled,
apm_tracing_enabled: @apm_tracing_enabled
)
Expand Down
84 changes: 56 additions & 28 deletions lib/datadog/tracing/tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ def trace(
context = call_context
active_trace = context.active_trace
trace = if continue_from || active_trace.nil?
start_trace(continue_from: continue_from)
else
active_trace
end
start_trace(continue_from: continue_from)
else
active_trace
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this rubocop autocorrected? If yes this branch needs master merged into it I think.

rescue => e
logger.debug { "Failed to trace: #{e}" }

Expand Down Expand Up @@ -241,10 +241,21 @@ def active_correlation(key = nil)
trace.to_correlation
end

# Setup a new trace to continue from where another
# Setup a new trace execution context to continue from where another
# trace left off.
# This is useful to continue distributed or async traces.
#
# The first span created in the restored context is a direct child of the
# active span from when the {Datadog::Tracing::TraceDigest} was created.
#
# When no block is given, the trace context is restored in the current thread.
# It remains active until the first span created in this restored context is finished.
# After that, if a new span is created, it start a new, unrelated trace.
#
# Used to continue distributed or async traces.
# When a block is given, the trace context is restored inside the block execution.
# It remains active until the block ends, even when the first span created inside
# the block finishes. This means that multiple spans can be direct children of the
# active span from when the {Datadog::Tracing::TraceDigest} was created.
#
# @param [Datadog::Tracing::TraceDigest] digest continue from the {Datadog::Tracing::TraceDigest}.
# @param [Thread] key Thread to retrieve trace from. Defaults to current thread. For internal use only.
Expand All @@ -260,13 +271,32 @@ def continue_trace!(digest, key = nil, &block)
# Start a new trace from the digest
context = call_context(key)
original_trace = active_trace(key)
trace = start_trace(continue_from: digest)
# When we want the trace to be bound to a block, we cannot let
# it auto finish when the local root span finishes. This would
# create mutiple traces inside the block. Instead, we'll
# expliclity finish the trace after the block finishes.
auto_finish = !block

trace = start_trace(continue_from: digest, auto_finish: auto_finish)

# If block hasn't been given; we need to manually deactivate
# this trace. Subscribe to the trace finished event to do this.
subscribe_trace_deactivation!(context, trace, original_trace) unless block

context.activate!(trace, &block)
if block
# When a block is given, the trace will be active until the block finishes.
context.activate!(trace) do
yield
ensure # We have to flush even when an error occurs
# On block completion, force the trace to finish and flush its finished spans.
# Unfinished spans are lost as the {TraceOperation} has ended.
trace.finish!
flush_trace(trace)
end
else
# Otherwise, the trace will be bound to the current thread after this point
context.activate!(trace)
end
Comment on lines +286 to +299
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be streamlined with a guard-clause

Suggested change
if block
# When a block is given, the trace will be active until the block finishes.
context.activate!(trace) do
yield
ensure # We have to flush even when an error occurs
# On block completion, force the trace to finish and flush its finished spans.
# Unfinished spans are lost as the {TraceOperation} has ended.
trace.finish!
flush_trace(trace)
end
else
# Otherwise, the trace will be bound to the current thread after this point
context.activate!(trace)
end
# The trace will be bound to the current thread if no block is given
return context.activate!(trace) unless block
# When a block is given, the trace will be active until the block finishes.
context.activate!(trace) do
yield
ensure # We have to flush even when an error occurs
# On block completion, force the trace to finish and flush its finished spans.
# Unfinished spans are lost as the {TraceOperation} has ended.
trace.finish!
flush_trace(trace)
end

end

# Sample a span, tagging the trace as appropriate.
Expand Down Expand Up @@ -329,15 +359,15 @@ def call_context(key = nil)
@provider.context(key)
end

def build_trace(digest = nil)
def build_trace(digest, auto_finish)
# Resolve hostname if configured
hostname = Core::Environment::Socket.hostname if Datadog.configuration.tracing.report_hostname
hostname = (hostname && !hostname.empty?) ? hostname : nil
hostname = hostname && !hostname.empty? ? hostname : nil

if digest
sampling_priority = if propagate_sampling_priority?(upstream_tags: digest.trace_distributed_tags)
digest.trace_sampling_priority
end
digest.trace_sampling_priority
end
TraceOperation.new(
logger: logger,
hostname: hostname,
Expand All @@ -353,7 +383,8 @@ def build_trace(digest = nil)
trace_state_unknown_fields: digest.trace_state_unknown_fields,
remote_parent: digest.span_remote,
tracer: self,
baggage: digest.baggage
baggage: digest.baggage,
auto_finish: auto_finish
)
else
TraceOperation.new(
Expand All @@ -362,13 +393,13 @@ def build_trace(digest = nil)
profiling_enabled: profiling_enabled,
apm_tracing_enabled: apm_tracing_enabled,
remote_parent: false,
tracer: self
tracer: self,
auto_finish: auto_finish
)
end
end
# rubocop:enable Metrics/MethodLength

# rubocop:disable Metrics/MethodLength
def bind_trace_events!(trace_op)
events = trace_op.send(:events)

Expand All @@ -387,13 +418,12 @@ def bind_trace_events!(trace_op)
flush_trace(event_trace_op)
end
end
# rubocop:enable Metrics/MethodLength

# Creates a new TraceOperation, with events bounds to this Tracer instance.
# @return [TraceOperation]
def start_trace(continue_from: nil)
def start_trace(continue_from: nil, auto_finish: true)
# Build a new trace using digest if provided.
trace = build_trace(continue_from)
trace = build_trace(continue_from, auto_finish)

# Bind trace events: sample trace, set default service, flush spans.
bind_trace_events!(trace)
Expand All @@ -402,7 +432,6 @@ def start_trace(continue_from: nil)
end

# rubocop:disable Lint/UnderscorePrefixedVariableName
# rubocop:disable Metrics/MethodLength
def start_span(
name,
continue_from: nil,
Expand Down Expand Up @@ -454,18 +483,17 @@ def start_span(
span
end
end
# rubocop:enable Lint/UnderscorePrefixedVariableName
# rubocop:enable Metrics/MethodLength

# rubocop:enable Lint/UnderscorePrefixedVariableName
def resolve_tags(tags, service)
merged_tags = if @tags.any? && tags
# Combine default tags with provided tags,
# preferring provided tags.
@tags.merge(tags)
else
# Use provided tags or default tags if none.
tags || @tags.dup
end
# Combine default tags with provided tags,
# preferring provided tags.
@tags.merge(tags)
else
# Use provided tags or default tags if none.
tags || @tags.dup
end
# Remove version tag if service is not the default service
if merged_tags.key?(Core::Environment::Ext::TAG_VERSION) && service && service != @default_service
merged_tags.delete(Core::Environment::Ext::TAG_VERSION)
Expand Down
12 changes: 9 additions & 3 deletions sig/datadog/tracing/trace_operation.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ module Datadog
include Metadata::Tagging

DEFAULT_MAX_LENGTH: ::Integer

@logger: Core::Logger

attr_reader logger: Core::Logger

attr_accessor agent_sample_rate: untyped
Expand All @@ -17,17 +17,23 @@ module Datadog
attr_accessor sample_rate: untyped
attr_accessor remote_parent: untyped
attr_accessor sampling_priority: untyped
attr_accessor baggage: untyped
attr_reader active_span_count: untyped
attr_reader active_span: untyped
attr_reader id: untyped
attr_reader max_length: untyped
attr_reader parent_span_id: untyped
attr_reader trace_state: untyped
attr_reader trace_state_unknown_fields: untyped
attr_writer name: untyped
attr_writer resource: untyped
attr_writer sampled: untyped
attr_writer service: untyped

def initialize: (?agent_sample_rate: untyped?, ?events: untyped?, ?hostname: untyped?, ?id: untyped?, ?max_length: untyped, ?name: untyped?, ?origin: untyped?, ?parent_span_id: untyped?, ?rate_limiter_rate: untyped?, ?resource: untyped?, ?rule_sample_rate: untyped?, ?sample_rate: untyped?, ?sampled: untyped?, ?sampling_priority: untyped?, ?service: untyped?, ?profiling_enabled: untyped?, ?apm_tracing_enabled: untyped?, ?tags: untyped?, ?metrics: untyped?, ?remote_parent: untyped?) -> void
def initialize: (?logger: untyped, ?agent_sample_rate: untyped?, ?events: untyped?, ?hostname: untyped?, ?id: untyped?, ?max_length: untyped, ?name: untyped?, ?origin: untyped?, ?parent_span_id: untyped?, ?rate_limiter_rate: untyped?, ?resource: untyped?, ?rule_sample_rate: untyped?, ?sample_rate: untyped?, ?sampled: untyped?, ?sampling_priority: untyped?, ?service: untyped?, ?profiling_enabled: untyped?, ?apm_tracing_enabled: untyped?, ?tags: untyped?, ?metrics: untyped?, ?trace_state: untyped?, ?trace_state_unknown_fields: untyped?, ?remote_parent: untyped?, ?tracer: untyped?, ?baggage: untyped?, ?auto_finish: bool) -> void

def finish!: -> void

def full?: () -> untyped
def finished_span_count: () -> untyped
def finished?: () -> untyped
Expand Down
Loading
Loading