diff --git a/src/sentry/workflow_engine/models/data_source.py b/src/sentry/workflow_engine/models/data_source.py index ebfbc63fd18f14..320a01709a290e 100644 --- a/src/sentry/workflow_engine/models/data_source.py +++ b/src/sentry/workflow_engine/models/data_source.py @@ -29,7 +29,11 @@ class DataSource(DefaultFieldsModel): __relocation_scope__ = RelocationScope.Organization organization = FlexibleForeignKey("sentry.Organization") + + # Should this be a string so we can support UUID / ints? query_id = BoundedBigIntegerField() + + # TODO - Add a type here type = models.TextField() detectors = models.ManyToManyField("workflow_engine.Detector", through=DataSourceDetector) diff --git a/src/sentry/workflow_engine/processors/__init__.py b/src/sentry/workflow_engine/processors/__init__.py index 700cd48361de44..0dca1394898aeb 100644 --- a/src/sentry/workflow_engine/processors/__init__.py +++ b/src/sentry/workflow_engine/processors/__init__.py @@ -1,6 +1,8 @@ __all__ = [ "process_data_sources", "process_detectors", + "process_workflows", + "process_data_packets", ] from .data_source import process_data_sources diff --git a/src/sentry/workflow_engine/processors/data_packet.py b/src/sentry/workflow_engine/processors/data_packet.py new file mode 100644 index 00000000000000..35997e02f627e3 --- /dev/null +++ b/src/sentry/workflow_engine/processors/data_packet.py @@ -0,0 +1,24 @@ +from sentry.workflow_engine.handlers.detector import DetectorEvaluationResult +from sentry.workflow_engine.models import DataPacket, Detector +from sentry.workflow_engine.processors.data_source import process_data_sources +from sentry.workflow_engine.processors.detector import process_detectors +from sentry.workflow_engine.types import DetectorGroupKey + + +def process_data_packets( + data_packets: list[DataPacket], query_type: str +) -> list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]]: + """ + This method ties the two main pre-processing methods together to process + the incoming 
data and create issue occurrences. + """ + processed_sources = process_data_sources(data_packets, query_type) + + results: list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]] = [] + for data_packet, detectors in processed_sources: + detector_results = process_detectors(data_packet, detectors) + + for detector, detector_state in detector_results: + results.append((detector, detector_state)) + + return results diff --git a/src/sentry/workflow_engine/processors/data_source.py b/src/sentry/workflow_engine/processors/data_source.py index 52709c302a331f..5df9711f4b7775 100644 --- a/src/sentry/workflow_engine/processors/data_source.py +++ b/src/sentry/workflow_engine/processors/data_source.py @@ -14,6 +14,7 @@ def process_data_sources( ) -> list[tuple[DataPacket, list[Detector]]]: metrics.incr("sentry.workflow_engine.process_data_sources", tags={"query_type": query_type}) + # TODO - change data_source.query_id to be a string to support UUIDs data_packet_ids = {int(packet.query_id) for packet in data_packets} # Fetch all data sources and associated detectors for the given data packets diff --git a/tests/sentry/workflow_engine/processors/test_data_packet.py b/tests/sentry/workflow_engine/processors/test_data_packet.py new file mode 100644 index 00000000000000..cf98d7ba533a16 --- /dev/null +++ b/tests/sentry/workflow_engine/processors/test_data_packet.py @@ -0,0 +1,21 @@ +from sentry.workflow_engine.processors.data_packet import process_data_packets +from sentry.workflow_engine.types import DetectorPriorityLevel +from tests.sentry.workflow_engine.test_base import BaseWorkflowTest + + +class TestProcessDataPacket(BaseWorkflowTest): + def setUp(self): + self.snuba_query = self.create_snuba_query() + + (self.workflow, self.detector, self.detector_workflow, self.workflow_triggers) = ( + self.create_detector_and_workflow() + ) + + self.data_source, self.data_packet = self.create_test_query_data_source(self.detector) + + def test_single_data_packet(self): + 
results = process_data_packets([self.data_packet], "snuba_query_subscription") + assert len(results) == 1 + + detector, detector_evaluation_result = results[0] + assert detector_evaluation_result[None].priority == DetectorPriorityLevel.HIGH diff --git a/tests/sentry/workflow_engine/processors/test_workflow.py b/tests/sentry/workflow_engine/processors/test_workflow.py index 0b8c080a683b6c..91ddb9e0636b45 100644 --- a/tests/sentry/workflow_engine/processors/test_workflow.py +++ b/tests/sentry/workflow_engine/processors/test_workflow.py @@ -1,6 +1,6 @@ from unittest import mock -from sentry.incidents.grouptype import MetricAlertFire +from sentry.issues.grouptype import ErrorGroupType from sentry.workflow_engine.models import DataConditionGroup from sentry.workflow_engine.models.data_condition import Condition from sentry.workflow_engine.processors.workflow import evaluate_workflow_triggers, process_workflows @@ -14,12 +14,13 @@ def setUp(self): self.detector, self.detector_workflow, self.workflow_triggers, - ) = self.create_detector_and_workflow(detector_type=MetricAlertFire.slug) + ) = self.create_detector_and_workflow() self.error_workflow, self.error_detector, self.detector_workflow_error, _ = ( self.create_detector_and_workflow( name_prefix="error", workflow_triggers=self.create_data_condition_group(), + detector_type=ErrorGroupType.slug, ) ) diff --git a/tests/sentry/workflow_engine/test_base.py b/tests/sentry/workflow_engine/test_base.py index edf4cb531bc4b2..f91fd33c657ad8 100644 --- a/tests/sentry/workflow_engine/test_base.py +++ b/tests/sentry/workflow_engine/test_base.py @@ -1,8 +1,9 @@ -from datetime import datetime +from datetime import UTC, datetime from uuid import uuid4 from sentry.eventstore.models import Event, GroupEvent -from sentry.issues.grouptype import ErrorGroupType +from sentry.incidents.grouptype import MetricAlertFire +from sentry.incidents.utils.types import QuerySubscriptionUpdate from sentry.models.group import Group from 
sentry.snuba.models import SnubaQuery from sentry.testutils.cases import TestCase @@ -10,11 +11,14 @@ from sentry.workflow_engine.models import ( Action, DataConditionGroup, + DataPacket, + DataSource, Detector, DetectorWorkflow, Workflow, ) from sentry.workflow_engine.models.data_condition import Condition +from sentry.workflow_engine.types import DetectorPriorityLevel from tests.sentry.issues.test_utils import OccurrenceTestMixin @@ -66,9 +70,13 @@ def create_detector_and_workflow( self, name_prefix="test", workflow_triggers: DataConditionGroup | None = None, - detector_type: str = ErrorGroupType.slug, + detector_type: str = MetricAlertFire.slug, **kwargs, ) -> tuple[Workflow, Detector, DetectorWorkflow, DataConditionGroup]: + """ + Create a Workflow, Detector, DetectorWorkflow, and DataConditionGroup for testing. + These models are configured to work together to test the workflow engine. + """ workflow_triggers = workflow_triggers or self.create_data_condition_group() if not workflow_triggers.conditions.exists(): @@ -100,6 +108,46 @@ return workflow, detector, detector_workflow, workflow_triggers + def create_test_query_data_source(self, detector) -> tuple[DataSource, DataPacket]: + """ + Create a DataSource and DataPacket for testing; this will create a fake QuerySubscriptionUpdate and link it to a data_source. + + A detector is required to create this test data, so we can ensure that the detector + has a condition to evaluate for the data_packet that evaluates to true. 
+ """ + subscription_update: QuerySubscriptionUpdate = { + "subscription_id": "123", + "values": {"foo": 1}, + "timestamp": datetime.now(UTC), + "entity": "test-entity", + } + + data_source = self.create_data_source( + query_id=subscription_update["subscription_id"], + organization=self.organization, + ) + + data_source.detectors.add(detector) + + if detector.workflow_condition_group is None: + detector.workflow_condition_group = self.create_data_condition_group(logic_type="any") + detector.save() + + self.create_data_condition( + condition_group=detector.workflow_condition_group, + type=Condition.EQUAL, + condition_result=DetectorPriorityLevel.HIGH, + comparison=1, + ) + + # Create a data_packet from the update for testing + data_packet = DataPacket[QuerySubscriptionUpdate]( + query_id=subscription_update["subscription_id"], + packet=subscription_update, + ) + + return data_source, data_packet + def create_workflow_action( self, workflow: Workflow, diff --git a/tests/sentry/workflow_engine/test_integration.py b/tests/sentry/workflow_engine/test_integration.py index 5bad851069246a..314bbfcce9fdcf 100644 --- a/tests/sentry/workflow_engine/test_integration.py +++ b/tests/sentry/workflow_engine/test_integration.py @@ -3,13 +3,11 @@ from sentry.eventstream.types import EventStreamEventType from sentry.incidents.grouptype import MetricAlertFire -from sentry.incidents.utils.types import QuerySubscriptionUpdate from sentry.issues.grouptype import ErrorGroupType from sentry.issues.ingest import save_issue_occurrence from sentry.models.group import Group from sentry.tasks.post_process import post_process_group from sentry.testutils.helpers.features import with_feature -from sentry.workflow_engine.models import DataPacket, DataSource from sentry.workflow_engine.models.data_condition import Condition from sentry.workflow_engine.processors import process_data_sources, process_detectors from sentry.workflow_engine.types import DetectorPriorityLevel @@ -77,28 +75,6 @@ def 
call_post_process_group( return cache_key - def create_test_data_source(self) -> DataSource: - self.subscription_update: QuerySubscriptionUpdate = { - "subscription_id": "123", - "values": {"foo": 1}, - "timestamp": datetime.utcnow(), - "entity": "test-entity", - } - - self.data_source = self.create_data_source( - query_id=self.subscription_update["subscription_id"], - organization=self.organization, - ) - self.data_source.detectors.add(self.detector) - - # Create a data_packet from the update for testing - self.data_packet = DataPacket[QuerySubscriptionUpdate]( - query_id=self.subscription_update["subscription_id"], - packet=self.subscription_update, - ) - - return self.data_source - class TestWorkflowEngineIntegrationToIssuePlatform(BaseWorkflowIntegrationTest): @with_feature("organizations:workflow-engine-metric-alert-processing") @@ -106,7 +82,7 @@ def test_workflow_engine__data_source__to_metric_issue_workflow(self): """ This test ensures that a data_source can create the correct event in Issue Platform """ - self.create_test_data_source() + self.data_source, self.data_packet = self.create_test_query_data_source(self.detector) with mock.patch( "sentry.workflow_engine.processors.detector.produce_occurrence_to_kafka" @@ -121,7 +97,7 @@ def test_workflow_engine__data_source__to_metric_issue_workflow(self): @with_feature("organizations:workflow-engine-metric-alert-processing") def test_workflow_engine__data_source__different_type(self): - self.create_test_data_source() + self.data_source, self.data_packet = self.create_test_query_data_source(self.detector) with mock.patch( "sentry.workflow_engine.processors.detector.produce_occurrence_to_kafka" @@ -134,7 +110,7 @@ def test_workflow_engine__data_source__different_type(self): @with_feature("organizations:workflow-engine-metric-alert-processing") def test_workflow_engine__data_source__no_detectors(self): - self.create_test_data_source() + self.data_source, self.data_packet = 
self.create_test_query_data_source(self.detector) self.detector.delete() with mock.patch(