Support for Dramatiq #101

Open
santigandolfo opened this issue May 20, 2024 · 1 comment
Labels
enhancement New feature or request

Comments

@santigandolfo

Is your feature request related to a problem? Please describe.
Please consider adding Dramatiq to the supported technologies. It currently has around 4.1k stars on GitHub, so it is a popular and widely used library.

Describe the solution you'd like
Adding support for Dramatiq.

Describe alternatives you've considered

Additional context

@santigandolfo santigandolfo added the enhancement New feature or request label May 20, 2024
@santigandolfo
Author

I created this middleware:

from autodynatrace.sdk import sdk
from dramatiq import Broker
from dramatiq import Message
from dramatiq.middleware import Middleware
from oneagent.common import ChannelType
from oneagent.common import MessagingDestinationType
from oneagent.sdk import Channel
from oneagent.sdk.tracers import Tracer


class DynatraceMiddleware(Middleware):
    """Dramatiq middleware that traces enqueueing and processing with the Dynatrace SDK."""

    def __init__(self):
        # Tracers that have been started but not yet ended, keyed by message ID.
        self.tracers = {}

    @staticmethod
    def _create_msi_handle(message: Message):
        # Describe the destination to Dynatrace: vendor 'Dramatiq', with the
        # actor name used as the destination (queue) name.
        return sdk.create_messaging_system_info(
            'Dramatiq',
            message.actor_name,
            MessagingDestinationType.QUEUE,
            Channel(ChannelType.OTHER, 'dramatiq'),
        )

    def _start_tracer(self, message: Message, tracer: Tracer):
        tracer.start()
        self.tracers[message.message_id] = tracer

    def _end_tracer(self, message: Message):
        # pop() fetches and removes the tracer in one step and returns None
        # when no tracer was started for this message.
        tracer: Tracer | None = self.tracers.pop(message.message_id, None)
        if tracer is not None:
            tracer.end()

    def before_enqueue(self, broker: Broker, message: Message, delay: int):
        # Start an outgoing-message tracer; after_enqueue ends it.
        msi_handle = self._create_msi_handle(message=message)
        with msi_handle:
            tracer = sdk.trace_outgoing_message(msi_handle)
            self._start_tracer(message=message, tracer=tracer)

    def before_process_message(self, broker: Broker, message: Message):
        # Start a process tracer inside a receive tracer; the process tracer is
        # ended later by after_process_message or after_skip_message.
        msi_handle = self._create_msi_handle(message=message)
        with msi_handle:
            with sdk.trace_incoming_message_receive(msi_handle):
                tracer = sdk.trace_incoming_message_process(msi_handle)
                self._start_tracer(message=message, tracer=tracer)

    def after_enqueue(self, broker: Broker, message: Message, delay: int):
        self._end_tracer(message=message)

    def after_process_message(
        self, broker: Broker, message: Message, *, result=None, exception=None
    ):
        self._end_tracer(message=message)

    def after_skip_message(self, broker: Broker, message: Message):
        self._end_tracer(message=message)
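
One thing the middleware above does not yet do is carry a Dynatrace tag from the enqueue side to the worker side, which is how the OneAgent SDK normally links an outgoing message to its processing. The following is a minimal sketch of that hand-off, assuming the tag travels in `message.options`. The option key `dynatrace_tag` and the stand-in tracer class are hypothetical and exist only so the snippet runs on its own. As far as I recall, the real tracer exposes the tag as `outgoing_dynatrace_string_tag` (as bytes, readable once the tracer has been started) and `sdk.trace_incoming_message_process` accepts it through `str_tag`, but both should be checked against the SDK documentation.

```python
import json

TAG_OPTION = "dynatrace_tag"  # hypothetical key, not a Dramatiq or Dynatrace constant


class StandInOutgoingTracer:
    """Hypothetical stand-in for the tracer from sdk.trace_outgoing_message."""

    def __init__(self, tag):
        self._tag = tag

    @property
    def outgoing_dynatrace_string_tag(self):
        return self._tag


def inject_tag(options, tracer):
    """Enqueue side: copy the started tracer's tag into the message options."""
    tag = tracer.outgoing_dynatrace_string_tag
    if isinstance(tag, bytes):
        # Decode so the options can still be serialized as JSON.
        tag = tag.decode("utf-8")
    options[TAG_OPTION] = tag


def extract_tag(options):
    """Worker side: read the tag back, or None if the producer was not traced."""
    return options.get(TAG_OPTION)


options = {}  # stands in for message.options
inject_tag(options, StandInOutgoingTracer(b"tag-123"))
encoded = json.dumps(options)  # the options survive JSON encoding
incoming_tag = extract_tag(json.loads(encoded))
# In the real middleware, incoming_tag would be passed on as
# sdk.trace_incoming_message_process(msi_handle, str_tag=incoming_tag).
```

In the middleware itself, `inject_tag` would run in `before_enqueue` right after the tracer is started, and `extract_tag` in `before_process_message` before the process tracer is created.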

How would this middleware have to be modified to work as a wrapper?
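
One possible direction, as a minimal sketch rather than a confirmed design: a wrapper could patch the broker class so that every broker instance gets the middleware added automatically, instead of the user registering it by hand. This assumes that the library's wrappers work by patching the target package, which is not confirmed in this thread. `StandInBroker`, `StandInDynatraceMiddleware`, and `instrument` are hypothetical names used so the snippet runs without Dramatiq installed. With the real library the target would be `dramatiq.Broker`, which has an `add_middleware` method.

```python
import functools


class StandInBroker:
    """Hypothetical stand-in for dramatiq.Broker so the sketch runs on its own."""

    def __init__(self, middleware=None):
        self.middleware = list(middleware or [])

    def add_middleware(self, middleware):
        self.middleware.append(middleware)


class StandInDynatraceMiddleware:
    """Hypothetical stand-in for the DynatraceMiddleware posted above."""


def instrument(broker_cls, middleware_factory):
    """Patch broker_cls.__init__ so each new broker gets the middleware added."""
    if getattr(broker_cls, "_dynatrace_instrumented", False):
        return  # already patched; avoid adding the middleware twice

    original_init = broker_cls.__init__

    @functools.wraps(original_init)
    def patched_init(self, *args, **kwargs):
        original_init(self, *args, **kwargs)
        self.add_middleware(middleware_factory())

    broker_cls.__init__ = patched_init
    broker_cls._dynatrace_instrumented = True


instrument(StandInBroker, StandInDynatraceMiddleware)
broker = StandInBroker()  # the middleware is attached without an explicit call
```

Patching the base class constructor should also cover broker subclasses, provided they call the base `__init__`. The guard flag keeps a second `instrument` call from registering the middleware twice.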
