Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions lib/datadog/tracing/contrib/karafka/patcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,39 @@ def propagation
# (e.g. `my_batch_operation messages.payloads`)
# @see https://github.com/karafka/karafka/blob/b06d1f7c17818e1605f80c2bb573454a33376b40/README.md?plain=1#L29-L35
def each(&block)
parent_span = Datadog::Tracing.active_span
parent_trace_digest = Datadog::Tracing.active_trace&.to_digest

@messages_array.each do |message|
if configuration[:distributed_tracing]
trace_digest = if configuration[:distributed_tracing]
headers = if message.metadata.respond_to?(:raw_headers)
message.metadata.raw_headers
else
message.metadata.headers
end
trace_digest = Karafka.extract(headers)
Datadog::Tracing.continue_trace!(trace_digest) if trace_digest
Karafka.extract(headers)
end

Tracing.trace(Ext::SPAN_MESSAGE_CONSUME) do |span|
Tracing.trace(Ext::SPAN_MESSAGE_CONSUME, continue_from: trace_digest) do |span, trace|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I am not sure this is correct. Is continue_from really meant for parent/child relationship?

span.set_tag(Ext::TAG_OFFSET, message.metadata.offset)
span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, message.topic)
span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_SYSTEM)

span.resource = message.topic

# link the outer trace (where the messages batch was consumed)
# with the individual message's processing trace, so they're easier to
# correlate in the Datadog UI
if parent_span && span.parent_id != parent_span.id
# add a link from the parent trace to the message span
span_link = Tracing::SpanLink.new(parent_trace_digest)
span.links << span_link

# add a link from the current trace to the parent span
span_link = Tracing::SpanLink.new(trace.to_digest)
parent_span.links << span_link
end
Comment on lines +50 to +58
Copy link
Contributor Author

@Drowze Drowze Sep 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please let me know your thoughts here! I understand that "span links" are not very widely used here in the ruby sdk, but one challenge we have once we continue a trace is co-relating the span with its "original parent". I think span links are a great fit for solving that problem.


yield message
end
end
Expand Down
91 changes: 91 additions & 0 deletions spec/datadog/tracing/contrib/karafka/patcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,97 @@
expect(span).to_not have_error
expect(span.resource).to eq 'topic_a'
end

context 'when the message has tracing headers' do
let(:message) do
headers = {}
producer_trace = nil
producer_span = nil
Datadog::Tracing.trace('producer') do |span, trace|
producer_span = span
producer_trace = trace
Datadog::Tracing::Contrib::Karafka.inject(trace.to_digest, headers)
end
metadata = ::Karafka::Messages::Metadata.new
metadata['offset'] = 412
metadata[headers_accessor] = headers
raw_payload = rand.to_s

message = ::Karafka::Messages::Message.new(raw_payload, metadata)
allow(message).to receive(:timestamp).and_return(Time.now)
allow(message).to receive(:topic).and_return('topic_a')
message
end
let(:headers_accessor) do
::Karafka::Messages::Metadata.members.include?(:raw_headers) ? 'raw_headers' : 'headers'
end

context 'when distributed tracing is enabled' do
it 'continues the span that produced the message' do
producer_trace_digest = Datadog::Tracing::Contrib::Karafka.extract(message.metadata[headers_accessor])

consumer_span = nil
consumer_trace = nil

Datadog::Tracing.trace('consumer') do
consumer_span = Datadog::Tracing.active_span
consumer_trace = Datadog::Tracing.active_trace

topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
expect(messages).to all(be_a(::Karafka::Messages::Message))

# assert that the current trace re-set to the original trace after iterating the messages
expect(Datadog::Tracing.active_trace).to eq(consumer_trace)
expect(Datadog::Tracing.active_span).to eq(consumer_span)
end

expect(spans).to have(3).items

# assert that the message span is a continuation of the producer span
expect(span.parent_id).to eq producer_trace_digest.span_id
expect(span.trace_id).to eq producer_trace_digest.trace_id

expect(span.links.map { |l| [l.trace_id, l.span_id] }).to contain_exactly(
[consumer_trace.id, consumer_span.id]
)
expect(consumer_span.links.map { |l| [l.trace_id, l.span_id] }).to contain_exactly(
[span.trace_id, span.id]
)
end
end

context 'when distributed tracing is not enabled' do
let(:configuration_options) { { distributed_tracing: false } }

it 'does not continue the span that produced the message' do
consumer_span = nil
consumer_trace = nil

Datadog::Tracing.trace('consumer') do
consumer_span = Datadog::Tracing.active_span
consumer_trace = Datadog::Tracing.active_trace

topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
expect(messages).to all(be_a(::Karafka::Messages::Message))

# assert that the current trace re-set to the original trace after iterating the messages
expect(Datadog::Tracing.active_trace).to eq(consumer_trace)
expect(Datadog::Tracing.active_span).to eq(consumer_span)
end

expect(spans).to have(3).items

# assert that the message span is not continuation of the producer span
expect(span.parent_id).to eq(consumer_span.id)
expect(span.trace_id).to eq(consumer_trace.id)

expect(span.links).to be_empty
expect(consumer_span.links).to be_empty
end
end
end
end

describe 'worker.processed' do
Expand Down