Skip to content

Commit bbd2a5d

Browse files
Igrehigorekantonpirkersentrivana
authored
feat(integrations): Add tracing to DramatiqIntegration (#4571)
Adds tracing support to DramatiqIntegration #3454 --------- Co-authored-by: igorek <[email protected]> Co-authored-by: Anton Pirker <[email protected]> Co-authored-by: Ivana Kellyer <[email protected]>
1 parent f3e8a5c commit bbd2a5d

File tree

4 files changed

+146
-36
lines changed

4 files changed

+146
-36
lines changed

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,10 @@ ignore_missing_imports = true
179179
module = "agents.*"
180180
ignore_missing_imports = true
181181

182+
[[tool.mypy.overrides]]
183+
module = "dramatiq.*"
184+
ignore_missing_imports = true
185+
182186
#
183187
# Tool: Ruff (linting and formatting)
184188
#

sentry_sdk/consts.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,7 @@ class OP:
839839
QUEUE_TASK_HUEY = "queue.task.huey"
840840
QUEUE_SUBMIT_RAY = "queue.submit.ray"
841841
QUEUE_TASK_RAY = "queue.task.ray"
842+
QUEUE_TASK_DRAMATIQ = "queue.task.dramatiq"
842843
SUBPROCESS = "subprocess"
843844
SUBPROCESS_WAIT = "subprocess.wait"
844845
SUBPROCESS_COMMUNICATE = "subprocess.communicate"

sentry_sdk/integrations/dramatiq.py

Lines changed: 89 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,31 @@
11
import json
22

33
import sentry_sdk
4-
from sentry_sdk.integrations import Integration
4+
from sentry_sdk.consts import OP, SPANSTATUS
5+
from sentry_sdk.api import continue_trace, get_baggage, get_traceparent
6+
from sentry_sdk.integrations import Integration, DidNotEnable
57
from sentry_sdk.integrations._wsgi_common import request_body_within_bounds
8+
from sentry_sdk.tracing import (
9+
BAGGAGE_HEADER_NAME,
10+
SENTRY_TRACE_HEADER_NAME,
11+
TransactionSource,
12+
)
613
from sentry_sdk.utils import (
714
AnnotatedValue,
815
capture_internal_exceptions,
916
event_from_exception,
1017
)
18+
from typing import TypeVar
19+
20+
R = TypeVar("R")
1121

12-
from dramatiq.broker import Broker # type: ignore
13-
from dramatiq.message import Message # type: ignore
14-
from dramatiq.middleware import Middleware, default_middleware # type: ignore
15-
from dramatiq.errors import Retry # type: ignore
22+
try:
23+
from dramatiq.broker import Broker
24+
from dramatiq.middleware import Middleware, default_middleware
25+
from dramatiq.errors import Retry
26+
from dramatiq.message import Message
27+
except ImportError:
28+
raise DidNotEnable("Dramatiq is not installed")
1629

1730
from typing import TYPE_CHECKING
1831

@@ -34,10 +47,12 @@ class DramatiqIntegration(Integration):
3447
"""
3548

3649
identifier = "dramatiq"
50+
origin = f"auto.queue.{identifier}"
3751

3852
@staticmethod
3953
def setup_once():
4054
# type: () -> None
55+
4156
_patch_dramatiq_broker()
4257

4358

@@ -85,50 +100,93 @@ class SentryMiddleware(Middleware): # type: ignore[misc]
85100
DramatiqIntegration.
86101
"""
87102

88-
def before_process_message(self, broker, message):
89-
# type: (Broker, Message) -> None
103+
SENTRY_HEADERS_NAME = "_sentry_headers"
104+
105+
def before_enqueue(self, broker, message, delay):
106+
# type: (Broker, Message[R], int) -> None
90107
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
91108
if integration is None:
92109
return
93110

94-
message._scope_manager = sentry_sdk.new_scope()
95-
message._scope_manager.__enter__()
111+
message.options[self.SENTRY_HEADERS_NAME] = {
112+
BAGGAGE_HEADER_NAME: get_baggage(),
113+
SENTRY_TRACE_HEADER_NAME: get_traceparent(),
114+
}
115+
116+
def before_process_message(self, broker, message):
117+
# type: (Broker, Message[R]) -> None
118+
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
119+
if integration is None:
120+
return
96121

97-
scope = sentry_sdk.get_current_scope()
98-
scope.set_transaction_name(message.actor_name)
122+
message._scope_manager = sentry_sdk.isolation_scope()
123+
scope = message._scope_manager.__enter__()
124+
scope.clear_breadcrumbs()
99125
scope.set_extra("dramatiq_message_id", message.message_id)
100126
scope.add_event_processor(_make_message_event_processor(message, integration))
101127

128+
sentry_headers = message.options.get(self.SENTRY_HEADERS_NAME) or {}
129+
if "retries" in message.options:
130+
# start new trace in case of retrying
131+
sentry_headers = {}
132+
133+
transaction = continue_trace(
134+
sentry_headers,
135+
name=message.actor_name,
136+
op=OP.QUEUE_TASK_DRAMATIQ,
137+
source=TransactionSource.TASK,
138+
origin=DramatiqIntegration.origin,
139+
)
140+
transaction.set_status(SPANSTATUS.OK)
141+
sentry_sdk.start_transaction(
142+
transaction,
143+
name=message.actor_name,
144+
op=OP.QUEUE_TASK_DRAMATIQ,
145+
source=TransactionSource.TASK,
146+
)
147+
transaction.__enter__()
148+
102149
def after_process_message(self, broker, message, *, result=None, exception=None):
103-
# type: (Broker, Message, Any, Optional[Any], Optional[Exception]) -> None
150+
# type: (Broker, Message[R], Optional[Any], Optional[Exception]) -> None
104151
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
105152
if integration is None:
106153
return
107154

108155
actor = broker.get_actor(message.actor_name)
109156
throws = message.options.get("throws") or actor.options.get("throws")
110157

111-
try:
112-
if (
113-
exception is not None
114-
and not (throws and isinstance(exception, throws))
115-
and not isinstance(exception, Retry)
116-
):
117-
event, hint = event_from_exception(
118-
exception,
119-
client_options=sentry_sdk.get_client().options,
120-
mechanism={
121-
"type": DramatiqIntegration.identifier,
122-
"handled": False,
123-
},
124-
)
125-
sentry_sdk.capture_event(event, hint=hint)
126-
finally:
127-
message._scope_manager.__exit__(None, None, None)
158+
scope_manager = message._scope_manager
159+
transaction = sentry_sdk.get_current_scope().transaction
160+
if not transaction:
161+
return None
162+
163+
is_event_capture_required = (
164+
exception is not None
165+
and not (throws and isinstance(exception, throws))
166+
and not isinstance(exception, Retry)
167+
)
168+
if not is_event_capture_required:
169+
# normal transaction finish
170+
transaction.__exit__(None, None, None)
171+
scope_manager.__exit__(None, None, None)
172+
return
173+
174+
event, hint = event_from_exception(
175+
exception, # type: ignore[arg-type]
176+
client_options=sentry_sdk.get_client().options,
177+
mechanism={
178+
"type": DramatiqIntegration.identifier,
179+
"handled": False,
180+
},
181+
)
182+
sentry_sdk.capture_event(event, hint=hint)
183+
# transaction error
184+
transaction.__exit__(type(exception), exception, None)
185+
scope_manager.__exit__(type(exception), exception, None)
128186

129187

130188
def _make_message_event_processor(message, integration):
131-
# type: (Message, DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]]
189+
# type: (Message[R], DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]]
132190

133191
def inner(event, hint):
134192
# type: (Event, Hint) -> Optional[Event]
@@ -142,7 +200,7 @@ def inner(event, hint):
142200

143201
class DramatiqMessageExtractor:
144202
def __init__(self, message):
145-
# type: (Message) -> None
203+
# type: (Message[R]) -> None
146204
self.message_data = dict(message.asdict())
147205

148206
def content_length(self):

tests/integrations/dramatiq/test_dramatiq.py

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,21 @@
55
from dramatiq.brokers.stub import StubBroker
66

77
import sentry_sdk
8+
from sentry_sdk.tracing import TransactionSource
9+
from sentry_sdk import start_transaction
10+
from sentry_sdk.consts import SPANSTATUS
811
from sentry_sdk.integrations.dramatiq import DramatiqIntegration
12+
from sentry_sdk.integrations.logging import ignore_logger
913

14+
ignore_logger("dramatiq.worker.WorkerThread")
1015

11-
@pytest.fixture
12-
def broker(sentry_init):
13-
sentry_init(integrations=[DramatiqIntegration()])
16+
17+
@pytest.fixture(scope="function")
18+
def broker(request, sentry_init):
19+
sentry_init(
20+
integrations=[DramatiqIntegration()],
21+
traces_sample_rate=getattr(request, "param", None),
22+
)
1423
broker = StubBroker()
1524
broker.emit_after("process_boot")
1625
dramatiq.set_broker(broker)
@@ -44,19 +53,57 @@ def dummy_actor(x, y):
4453
assert exception["type"] == "ZeroDivisionError"
4554

4655

47-
def test_that_actor_name_is_set_as_transaction(broker, worker, capture_events):
56+
@pytest.mark.parametrize(
57+
"broker,expected_span_status",
58+
[
59+
(1.0, SPANSTATUS.INTERNAL_ERROR),
60+
(1.0, SPANSTATUS.OK),
61+
],
62+
ids=["error", "success"],
63+
indirect=["broker"],
64+
)
65+
def test_task_transaction(broker, worker, capture_events, expected_span_status):
4866
events = capture_events()
67+
task_fails = expected_span_status == SPANSTATUS.INTERNAL_ERROR
4968

5069
@dramatiq.actor(max_retries=0)
5170
def dummy_actor(x, y):
5271
return x / y
5372

54-
dummy_actor.send(1, 0)
73+
dummy_actor.send(1, int(not task_fails))
5574
broker.join(dummy_actor.queue_name)
5675
worker.join()
5776

77+
if task_fails:
78+
error_event = events.pop(0)
79+
exception = error_event["exception"]["values"][0]
80+
assert exception["type"] == "ZeroDivisionError"
81+
assert exception["mechanism"]["type"] == DramatiqIntegration.identifier
82+
5883
(event,) = events
84+
assert event["type"] == "transaction"
5985
assert event["transaction"] == "dummy_actor"
86+
assert event["transaction_info"] == {"source": TransactionSource.TASK}
87+
assert event["contexts"]["trace"]["status"] == expected_span_status
88+
89+
90+
@pytest.mark.parametrize("broker", [1.0], indirect=True)
91+
def test_dramatiq_propagate_trace(broker, worker, capture_events):
92+
events = capture_events()
93+
94+
@dramatiq.actor(max_retries=0)
95+
def propagated_trace_task():
96+
pass
97+
98+
with start_transaction() as outer_transaction:
99+
propagated_trace_task.send()
100+
broker.join(propagated_trace_task.queue_name)
101+
worker.join()
102+
103+
assert (
104+
events[0]["transaction"] == "propagated_trace_task"
105+
) # the "inner" transaction
106+
assert events[0]["contexts"]["trace"]["trace_id"] == outer_transaction.trace_id
60107

61108

62109
def test_that_dramatiq_message_id_is_set_as_extra(broker, worker, capture_events):

0 commit comments

Comments
 (0)