-
Notifications
You must be signed in to change notification settings - Fork 394
Add WaterDrop integration (i.e.: trace messages produced by Karafka) #4874
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
8903281
to
47f6f12
Compare
Co-authored-by: Esther Kim <[email protected]>
topics = payload[:messages].map { |m| m[:topic] }.uniq | ||
span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, topics) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bit unsure about TAG_DESTINATION
... this tag can either be an array of kafka topics (when produce_many_...
is called) or a simple string (when generating a single message at a time)
Same goes for the TAG_PARTITION
- can be either an array or a number, depending on whether we call batch produce methods or not
30f2cb9
to
6b654d4
Compare
def inject(trace_digest, message) | ||
message[:headers] ||= {} | ||
WaterDrop.inject(trace_digest, message[:headers]) | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am a bit on the fence about using a custom the custom monitor to inject the trace context 🤔
WaterDrop producers do have a middleware
interface... They just don't have a way to automatically add some middleware to all new instances.
I suppose an alternative approach might be patching the WaterDrop::Producer#setup
method, so a middleware is automatically added (if it wasn't already) right after configuring a producer. E.g.:
module Datadog
module Tracing
module Contrib
module WaterDrop
class Middleware
def self.call(message)
message[:headers] ||= {}
WaterDrop.inject(Tracing.active_trace&.to_digest, message[:headers])
message
end
end
module ProducerPatch
def setup(&block)
super.tap do
current_middlewares = middleware.instance_variable_get(:@steps)
if current_middlewares.none? { |middleware| middleware == Middleware }
middleware.append(Middleware)
end
end
end
end
module Patcher
def patch
# ...
::WaterDrop::Producer.prepend(ProducerPatch)
end
end
end
end
end
end
def patch | ||
require_relative 'monitor' | ||
|
||
::WaterDrop::Instrumentation::Monitor.prepend(Monitor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing to keep in mind: the WaterDrop monitor is configurable (just like Karafka's). That means that this would bypass our monitor patch:
::WaterDrop::Producer.new do |c|
c.kafka = { "bootstrap.servers": "127.0.0.1:9092" }
c.monitor = ActiveSupport::Notifications
end
As this is also the case in Karafka, the karafka integration suffers from the same problem (see here)
Perhaps one approach that we can take is patching the WaterDrop::Producer#setup
method, so we replace the monitor right after configuring a producer (or the Karafka::App
) (and also we could trigger a warning if the user had changed the monitor from the default one)
Thank you, @Drowze! We'll take a look in the next few days. |
I think we oculd tackle this by adding a process wide monitor that would react on builder events. |
Yes I will expose porper instrumentation events on the WaterDrop layer for you to use. Something like: WaterDrop.instrumentation.subscribe('producer.created') do |event|
event.producer.middleware...
end |
tadam karafka/waterdrop#680 |
Thanks a lot for this change @mensfeld 😄 I'll make use of this new notification asap! In order to support versions prior to your PR though I suppose we'll end up with something like this (as we've discussed already - but just to keep Datadog folks on the same page): class Integration
MINIMUM_CLASS_LEVEL_INSTRUMENTATION_VERSION = Gem::Version.new('2.8.8')
def self.supports_class_level_instrumentation?
version >= MINIMUM_CLASS_LEVEL_INSTRUMENTATION_VERSION
end
end
module Patcher
def patch
if Integration.supports_class_level_instrumentation?
require_relative 'middleware'
::WaterDrop.instrumentation.subscribe('producer.configured') do |event|
producer = event[:producer]
producer.config.middleware.append(Middleware)
end
else
require_relative 'monitor'
::WaterDrop::Instrumentation::Monitor.prepend(Monitor)
end
end
end By the way I suppose we can do something similar for the Karafka integration (but I'll leave this for separate PR): def patch
::Karafka.monitor.subscribe("app.initialized") do
if Karafka::App.config.monitor.is_a?(Module) # e.g.: ActiveSupport::Notifications or any other class/module
Karafka::App.config.monitor.prepend(Monitor)
else # e.g.: a monitor instance from karafka-core
Karafka::App.config.monitor.singleton_class.prepend(Monitor)
end
end
end |
Np. I should craft the releases with this in the upcoming days. Need to make sure docs are aligned before that. |
👋 @Drowze, with the super fast improvements that @mensfeld did (thank you so much!), do your use cases need to instrument older versions of WaterDrop? I'm trying to save your time, and keep only the nice new instrumentation, using the newly exposed API, if possible. |
Hey @marcotc! Thanks for reaching :) While I agree it'd be nice to just stick with the new API (which was not even released yet), I'm not sure about breaking compatibility with all past Waterdrop versions 😕 One thing to keep in mind: the Karafka ecosystem does not follow strict semantic versioning. Instead it usually employs breaking change in minor versions and reserve major versions major architectural changes. For context, see the Karafka docs on Versions Lifecycle and EOL Personally I don't see a good reason for not supporting any Waterdrop version before @mensfeld's changes.... I imagine other Karafka users might get confused and frustrated when e.g.: they try to use dd-trace-rb's While I'd prefer to keep the support of karafka/waterdrop versions more or less concise, I understand if Datadog team would prefer to only support those versions with a more supported API to instrument on. Please let me know your thoughts and I'll update the PR 😃 ! That being said... Out of curiosity I just compiled the waterdrop requirement on all Karafka releases (see below). Nothing too interesting there, but might be a good reference. Waterdrop requirement on all karafka git tags# script used for compiling this list (using fish)
for karafka_version in (git tag | sort -V)
echo "$karafka_version" | string match -rq '(?<major>\d+)\.(?<minor>\d+)\.(?<revision>\d+)(?:\.(?<prerelease>[0-9a-zA-Z]+))?'
set -l waterdrop_version (git show "$karafka_version:karafka.gemspec" | grep 'waterdrop' | xargs)
echo "$karafka_version | $waterdrop_version"
end
|
This change (if not used) is backwards compatible. WaterDrop had no breaking changes for years aside of one super edge case in transactions and one deprecated setting early 2024.
Yes. While Karafka can be complex, WaterDrop upgrades are not. I bump dependency of waterdrop version in karafka framework mostly to support weird edge cases and sync APIs. If you are not using Karafka, WaterDrop standalone is in my opinion low effort tool with extremely low (so far) maintenance burden. New version with the introduced changes to support this will be released at most next week as I am traveling at the moment. |
@Drowze I will release rc1 for you today of waterdrop for testing. |
NOTE: producer.configured is a new feature, which should be present on WaterDrop 2.8.8 (it's only present in the 2.8.8.rc1 for now).
What does this PR do?
Adds support for WaterDrop (i.e. Karafka producers). Fixes #4868
Motivation:
We use Datadog and Karafka. For a long time we've been using a custom patch to have some distributed tracing support, but since @nvh0412 got the ball rolling by adding traces for Karafka consumers, I've decided to continue the work there and also add proper tracing support for Karafka producers.
Hope we can get an ever-improving good support for Karafka in Datadog! 😄
(Thanks @mensfeld for the amazing Kafka framework!)
Change log entry
Additional Notes:
Although WaterDrop is part of the Karafka framework, it can be used in isolation (which is handy if the application doesn't need any message consumer).
How to test the change?
See:
Karafka.producer
is a WaterDrop instance)