-
-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(workflow_engine): Add process_data_packets
method
#82002
Changes from all commits
ddbe3f3
473ccac
039b04c
b43c0a1
9078278
b623aa7
4670587
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
from sentry.workflow_engine.handlers.detector import DetectorEvaluationResult | ||
from sentry.workflow_engine.models import DataPacket, Detector | ||
from sentry.workflow_engine.processors.data_source import process_data_sources | ||
from sentry.workflow_engine.processors.detector import process_detectors | ||
from sentry.workflow_engine.types import DetectorGroupKey | ||
|
||
|
||
def process_data_packets(
    data_packets: list[DataPacket], query_type: str
) -> list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]]:
    """
    Tie the two main pre-processing steps together: resolve each incoming
    data packet to the detectors connected to its data source (via
    ``process_data_sources``), then evaluate those detectors against the
    packet (via ``process_detectors``).

    Works for a single packet or a batch, since it accepts a list.

    :param data_packets: the incoming packets to process.
    :param query_type: forwarded to ``process_data_sources`` to select the
        matching data sources.
    :return: one ``(detector, {group_key: evaluation_result})`` tuple per
        detector evaluation.
    """
    processed_sources = process_data_sources(data_packets, query_type)

    results: list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]] = []
    for data_packet, detectors in processed_sources:
        # process_detectors already returns (detector, state) pairs in the
        # exact shape we accumulate, so extend instead of re-appending
        # tuple by tuple.
        results.extend(process_detectors(data_packet, detectors))

    return results
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
from sentry.workflow_engine.processors.data_packet import process_data_packets | ||
from sentry.workflow_engine.types import DetectorPriorityLevel | ||
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest | ||
|
||
|
||
class TestProcessDataPacket(BaseWorkflowTest):
    """End-to-end test for process_data_packets over a single packet."""

    def setUp(self):
        # NOTE(review): the original omitted super().setUp(); call it so any
        # base-class fixture initialization runs before ours.
        super().setUp()

        self.snuba_query = self.create_snuba_query()

        (self.workflow, self.detector, self.detector_workflow, self.workflow_triggers) = (
            self.create_detector_and_workflow()
        )

        # Data source + packet wired to self.detector with a condition that
        # evaluates to true for this packet.
        self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)

    def test_single_data_packet(self):
        """A single packet yields exactly one HIGH-priority detector result."""
        results = process_data_packets([self.data_packet], "snuba_query_subscription")
        assert len(results) == 1

        detector, detector_evaluation_result = results[0]
        assert detector_evaluation_result[None].priority == DetectorPriorityLevel.HIGH
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,24 @@ | ||
from datetime import datetime | ||
from datetime import UTC, datetime | ||
from uuid import uuid4 | ||
|
||
from sentry.eventstore.models import Event, GroupEvent | ||
from sentry.issues.grouptype import ErrorGroupType | ||
from sentry.incidents.grouptype import MetricAlertFire | ||
from sentry.incidents.utils.types import QuerySubscriptionUpdate | ||
from sentry.models.group import Group | ||
from sentry.snuba.models import SnubaQuery | ||
from sentry.testutils.cases import TestCase | ||
from sentry.testutils.factories import EventType | ||
from sentry.workflow_engine.models import ( | ||
Action, | ||
DataConditionGroup, | ||
DataPacket, | ||
DataSource, | ||
Detector, | ||
DetectorWorkflow, | ||
Workflow, | ||
) | ||
from sentry.workflow_engine.models.data_condition import Condition | ||
from sentry.workflow_engine.types import DetectorPriorityLevel | ||
from tests.sentry.issues.test_utils import OccurrenceTestMixin | ||
|
||
|
||
|
@@ -66,9 +70,13 @@ def create_detector_and_workflow( | |
self, | ||
name_prefix="test", | ||
workflow_triggers: DataConditionGroup | None = None, | ||
detector_type: str = ErrorGroupType.slug, | ||
detector_type: str = MetricAlertFire.slug, | ||
**kwargs, | ||
) -> tuple[Workflow, Detector, DetectorWorkflow, DataConditionGroup]: | ||
""" | ||
Create a Workflow, Detector, DetectorWorkflow, and DataConditionGroup for testing. | ||
These models are configured to work together to test the workflow engine. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wouldn't normally nitpick typos like this, but since some tests are failing anyway I thought I would point out this docstring 😅 |
||
""" | ||
workflow_triggers = workflow_triggers or self.create_data_condition_group() | ||
|
||
if not workflow_triggers.conditions.exists(): | ||
|
@@ -100,6 +108,46 @@ def create_detector_and_workflow( | |
|
||
return workflow, detector, detector_workflow, workflow_triggers | ||
|
||
    def create_test_query_data_source(self, detector: Detector) -> tuple[DataSource, DataPacket]:
        """
        Create a DataSource and DataPacket for testing; this will create a fake
        QuerySubscriptionUpdate and link it to a data_source.

        A detector is required to create this test data, so we can ensure that the detector
        has a condition to evaluate for the data_packet that evaluates to true.
        """
        # Fake subscription payload; the detector condition created below
        # compares against 1 — presumably matching the packet's "values" —
        # TODO confirm against the detector handler's comparison logic.
        subscription_update: QuerySubscriptionUpdate = {
            "subscription_id": "123",
            "values": {"foo": 1},
            "timestamp": datetime.now(UTC),
            "entity": "test-entity",
        }

        data_source = self.create_data_source(
            query_id=subscription_update["subscription_id"],
            organization=self.organization,
        )

        # Link the data source to the detector under test.
        data_source.detectors.add(detector)

        # Guarantee the detector has a condition group so a condition can be
        # attached below.
        if detector.workflow_condition_group is None:
            detector.workflow_condition_group = self.create_data_condition_group(logic_type="any")
            detector.save()

        # Condition that resolves to HIGH when the compared value equals 1.
        self.create_data_condition(
            condition_group=detector.workflow_condition_group,
            type=Condition.EQUAL,
            condition_result=DetectorPriorityLevel.HIGH,
            comparison=1,
        )

        # Create a data_packet from the update for testing
        data_packet = DataPacket[QuerySubscriptionUpdate](
            query_id=subscription_update["subscription_id"],
            packet=subscription_update,
        )

        return data_source, data_packet
|
||
def create_workflow_action( | ||
self, | ||
workflow: Workflow, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Colleen creates an enum for this in #81953