Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions lib/datadog/tracing/contrib/karafka/framework.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,26 @@ module Karafka
# - instrument parts of the framework when needed
module Framework
def self.setup
karafka_configurations = Datadog.configuration.tracing.fetch_integration(:karafka).configurations
waterdrop_configurations = Datadog.configuration.tracing.fetch_integration(:waterdrop).configurations

Datadog.configure do |datadog_config|
karafka_config = datadog_config.tracing[:karafka]
activate_waterdrop!(datadog_config, karafka_config)
karafka_configurations.each do |name, karafka_config|
# do not override user configuration
next if name != :default && waterdrop_configurations.key?(name)
activate_waterdrop!(datadog_config, karafka_config, name)
end
end
end

# Apply relevant configuration from Karafka to WaterDrop
def self.activate_waterdrop!(datadog_config, karafka_config)
def self.activate_waterdrop!(datadog_config, karafka_config, name)
datadog_config.tracing.instrument(
:waterdrop,
enabled: karafka_config[:enabled],
service_name: karafka_config[:service_name],
distributed_tracing: karafka_config[:distributed_tracing],
describes: name
)
end
end
Expand Down
4 changes: 4 additions & 0 deletions lib/datadog/tracing/contrib/karafka/integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ def new_configuration
def patcher
Patcher
end

def resolver
@resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new
end
end
end
end
Expand Down
5 changes: 3 additions & 2 deletions lib/datadog/tracing/contrib/karafka/patcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ module Contrib
module Karafka
# Patch to add tracing to Karafka::Messages::Messages
module MessagesPatch
def configuration
Datadog.configuration.tracing[:karafka]
def datadog_configuration(topic)
Datadog.configuration.tracing[:karafka, topic]
end

def propagation
Expand All @@ -28,6 +28,7 @@ def each(&block)
parent_trace_digest = Datadog::Tracing.active_trace&.to_digest

@messages_array.each do |message|
configuration = datadog_configuration(message.topic)
trace_digest = if configuration[:distributed_tracing]
headers = if message.metadata.respond_to?(:raw_headers)
message.metadata.raw_headers
Expand Down
4 changes: 4 additions & 0 deletions lib/datadog/tracing/contrib/waterdrop/integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ def new_configuration
def patcher
Patcher
end

def resolver
@resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new
end
end
end
end
Expand Down
16 changes: 10 additions & 6 deletions lib/datadog/tracing/contrib/waterdrop/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ module Monitor
message.produced_sync
].freeze

def configuration
Datadog.configuration.tracing[:waterdrop]
end

def instrument(event_id, payload = {}, &block)
return super unless TRACEABLE_EVENTS.include?(event_id)

Expand All @@ -40,15 +36,15 @@ def instrument(event_id, payload = {}, &block)

span.set_tag(Contrib::Karafka::Ext::TAG_MESSAGE_COUNT, payload[:messages].size)

payload[:messages].each { |message| inject(trace_digest, message) } if configuration[:distributed_tracing]
payload[:messages].each { |message| inject(trace_digest, message) }
else
action = event_id.sub('message.produced', 'produce')

span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, payload[:message][:topic])
span.set_tag(Contrib::Karafka::Ext::TAG_PARTITION, payload[:message][:partition])
span.set_tag(Contrib::Karafka::Ext::TAG_MESSAGE_COUNT, 1)

inject(trace_digest, payload[:message]) if configuration[:distributed_tracing]
inject(trace_digest, payload[:message])
end

span.resource = "waterdrop.#{action}"
Expand All @@ -63,9 +59,17 @@ def instrument(event_id, payload = {}, &block)
private

def inject(trace_digest, message)
return unless datadog_configuration(message[:topic])[:distributed_tracing]

message[:headers] ||= {}
WaterDrop.inject(trace_digest, message[:headers])
end

# cache the configuration resolution per topic to avoid repeated lookups in message batches
def datadog_configuration(topic)
@datadog_configuration ||= {}
@datadog_configuration[topic] ||= Datadog.configuration.tracing[:waterdrop, topic]
end
end
end
end
Expand Down
99 changes: 81 additions & 18 deletions spec/datadog/tracing/contrib/karafka/patcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
before do
Datadog.configure do |c|
c.tracing.instrument :karafka, configuration_options
c.tracing.instrument :karafka, describes: /special_/, distributed_tracing: false
end
end

Expand All @@ -31,16 +32,12 @@
let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_MESSAGE_CONSUME }

it 'is expected to send a span' do
metadata = ::Karafka::Messages::Metadata.new
metadata['offset'] = 412
metadata = ::Karafka::Messages::Metadata.new(offset: 412, timestamp: Time.now, topic: 'topic_a')
raw_payload = rand.to_s

message = ::Karafka::Messages::Message.new(raw_payload, metadata)
allow(message).to receive(:timestamp).and_return(Time.now)
allow(message).to receive(:topic).and_return('topic_a')

topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))

topic = ::Karafka::Routing::Topic.new(message.topic, double(id: 0))
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)

expect(messages).to all(be_a(::Karafka::Messages::Message))
Expand All @@ -55,6 +52,7 @@
end

