
Commit 923a9fc

move some shared code around to top level test_base and finish test that validates process_data_packet
1 parent 7573e2e commit 923a9fc

7 files changed: +85 -37 lines changed

Diff for: src/sentry/workflow_engine/models/data_source.py

+2
@@ -29,6 +29,8 @@ class DataSource(DefaultFieldsModel):
     __relocation_scope__ = RelocationScope.Organization

     organization = FlexibleForeignKey("sentry.Organization")
+
+    # Should this be a string so we can support UUID / ints?
     query_id = BoundedBigIntegerField()

     # TODO - Add a type here

Diff for: src/sentry/workflow_engine/processors/__init__.py

-2
@@ -5,7 +5,5 @@
     "process_data_packet",
 ]

-from .data_packet import process_data_packet
 from .data_source import process_data_sources
 from .detector import process_detectors
-from .workflow import process_workflows

Diff for: src/sentry/workflow_engine/processors/data_packet.py

+6-3
@@ -1,6 +1,7 @@
 from sentry.workflow_engine.handlers.detector import DetectorEvaluationResult
 from sentry.workflow_engine.models import DataPacket, Detector
-from sentry.workflow_engine.processors import process_data_sources, process_detectors
+from sentry.workflow_engine.processors.data_source import process_data_sources
+from sentry.workflow_engine.processors.detector import process_detectors
 from sentry.workflow_engine.types import DetectorGroupKey


@@ -15,7 +16,9 @@ def process_data_packets(

     results: list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]] = []
     for data_packet, detectors in processed_sources:
-        detector_result = process_detectors(data_packet, detectors)
-        results.append(detector_result)
+        detector_results = process_detectors(data_packet, detectors)
+
+        for detector, detector_state in detector_results:
+            results.append((detector, detector_state))

     return results
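
With this change, process_data_packets flattens the output of process_detectors so each entry in the returned list pairs a single Detector with its evaluation results. A minimal sketch of how a caller might consume that shape, assuming only the return type shown above (data_packets and dispatch_result are illustrative names, not part of this commit):

    # results: list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]]
    results = process_data_packets(data_packets, "snuba_query_subscription")

    for detector, group_results in results:
        for group_key, evaluation in group_results.items():
            # group_key is a DetectorGroupKey; each evaluation belongs to one detector
            dispatch_result(detector, group_key, evaluation)  # hypothetical callback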

Diff for: src/sentry/workflow_engine/processors/data_source.py

+4-3
@@ -14,7 +14,8 @@ def process_data_sources(
 ) -> list[tuple[DataPacket, list[Detector]]]:
     metrics.incr("sentry.workflow_engine.process_data_sources", tags={"query_type": query_type})

-    data_packet_ids = {packet.query_id for packet in data_packets}
+    # TODO - change data_source.query_id to be a string to support UUIDs
+    data_packet_ids = {int(packet.query_id) for packet in data_packets}

     # Fetch all data sources and associated detectors for the given data packets
     with sentry_sdk.start_span(op="sentry.workflow_engine.process_data_sources.fetch_data_sources"):
@@ -23,12 +24,12 @@
         ).prefetch_related(Prefetch("detectors"))

     # Build a lookup dict for query_id to detectors
-    query_id_to_detectors = {str(ds.query_id): list(ds.detectors.all()) for ds in data_sources}
+    query_id_to_detectors = {int(ds.query_id): list(ds.detectors.all()) for ds in data_sources}

     # Create the result tuples
     result = []
     for packet in data_packets:
-        detectors = query_id_to_detectors.get(packet.query_id)
+        detectors = query_id_to_detectors.get(int(packet.query_id))

         if detectors:
             data_packet_tuple = (packet, detectors)
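
The switch from str() to int() keys is what makes the lookup line up: DataSource.query_id is a big integer column, while the packets built in the test fixtures below carry the id as the string "123", so coercing both sides keeps the dictionary keys comparable. A standalone illustration of the mismatch (plain Python, not code from the commit):

    # Keys built from ints never match string lookups:
    lookup = {123: ["detector-a"]}                    # like int(ds.query_id)
    assert lookup.get("123") is None                  # a packet's string id misses
    assert lookup.get(int("123")) == ["detector-a"]   # coercing both sides matches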
Diff for: (new test file)

+21

@@ -0,0 +1,21 @@
+from sentry.workflow_engine.processors.data_packet import process_data_packets
+from sentry.workflow_engine.types import DetectorPriorityLevel
+from tests.sentry.workflow_engine.test_base import BaseWorkflowTest
+
+
+class TestProcessDataPacket(BaseWorkflowTest):
+    def setUp(self):
+        self.snuba_query = self.create_snuba_query()
+
+        (self.workflow, self.detector, self.detector_workflow, self.workflow_triggers) = (
+            self.create_detector_and_workflow()
+        )
+
+        self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)
+
+    def test_single_data_packet(self):
+        results = process_data_packets([self.data_packet], "snuba_query_subscription")
+        assert len(results) == 1
+
+        detector, detector_evaluation_result = results[0]
+        assert detector_evaluation_result[None].priority == DetectorPriorityLevel.HIGH
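
The final assertion indexes the result dict with None because process_detectors keys its evaluation results by DetectorGroupKey; for a detector that does not split its data into groups, the result appears under the None key. A hedged sketch of unpacking that shape, using only names from the test above:

    detector, detector_evaluation_result = results[0]
    evaluation = detector_evaluation_result[None]   # None = the ungrouped result for this detector
    assert evaluation.priority == DetectorPriorityLevel.HIGH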

Diff for: tests/sentry/workflow_engine/test_base.py

+50-3
@@ -1,20 +1,23 @@
-from datetime import datetime
+from datetime import UTC, datetime
 from uuid import uuid4

 from sentry.eventstore.models import Event, GroupEvent
+from sentry.incidents.utils.types import QuerySubscriptionUpdate
 from sentry.models.group import Group
 from sentry.snuba.models import SnubaQuery
 from sentry.testutils.cases import TestCase
 from sentry.testutils.factories import EventType
 from sentry.workflow_engine.models import (
     Action,
     DataConditionGroup,
+    DataPacket,
+    DataSource,
     Detector,
     DetectorWorkflow,
     Workflow,
 )
 from sentry.workflow_engine.models.data_condition import Condition
-from sentry.workflow_engine.types import DetectorType
+from sentry.workflow_engine.types import DetectorPriorityLevel, DetectorType
 from tests.sentry.issues.test_utils import OccurrenceTestMixin


@@ -66,9 +69,13 @@ def create_detector_and_workflow(
         self,
         name_prefix="test",
         workflow_triggers: DataConditionGroup | None = None,
-        detector_type: DetectorType | str = "TestDetector",
+        detector_type: DetectorType | str = DetectorType.METRIC_ALERT_FIRE,
         **kwargs,
     ) -> tuple[Workflow, Detector, DetectorWorkflow, DataConditionGroup]:
+        """
+        Create a Workflow, Detector, DetectorWorkflow, and DataConditionGroup for testing.
+        These models are configured to be related to each other.
+        """
         workflow_triggers = workflow_triggers or self.create_data_condition_group()

         if not workflow_triggers.conditions.exists():
@@ -100,6 +107,46 @@ def create_detector_and_workflow(

         return workflow, detector, detector_workflow, workflow_triggers

+    def create_test_query_data_source(self, detector) -> tuple[DataSource, DataPacket]:
+        """
+        Create a DataSource and DataPacket for testing; this will create a fake QuerySubscriptionUpdate and link it to a data_source.
+
+        A detector is required to create this test data, so we can ensure that the detector
+        has a condition to evaluate for the data_packet that evaluates to true.
+        """
+        subscription_update: QuerySubscriptionUpdate = {
+            "subscription_id": "123",
+            "values": {"foo": 1},
+            "timestamp": datetime.now(UTC),
+            "entity": "test-entity",
+        }
+
+        data_source = self.create_data_source(
+            query_id=subscription_update["subscription_id"],
+            organization=self.organization,
+        )
+
+        data_source.detectors.add(detector)
+
+        if detector.workflow_condition_group is None:
+            detector.workflow_condition_group = self.create_data_condition_group(logic_type="any")
+            detector.save()
+
+        self.create_data_condition(
+            condition_group=detector.workflow_condition_group,
+            type=Condition.EQUAL,
+            condition_result=DetectorPriorityLevel.HIGH,
+            comparison=1,
+        )
+
+        # Create a data_packet from the update for testing
+        data_packet = DataPacket[QuerySubscriptionUpdate](
+            query_id=subscription_update["subscription_id"],
+            packet=subscription_update,
+        )
+
+        return data_source, data_packet
+
     def create_workflow_action(
         self,
         workflow: Workflow,
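
Any test class inheriting from BaseWorkflowTest can now build a linked detector and packet in two calls. A minimal sketch of the intended usage, assuming the helpers behave as in the diff above (the class and attribute names here are illustrative; only create_detector_and_workflow and create_test_query_data_source come from the commit):

    class TestMyDetector(BaseWorkflowTest):  # illustrative test class
        def setUp(self):
            # returns a Workflow, Detector, DetectorWorkflow, and DataConditionGroup wired together
            self.workflow, self.detector, self.detector_workflow, self.triggers = (
                self.create_detector_and_workflow()
            )
            # links a DataSource to the detector and adds a Condition.EQUAL condition
            # whose condition_result is DetectorPriorityLevel.HIGH for the generated packet
            self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)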

Diff for: tests/sentry/workflow_engine/test_integration.py

+2-26
@@ -3,13 +3,11 @@

 from sentry.eventstream.types import EventStreamEventType
 from sentry.incidents.grouptype import MetricAlertFire
-from sentry.incidents.utils.types import QuerySubscriptionUpdate
 from sentry.issues.grouptype import ErrorGroupType
 from sentry.issues.ingest import save_issue_occurrence
 from sentry.models.group import Group
 from sentry.tasks.post_process import post_process_group
 from sentry.testutils.helpers.features import with_feature
-from sentry.workflow_engine.models import DataPacket, DataSource
 from sentry.workflow_engine.models.data_condition import Condition
 from sentry.workflow_engine.processors import process_data_sources, process_detectors
 from sentry.workflow_engine.types import DetectorPriorityLevel
@@ -77,36 +75,14 @@ def call_post_process_group(

         return cache_key

-    def create_test_data_source(self) -> DataSource:
-        self.subscription_update: QuerySubscriptionUpdate = {
-            "subscription_id": "123",
-            "values": {"foo": 1},
-            "timestamp": datetime.utcnow(),
-            "entity": "test-entity",
-        }
-
-        self.data_source = self.create_data_source(
-            query_id=self.subscription_update["subscription_id"],
-            organization=self.organization,
-        )
-        self.data_source.detectors.add(self.detector)
-
-        # Create a data_packet from the update for testing
-        self.data_packet = DataPacket[QuerySubscriptionUpdate](
-            query_id=self.subscription_update["subscription_id"],
-            packet=self.subscription_update,
-        )
-
-        return self.data_source
-

 class TestWorkflowEngineIntegrationToIssuePlatform(BaseWorkflowIntegrationTest):
     @with_feature("organizations:workflow-engine-metric-alert-processing")
     def test_workflow_engine__data_source__to_metric_issue_workflow(self):
         """
         This test ensures that a data_source can create the correct event in Issue Platform
         """
-        self.create_test_data_source()
+        self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)

         with mock.patch(
             "sentry.workflow_engine.processors.detector.produce_occurrence_to_kafka"
@@ -121,7 +97,7 @@ def test_workflow_engine__data_source__to_metric_issue_workflow(self):

     @with_feature("organizations:workflow-engine-metric-alert-processing")
     def test_workflow_engine__data_source__different_type(self):
-        self.create_test_data_source()
+        self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)

         with mock.patch(
             "sentry.workflow_engine.processors.detector.produce_occurrence_to_kafka"
