Skip to content

Commit 67d137e

Browse files
committed
add enqueue workflow with a list of condition group ids
1 parent 1a1c839 commit 67d137e

File tree

2 files changed

+46
-57
lines changed

2 files changed

+46
-57
lines changed

src/sentry/workflow_engine/processors/workflow.py

+46-52
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
import logging
2-
from collections import defaultdict
2+
from enum import StrEnum
33

44
import sentry_sdk
55

66
from sentry import buffer
77
from sentry.db.models.manager.base_query_set import BaseQuerySet
8+
from sentry.eventstore.models import GroupEvent
89
from sentry.utils import json, metrics
910
from sentry.workflow_engine.models import (
1011
Action,
1112
DataCondition,
1213
DataConditionGroup,
1314
Detector,
1415
Workflow,
15-
WorkflowDataConditionGroup,
1616
)
1717
from sentry.workflow_engine.processors.action import filter_recently_fired_workflow_actions
1818
from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
@@ -24,70 +24,51 @@
2424
WORKFLOW_ENGINE_BUFFER_LIST_KEY = "workflow_engine_delayed_processing_buffer"
2525

2626

27-
# TODO remove this method
28-
def get_data_condition_groups_to_fire(
29-
workflows: set[Workflow], job: WorkflowJob
30-
) -> dict[int, list[int]]:
31-
workflow_action_groups: dict[int, list[int]] = defaultdict(list)
27+
class WorkflowDataConditionGroupType(StrEnum):
28+
ACTION_FILTER = "action_filter"
29+
WORKFLOW_TRIGGER = "workflow_trigger"
3230

33-
workflow_ids = {workflow.id for workflow in workflows}
34-
35-
workflow_dcgs = WorkflowDataConditionGroup.objects.filter(
36-
workflow_id__in=workflow_ids
37-
).select_related("condition_group", "workflow")
38-
39-
for workflow_dcg in workflow_dcgs:
40-
action_condition = workflow_dcg.condition_group
41-
evaluation, result, _ = evaluate_condition_group(action_condition, job)
42-
43-
if evaluation:
44-
workflow_action_groups[workflow_dcg.workflow_id].append(action_condition.id)
4531

