Skip to content

Commit

Permalink
feat(workflow_engine): Split fast and slow condition evaluation (#84275)
Browse files Browse the repository at this point in the history
## Description
This PR splits the fast / slow conditions when we evaluate the condition
group. This method will use the condition group logic type to decide
when to return slow conditions or if we can short circuit those
conditions.

Will create a follow-up PR for enqueuing the slow conditions to the
Redis buffer (this PR was getting large)
- Here's the draft of that PR, just need to add tests
#84283
  • Loading branch information
saponifi3d authored and andrewshie-sentry committed Feb 5, 2025
1 parent 959ab36 commit 4826b4f
Show file tree
Hide file tree
Showing 11 changed files with 383 additions and 127 deletions.
3 changes: 2 additions & 1 deletion src/sentry/workflow_engine/handlers/detector/stateful.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,9 @@ def evaluate_group_key_value(
# store these in `DetectorStateData.counter_updates`, but we don't have anywhere to set the required
# thresholds at the moment. Probably should be a field on the Detector? Could also be on the condition
# level, but usually we want to set this at a higher level.
# TODO 2: Validate that we will never have slow conditions here.
new_status = DetectorPriorityLevel.OK
is_group_condition_met, condition_results = evaluate_condition_group(
is_group_condition_met, condition_results, _ = evaluate_condition_group(
self.condition_group, value
)

Expand Down
10 changes: 6 additions & 4 deletions src/sentry/workflow_engine/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,19 @@ class Meta:
)
]

def evaluate_trigger_conditions(self, job: WorkflowJob) -> bool:
def evaluate_trigger_conditions(self, job: WorkflowJob) -> tuple[bool, list[DataCondition]]:
"""
Evaluate the conditions for the workflow trigger and return whether the evaluation was successful,
along with any remaining slow conditions that still need to be evaluated later.
If there aren't any workflow trigger conditions, the workflow is considered triggered.
"""
if self.when_condition_group is None:
return True
return True, []

job["workflow"] = self
evaluation, _ = evaluate_condition_group(self.when_condition_group, job)
return evaluation
evaluation, _, remaining_conditions = evaluate_condition_group(
self.when_condition_group, job
)
return evaluation, remaining_conditions


def get_slow_conditions(workflow: Workflow) -> list[DataCondition]:
Expand Down
37 changes: 8 additions & 29 deletions src/sentry/workflow_engine/processors/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@

from sentry.db.models.manager.base_query_set import BaseQuerySet
from sentry.models.group import Group
from sentry.workflow_engine.models import Action, ActionGroupStatus, DataConditionGroup, Workflow
from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
from sentry.workflow_engine.types import WorkflowJob
from sentry.workflow_engine.models import (
Action,
ActionGroupStatus,
DataCondition,
DataConditionGroup,
)

EnqueuedAction = tuple[DataConditionGroup, list[DataCondition]]


def get_action_last_updated_statuses(now: datetime, actions: BaseQuerySet[Action], group: Group):
Expand Down Expand Up @@ -74,29 +79,3 @@ def filter_recently_fired_workflow_actions(
filtered_actions = actions.filter(id__in=actions_to_include | actions_without_statuses_ids)

return filtered_actions


def evaluate_workflow_action_filters(
workflows: set[Workflow], job: WorkflowJob
) -> BaseQuerySet[Action]:
filtered_action_groups: set[DataConditionGroup] = set()

# gets the list of the workflow ids, and then get the workflow_data_condition_groups for those workflows
workflow_ids = {workflow.id for workflow in workflows}

action_conditions = DataConditionGroup.objects.filter(
workflowdataconditiongroup__workflow_id__in=workflow_ids
).distinct()

for action_condition in action_conditions:
evaluation, result = evaluate_condition_group(action_condition, job)

if evaluation:
filtered_action_groups.add(action_condition)

# get the actions for any of the triggered data condition groups
actions = Action.objects.filter(
dataconditiongroupaction__condition_group__in=filtered_action_groups
).distinct()

return filter_recently_fired_workflow_actions(actions, job["event"].group)
16 changes: 16 additions & 0 deletions src/sentry/workflow_engine/processors/data_condition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from sentry.workflow_engine.models.data_condition import DataCondition, is_slow_condition


def split_conditions_by_speed(
    conditions: list[DataCondition],
) -> tuple[list[DataCondition], list[DataCondition]]:
    """Partition conditions into (fast, slow) buckets.

    Each condition is classified exactly once via ``is_slow_condition``;
    relative input order is preserved within each bucket.
    """
    buckets: tuple[list[DataCondition], list[DataCondition]] = ([], [])

    for condition in conditions:
        # bool indexing: False/0 -> fast bucket, True/1 -> slow bucket
        buckets[is_slow_condition(condition)].append(condition)

    return buckets
85 changes: 59 additions & 26 deletions src/sentry/workflow_engine/processors/data_condition_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@

from sentry.utils.function_cache import cache_func_for_models
from sentry.workflow_engine.models import DataCondition, DataConditionGroup
from sentry.workflow_engine.types import ProcessedDataConditionResult
from sentry.workflow_engine.processors.data_condition import split_conditions_by_speed
from sentry.workflow_engine.types import DataConditionResult, ProcessedDataConditionResult

logger = logging.getLogger(__name__)

T = TypeVar("T")

DataConditionGroupResult = tuple[bool, list[DataConditionResult], list[DataCondition]]


@cache_func_for_models(
[(DataCondition, lambda condition: (condition.condition_group_id,))],
Expand All @@ -18,19 +21,54 @@ def get_data_conditions_for_group(data_condition_group_id: int) -> list[DataCond
return list(DataCondition.objects.filter(condition_group_id=data_condition_group_id))


def process_condition_group_results(
    results: list[tuple[bool, DataConditionResult]],
    logic_type: str,
) -> ProcessedDataConditionResult:
    """Reduce per-condition evaluation results to a group-level outcome.

    Args:
        results: ``(was_triggered, result)`` pairs, one per evaluated condition.
        logic_type: The ``DataConditionGroup.Type`` connecting the conditions.

    Returns:
        ``(logic_result, condition_results)`` where ``condition_results``
        contains the results of the triggered conditions only when the group
        as a whole passed; otherwise it is empty.
    """
    logic_result = False
    condition_results: list[DataConditionResult] = []

    if logic_type == DataConditionGroup.Type.NONE:
        # if we get to this point, no conditions were met
        # because the caller would have short-circuited otherwise
        logic_result = True

    elif logic_type == DataConditionGroup.Type.ANY:
        # generator (not a list) so `any` can stop at the first hit
        logic_result = any(triggered for triggered, _ in results)

        if logic_result:
            condition_results = [result for triggered, result in results if triggered]

    elif logic_type == DataConditionGroup.Type.ALL:
        # generator (not a list) so `all` can stop at the first miss
        logic_result = all(triggered for triggered, _ in results)

        if logic_result:
            condition_results = [result for triggered, result in results if triggered]

    return logic_result, condition_results


def evaluate_condition_group(
data_condition_group: DataConditionGroup,
value: T,
) -> ProcessedDataConditionResult:
is_fast: bool = True,
) -> DataConditionGroupResult:
"""
Evaluate the conditions for a given group and value.
"""
results = []
results: list[tuple[bool, DataConditionResult]] = []
conditions = get_data_conditions_for_group(data_condition_group.id)

if is_fast:
conditions, remaining_conditions = split_conditions_by_speed(conditions)
else:
_, conditions = split_conditions_by_speed(conditions)
remaining_conditions = []

if len(conditions) == 0:
# if we don't have any conditions, always return True
return True, []
return True, [], remaining_conditions

for condition in conditions:
evaluation_result = condition.evaluate_value(value)
Expand All @@ -39,46 +77,41 @@ def evaluate_condition_group(
if is_condition_triggered:
# Check for short-circuiting evaluations
if data_condition_group.logic_type == data_condition_group.Type.ANY_SHORT_CIRCUIT:
return is_condition_triggered, [evaluation_result]
return is_condition_triggered, [evaluation_result], []

if data_condition_group.logic_type == data_condition_group.Type.NONE:
return False, []
return False, [], []

results.append((is_condition_triggered, evaluation_result))

if data_condition_group.logic_type == data_condition_group.Type.NONE:
# if we get to this point, no conditions were met
return True, []

elif data_condition_group.logic_type == data_condition_group.Type.ANY:
is_any_condition_met = any([result[0] for result in results])
logic_type = data_condition_group.logic_type
logic_result, condition_results = process_condition_group_results(
results,
logic_type,
)

if is_any_condition_met:
condition_results = [result[1] for result in results if result[0]]
return is_any_condition_met, condition_results

elif data_condition_group.logic_type == data_condition_group.Type.ALL:
conditions_met = [result[0] for result in results]
is_all_conditions_met = all(conditions_met)
if (not logic_result and logic_type == DataConditionGroup.Type.ALL) or (
logic_result and logic_type == DataConditionGroup.Type.ANY
):
# if we have a logic type of all and a False result,
# or if we have a logic type of any and a True result
# then we can short-circuit any remaining conditions since we have a completed logic result
remaining_conditions = []

if is_all_conditions_met:
condition_results = [result[1] for result in results if result[0]]
return is_all_conditions_met, condition_results

return False, []
return logic_result, condition_results, remaining_conditions


def process_data_condition_group(
data_condition_group_id: int,
value: Any,
) -> ProcessedDataConditionResult:
) -> DataConditionGroupResult:
try:
group = DataConditionGroup.objects.get_from_cache(id=data_condition_group_id)
except DataConditionGroup.DoesNotExist:
logger.exception(
"DataConditionGroup does not exist",
extra={"id": data_condition_group_id},
)
return False, []
return False, [], []

return evaluate_condition_group(group, value)
63 changes: 53 additions & 10 deletions src/sentry/workflow_engine/processors/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@
import sentry_sdk

from sentry import buffer
from sentry.db.models.manager.base_query_set import BaseQuerySet
from sentry.utils import json, metrics
from sentry.workflow_engine.models import Detector, Workflow, WorkflowDataConditionGroup
from sentry.workflow_engine.models.workflow import get_slow_conditions
from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters
from sentry.workflow_engine.models import (
Action,
DataCondition,
DataConditionGroup,
Detector,
Workflow,
WorkflowDataConditionGroup,
)
from sentry.workflow_engine.processors.action import filter_recently_fired_workflow_actions
from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
from sentry.workflow_engine.processors.detector import get_detector_by_event
from sentry.workflow_engine.types import WorkflowJob
Expand All @@ -17,6 +24,7 @@
WORKFLOW_ENGINE_BUFFER_LIST_KEY = "workflow_engine_delayed_processing_buffer"


# TODO remove this method
def get_data_condition_groups_to_fire(
workflows: set[Workflow], job: WorkflowJob
) -> dict[int, list[int]]:
Expand All @@ -30,7 +38,7 @@ def get_data_condition_groups_to_fire(

for workflow_dcg in workflow_dcgs:
action_condition = workflow_dcg.condition_group
evaluation, result = evaluate_condition_group(action_condition, job)
evaluation, result, _ = evaluate_condition_group(action_condition, job)

if evaluation:
workflow_action_groups[workflow_dcg.workflow_id].append(action_condition.id)
Expand Down Expand Up @@ -69,19 +77,54 @@ def evaluate_workflow_triggers(workflows: set[Workflow], job: WorkflowJob) -> se
workflows_to_enqueue: set[Workflow] = set()

for workflow in workflows:
if workflow.evaluate_trigger_conditions(job):
triggered_workflows.add(workflow)
evaluation, remaining_conditions = workflow.evaluate_trigger_conditions(job)
if remaining_conditions:
workflows_to_enqueue.add(workflow)
else:
if get_slow_conditions(workflow):
# enqueue to be evaluated later
workflows_to_enqueue.add(workflow)
if evaluation:
# Only add workflows that have no remaining conditions to check
triggered_workflows.add(workflow)

if workflows_to_enqueue:
enqueue_workflows(workflows_to_enqueue, job)

return triggered_workflows


def evaluate_workflows_action_filters(
    workflows: set[Workflow],
    job: WorkflowJob,
) -> BaseQuerySet[Action]:
    """
    Evaluate the action-filter condition groups for the given workflows and
    return the Actions whose groups passed, excluding recently fired ones.

    Groups with remaining (slow) conditions are not considered passed; their
    remaining conditions are accumulated for later evaluation instead.
    """
    filtered_action_groups: set[DataConditionGroup] = set()
    # NOTE(review): collected but never consumed in this function — the
    # commit message says a follow-up PR enqueues these to the Redis
    # buffer; confirm this is an intentional placeholder.
    enqueued_conditions: list[DataCondition] = []

    # gets the list of the workflow ids, and then get the workflow_data_condition_groups for those workflows
    workflow_ids = {workflow.id for workflow in workflows}

    action_conditions = DataConditionGroup.objects.filter(
        workflowdataconditiongroup__workflow_id__in=workflow_ids
    ).distinct()

    for action_condition in action_conditions:
        evaluation, result, remaining_conditions = evaluate_condition_group(action_condition, job)

        if remaining_conditions:
            # If there are remaining conditions for the action filter to evaluate,
            # then return the list of conditions to enqueue
            enqueued_conditions.extend(remaining_conditions)
        else:
            # if we don't have any other conditions to evaluate, add the action to the list
            if evaluation:
                filtered_action_groups.add(action_condition)

    # get the actions for any of the triggered data condition groups
    actions = Action.objects.filter(
        dataconditiongroupaction__condition_group__in=filtered_action_groups
    ).distinct()

    return filter_recently_fired_workflow_actions(actions, job["event"].group)


def process_workflows(job: WorkflowJob) -> set[Workflow]:
"""
This method will get the detector based on the event, and then gather the associated workflows.
Expand All @@ -101,7 +144,7 @@ def process_workflows(job: WorkflowJob) -> set[Workflow]:
# Get the workflows, evaluate the when_condition_group, finally evaluate the actions for workflows that are triggered
workflows = set(Workflow.objects.filter(detectorworkflow__detector_id=detector.id).distinct())
triggered_workflows = evaluate_workflow_triggers(workflows, job)
actions = evaluate_workflow_action_filters(triggered_workflows, job)
actions = evaluate_workflows_action_filters(triggered_workflows, job)

with sentry_sdk.start_span(op="workflow_engine.process_workflows.trigger_actions"):
for action in actions:
Expand Down
20 changes: 17 additions & 3 deletions tests/sentry/workflow_engine/models/test_workflow.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from sentry.workflow_engine.models.data_condition import Condition
from sentry.workflow_engine.types import WorkflowJob
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest

Expand All @@ -12,19 +13,32 @@ def setUp(self):
self.job = WorkflowJob({"event": self.group_event})

def test_evaluate_trigger_conditions__condition_new_event__True(self):
evaluation = self.workflow.evaluate_trigger_conditions(self.job)
evaluation, _ = self.workflow.evaluate_trigger_conditions(self.job)
assert evaluation is True

def test_evaluate_trigger_conditions__condition_new_event__False(self):
# Update event to have been seen before
self.group_event.group.times_seen = 5

evaluation = self.workflow.evaluate_trigger_conditions(self.job)
evaluation, _ = self.workflow.evaluate_trigger_conditions(self.job)
assert evaluation is False

def test_evaluate_trigger_conditions__no_conditions(self):
self.workflow.when_condition_group = None
self.workflow.save()

evaluation = self.workflow.evaluate_trigger_conditions(self.job)
evaluation, _ = self.workflow.evaluate_trigger_conditions(self.job)
assert evaluation is True

def test_evaluate_trigger_conditions__slow_condition(self):
    """A slow condition in an ALL group is returned as remaining rather than evaluated inline."""
    # Update group to _all_, since the fast condition is met
    self.data_condition_group.update(logic_type="all")

    # EVENT_FREQUENCY_COUNT is a slow condition, so it should be deferred
    slow_condition = self.create_data_condition(
        type=Condition.EVENT_FREQUENCY_COUNT, comparison={"interval": "1d", "value": 7}
    )
    self.data_condition_group.conditions.add(slow_condition)
    evaluation, remaining_conditions = self.workflow.evaluate_trigger_conditions(self.job)

    # fast conditions all passed, and the slow one is handed back for later evaluation
    assert evaluation is True
    assert remaining_conditions == [slow_condition]
Loading

0 comments on commit 4826b4f

Please sign in to comment.