context 'when the message has tracing headers' do
let(:topic_name) { "topic_a" }
let(:message) do
headers = {}
producer_trace = nil
Expand All @@ -64,15 +62,15 @@
producer_trace = trace
Datadog::Tracing::Contrib::Karafka.inject(trace.to_digest, headers)
end
metadata = ::Karafka::Messages::Metadata.new
metadata['offset'] = 412
metadata[headers_accessor] = headers
metadata = ::Karafka::Messages::Metadata.new(
offset: 412,
headers_accessor => headers,
topic: topic_name,
timestamp: Time.now
)
raw_payload = rand.to_s

message = ::Karafka::Messages::Message.new(raw_payload, metadata)
allow(message).to receive(:timestamp).and_return(Time.now)
allow(message).to receive(:topic).and_return('topic_a')
message
::Karafka::Messages::Message.new(raw_payload, metadata)
end
let(:headers_accessor) do
::Karafka::Messages::Metadata.members.include?(:raw_headers) ? 'raw_headers' : 'headers'
Expand All @@ -89,7 +87,7 @@
consumer_span = Datadog::Tracing.active_span
consumer_trace = Datadog::Tracing.active_trace

topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0))
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
expect(messages).to all(be_a(::Karafka::Messages::Message))

Expand All @@ -113,6 +111,37 @@
end
end

context "when distributed tracing is disabled for the topic in particular" do
let(:topic_name) { "special_topic" }

it 'does not continue the span that produced the message' do
consumer_span = nil
consumer_trace = nil

Datadog::Tracing.trace('consumer') do
consumer_span = Datadog::Tracing.active_span
consumer_trace = Datadog::Tracing.active_trace

topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0))
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
expect(messages).to all(be_a(::Karafka::Messages::Message))

# assert that the current trace re-set to the original trace after iterating the messages
expect(Datadog::Tracing.active_trace).to eq(consumer_trace)
expect(Datadog::Tracing.active_span).to eq(consumer_span)
end

expect(spans).to have(3).items

# assert that the message span is not continuation of the producer span
expect(span.parent_id).to eq(consumer_span.id)
expect(span.trace_id).to eq(consumer_trace.id)

expect(span.links).to be_empty
expect(consumer_span.links).to be_empty
end
end

context 'when distributed tracing is not enabled' do
let(:configuration_options) { { distributed_tracing: false } }

Expand All @@ -124,7 +153,7 @@
consumer_span = Datadog::Tracing.active_span
consumer_trace = Datadog::Tracing.active_trace

topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0))
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
expect(messages).to all(be_a(::Karafka::Messages::Message))

Expand All @@ -150,12 +179,11 @@
let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_WORKER_PROCESS }

it 'is expected to send a span' do
metadata = ::Karafka::Messages::Metadata.new
metadata['offset'] = 412
metadata = ::Karafka::Messages::Metadata.new(offset: 412, topic: 'topic_a')
raw_payload = rand.to_s

message = ::Karafka::Messages::Message.new(raw_payload, metadata)
job = double(executor: double(topic: double(name: 'topic_a', consumer: 'ABC'), partition: 0), messages: [message])
job = double(executor: double(topic: double(name: message.topic, consumer: 'ABC'), partition: 0), messages: [message])

Karafka.monitor.instrument('worker.processed', { job: job }) do
# Noop
Expand All @@ -171,4 +199,39 @@
expect(span.resource).to eq 'ABC#consume'
end
end

describe "framework auto-instrumentation" do
around do |example|
# Reset before and after each example; don't allow global state to linger.
Datadog.registry[:waterdrop].reset_configuration!
example.run
Datadog.registry[:waterdrop].reset_configuration!

# reset Karafka internal state as well
Karafka::App.config.internal.status.reset!
Karafka.refresh!
end

before do
Datadog.configure do |c|
c.tracing.instrument :karafka, describes: "conflicting_topic", distributed_tracing: true
c.tracing.instrument :waterdrop, describes: "conflicting_topic", distributed_tracing: false
end
end

it "automatically enables waterdrop instrumentation" do
Karafka::App.setup do |c|
c.kafka = { 'bootstrap.servers': '127.0.0.1:9092' }
end

expect(Datadog.configuration.tracing[:waterdrop][:enabled]).to be true
expect(Datadog.configuration.tracing[:waterdrop][:distributed_tracing]).to be true

expect(Datadog.configuration.tracing[:waterdrop, "special_topic"][:enabled]).to be true
expect(Datadog.configuration.tracing[:waterdrop, "special_topic"][:distributed_tracing]).to be false

expect(Datadog.configuration.tracing[:waterdrop, "conflicting_topic"][:enabled]).to be true
expect(Datadog.configuration.tracing[:waterdrop, "conflicting_topic"][:distributed_tracing]).to be false
end
end
end
2 changes: 0 additions & 2 deletions spec/datadog/tracing/contrib/waterdrop/monitor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
end
require 'datadog'

puts "waterdrop version: #{WaterDrop::VERSION}"

RSpec.describe 'Waterdrop monitor' do
before do
Datadog.configure do |c|
Expand Down
Loading