46-
return workflow_action_groups
47-
48-
49-
def enqueue_workflows(
50-
workflows: set[Workflow],
51-
job: WorkflowJob,
32+
def enqueue_workflow(
33+
workflow: Workflow,
34+
delayed_conditions: list[DataCondition],
35+
event: GroupEvent,
36+
source: WorkflowDataConditionGroupType,
5237
) -> None:
53-
event = job["event"]
5438
project_id = event.group.project.id
55-
workflow_action_groups = get_data_condition_groups_to_fire(workflows, job)
5639

57-
for workflow in workflows:
58-
buffer.backend.push_to_sorted_set(key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=project_id)
59-
60-
action_filters = workflow_action_groups.get(workflow.id, [])
61-
if not action_filters:
62-
continue
40+
buffer.backend.push_to_sorted_set(key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=project_id)
6341

64-
action_filter_fields = ":".join(map(str, action_filters))
42+
condition_groups = ",".join(
43+
str(condition.condition_group_id) for condition in delayed_conditions
44+
)
6545

66-
value = json.dumps({"event_id": event.event_id, "occurrence_id": event.occurrence_id})
67-
buffer.backend.push_to_hash(
68-
model=Workflow,
69-
filters={"project": project_id},
70-
field=f"{workflow.id}:{event.group.id}:{action_filter_fields}",
71-
value=value,
72-
)
46+
value = json.dumps({"event_id": event.event_id, "occurrence_id": event.occurrence_id})
47+
buffer.backend.push_to_hash(
48+
model=Workflow,
49+
filters={"project": project_id},
50+
field=f"{workflow.id}:{event.group.id}:{condition_groups}:{source}",
51+
value=value,
52+
)
7353

7454

7555
def evaluate_workflow_triggers(workflows: set[Workflow], job: WorkflowJob) -> set[Workflow]:
7656
triggered_workflows: set[Workflow] = set()
77-
workflows_to_enqueue: set[Workflow] = set()
7857

7958
for workflow in workflows:
8059
evaluation, remaining_conditions = workflow.evaluate_trigger_conditions(job)
60+
8161
if remaining_conditions:
82-
workflows_to_enqueue.add(workflow)
62+
enqueue_workflow(
63+
workflow,
64+
remaining_conditions,
65+
job["event"],
66+
WorkflowDataConditionGroupType.WORKFLOW_TRIGGER,
67+
)
8368
else:
8469
if evaluation:
85-
# Only add workflows that have no remaining conditions to check
8670
triggered_workflows.add(workflow)
8771

88-
if workflows_to_enqueue:
89-
enqueue_workflows(workflows_to_enqueue, job)
90-
9172
return triggered_workflows
9273

9374

@@ -96,7 +77,6 @@ def evaluate_workflows_action_filters(
9677
job: WorkflowJob,
9778
) -> BaseQuerySet[Action]:
9879
filtered_action_groups: set[DataConditionGroup] = set()
99-
enqueued_conditions: list[DataCondition] = []
10080

10181
# gets the list of the workflow ids, and then get the workflow_data_condition_groups for those workflows
10282
workflow_ids = {workflow.id for workflow in workflows}
@@ -111,9 +91,15 @@ def evaluate_workflows_action_filters(
11191
if remaining_conditions:
11292
# If there are remaining conditions for the action filter to evaluate,
11393
# then return the list of conditions to enqueue
114-
enqueued_conditions.extend(remaining_conditions)
94+
condition_group = action_condition.workflowdataconditiongroup_set.first()
95+
if condition_group:
96+
enqueue_workflow(
97+
condition_group.workflow,
98+
remaining_conditions,
99+
job["event"],
100+
WorkflowDataConditionGroupType.ACTION_FILTER,
101+
)
115102
else:
116-
# if we don't have any other conditions to evaluate, add the action to the list
117103
if evaluation:
118104
filtered_action_groups.add(action_condition)
119105

@@ -131,6 +117,8 @@ def process_workflows(job: WorkflowJob) -> set[Workflow]:
131117
Next, it will evaluate the "when" (or trigger) conditions for each workflow, if the conditions are met,
132118
the workflow will be added to a unique list of triggered workflows.
133119
120+
TODO @saponifi3d add metrics for this method
121+
134122
Finally, each of the triggered workflows will have their actions evaluated and executed.
135123
"""
136124
# Check to see if the GroupEvent has an issue occurrence
@@ -143,8 +131,14 @@ def process_workflows(job: WorkflowJob) -> set[Workflow]:
143131

144132
# Get the workflows, evaluate the when_condition_group, finally evaluate the actions for workflows that are triggered
145133
workflows = set(Workflow.objects.filter(detectorworkflow__detector_id=detector.id).distinct())
146-
triggered_workflows = evaluate_workflow_triggers(workflows, job)
147-
actions = evaluate_workflows_action_filters(triggered_workflows, job)
134+
135+
with sentry_sdk.start_span(op="workflow_engine.process_workflows.evaluate_workflow_triggers"):
136+
triggered_workflows = evaluate_workflow_triggers(workflows, job)
137+
138+
with sentry_sdk.start_span(
139+
op="workflow_engine.process_workflows.evaluate_workflows_action_filters"
140+
):
141+
actions = evaluate_workflows_action_filters(triggered_workflows, job)
148142

149143
with sentry_sdk.start_span(op="workflow_engine.process_workflows.trigger_actions"):
150144
for action in actions:

tests/sentry/workflow_engine/processors/test_workflow.py

-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
from datetime import timedelta
22
from unittest import mock
33

4-
import pytest
5-
64
from sentry import buffer
75
from sentry.eventstream.base import GroupState
86
from sentry.grouping.grouptype import ErrorGroupType
@@ -169,7 +167,6 @@ def test_delays_slow_conditions(self):
169167
assert triggered_workflows == set()
170168

171169

172-
@pytest.mark.skip(reason="Skipping this test until enqueue is refactored")
173170
@freeze_time(FROZEN_TIME)
174171
class TestEnqueueWorkflow(BaseWorkflowTest):
175172
buffer_timestamp = (FROZEN_TIME + timedelta(seconds=1)).timestamp()
@@ -241,8 +238,6 @@ def test_enqueues_workflow_any_logic_type(self):
241238
triggered_workflows = evaluate_workflow_triggers({self.workflow}, self.job)
242239
assert not triggered_workflows
243240

244-
process_workflows(self.job)
245-
246241
project_ids = buffer.backend.get_sorted_set(
247242
WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, self.buffer_timestamp
248243
)

0 commit comments

Comments (0)