feat(workflow_engine): Split fast and slow condition evaluation #84275

Merged: 12 commits, Jan 30, 2025
3 changes: 2 additions & 1 deletion src/sentry/workflow_engine/handlers/detector/stateful.py
@@ -177,8 +177,9 @@ def evaluate_group_key_value(
         # store these in `DetectorStateData.counter_updates`, but we don't have anywhere to set the required
         # thresholds at the moment. Probably should be a field on the Detector? Could also be on the condition
         # level, but usually we want to set this at a higher level.
+        # TODO 2: Validate that we will never have slow conditions here.
         new_status = DetectorPriorityLevel.OK
-        is_group_condition_met, condition_results = evaluate_condition_group(
+        is_group_condition_met, condition_results, _ = evaluate_condition_group(
            self.condition_group, value
        )
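The new TODO records an assumption rather than enforcing it. A hypothetical guard inside evaluate_group_key_value, not part of this PR, could assert the invariant using the split helper this PR introduces in processors/data_condition.py:

    # Hypothetical validation for the TODO above; not in this PR.
    from sentry.workflow_engine.processors.data_condition import split_conditions_by_speed
    from sentry.workflow_engine.processors.data_condition_group import (
        get_data_conditions_for_group,
    )

    # Detector condition groups are expected to contain only fast conditions.
    _, slow = split_conditions_by_speed(get_data_conditions_for_group(self.condition_group.id))
    assert not slow, "detector condition groups must not contain slow conditions"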

10 changes: 6 additions & 4 deletions src/sentry/workflow_engine/models/workflow.py
@@ -67,17 +67,19 @@ class Meta:
         )
     ]

-    def evaluate_trigger_conditions(self, job: WorkflowJob) -> bool:
+    def evaluate_trigger_conditions(self, job: WorkflowJob) -> tuple[bool, list[DataCondition]]:
         """
         Evaluate the conditions for the workflow trigger and return if the evaluation was successful.
         If there aren't any workflow trigger conditions, the workflow is considered triggered.
         """
         if self.when_condition_group is None:
-            return True
+            return True, []

         job["workflow"] = self
-        evaluation, _ = evaluate_condition_group(self.when_condition_group, job)
-        return evaluation
+        evaluation, _, remaining_conditions = evaluate_condition_group(
+            self.when_condition_group, job
+        )
+        return evaluation, remaining_conditions


def get_slow_conditions(workflow: Workflow) -> list[DataCondition]:
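Callers now unpack a pair instead of a bare bool. A minimal sketch of the new calling convention (hypothetical caller code mirroring the evaluate_workflow_triggers loop later in this PR; `workflow` is a Workflow and `job` a WorkflowJob):

    # Hypothetical caller illustrating the new return shape.
    evaluation, remaining_conditions = workflow.evaluate_trigger_conditions(job)

    if remaining_conditions:
        # Slow conditions remain; defer this workflow to the buffer.
        workflows_to_enqueue.add(workflow)
    elif evaluation:
        # Every fast condition passed and nothing was deferred.
        triggered_workflows.add(workflow)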
37 changes: 8 additions & 29 deletions src/sentry/workflow_engine/processors/action.py
@@ -7,9 +7,14 @@

 from sentry.db.models.manager.base_query_set import BaseQuerySet
 from sentry.models.group import Group
-from sentry.workflow_engine.models import Action, ActionGroupStatus, DataConditionGroup, Workflow
-from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
-from sentry.workflow_engine.types import WorkflowJob
+from sentry.workflow_engine.models import (
+    Action,
+    ActionGroupStatus,
+    DataCondition,
+    DataConditionGroup,
+)
+
+EnqueuedAction = tuple[DataConditionGroup, list[DataCondition]]


def get_action_last_updated_statuses(now: datetime, actions: BaseQuerySet[Action], group: Group):
@@ -74,29 +74,3 @@ def filter_recently_fired_workflow_actions(
     filtered_actions = actions.filter(id__in=actions_to_include | actions_without_statuses_ids)

     return filtered_actions
-
-
-def evaluate_workflow_action_filters(
-    workflows: set[Workflow], job: WorkflowJob
-) -> BaseQuerySet[Action]:
-    filtered_action_groups: set[DataConditionGroup] = set()
-
-    # gets the list of the workflow ids, and then get the workflow_data_condition_groups for those workflows
-    workflow_ids = {workflow.id for workflow in workflows}
-
-    action_conditions = DataConditionGroup.objects.filter(
-        workflowdataconditiongroup__workflow_id__in=workflow_ids
-    ).distinct()
-
-    for action_condition in action_conditions:
-        evaluation, result = evaluate_condition_group(action_condition, job)
-
-        if evaluation:
-            filtered_action_groups.add(action_condition)
-
-    # get the actions for any of the triggered data condition groups
-    actions = Action.objects.filter(
-        dataconditiongroupaction__condition_group__in=filtered_action_groups
-    ).distinct()
-
-    return filter_recently_fired_workflow_actions(actions, job["event"].group)
16 changes: 16 additions & 0 deletions src/sentry/workflow_engine/processors/data_condition.py
@@ -0,0 +1,16 @@
+from sentry.workflow_engine.models.data_condition import DataCondition, is_slow_condition
+
+
+def split_conditions_by_speed(
+    conditions: list[DataCondition],
+) -> tuple[list[DataCondition], list[DataCondition]]:
+    fast_conditions: list[DataCondition] = []
+    slow_conditions: list[DataCondition] = []
+
+    for condition in conditions:
+        if is_slow_condition(condition):
+            slow_conditions.append(condition)
+        else:
+            fast_conditions.append(condition)
+
+    return fast_conditions, slow_conditions
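The helper partitions a group's conditions by cost in a single pass. A quick illustration of the intended behavior, assuming is_slow_condition flags conditions backed by time-window queries (the stand-in values below are hypothetical):

    # `fast_cond` and `slow_cond` stand in for real DataCondition rows,
    # e.g. a first-seen-event check vs. an event-frequency check.
    conditions = [fast_cond, slow_cond, fast_cond]

    fast, slow = split_conditions_by_speed(conditions)
    # fast == [fast_cond, fast_cond]  -> evaluated inline during event processing
    # slow == [slow_cond]             -> deferred to the delayed-processing buffer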
85 changes: 59 additions & 26 deletions src/sentry/workflow_engine/processors/data_condition_group.py
@@ -3,12 +3,15 @@

 from sentry.utils.function_cache import cache_func_for_models
 from sentry.workflow_engine.models import DataCondition, DataConditionGroup
-from sentry.workflow_engine.types import ProcessedDataConditionResult
+from sentry.workflow_engine.processors.data_condition import split_conditions_by_speed
+from sentry.workflow_engine.types import DataConditionResult, ProcessedDataConditionResult

 logger = logging.getLogger(__name__)

 T = TypeVar("T")

+DataConditionGroupResult = tuple[bool, list[DataConditionResult], list[DataCondition]]


@cache_func_for_models(
[(DataCondition, lambda condition: (condition.condition_group_id,))],
@@ -18,19 +21,54 @@ def get_data_conditions_for_group(data_condition_group_id: int) -> list[DataCond
     return list(DataCondition.objects.filter(condition_group_id=data_condition_group_id))


+def process_condition_group_results(
+    results: list[tuple[bool, DataConditionResult]],
+    logic_type: str,
+) -> ProcessedDataConditionResult:
+    logic_result = False
+    condition_results = []
+
+    if logic_type == DataConditionGroup.Type.NONE:
+        # if we get to this point, no conditions were met
+        # because we would have short-circuited
+        logic_result = True
+
+    elif logic_type == DataConditionGroup.Type.ANY:
+        logic_result = any([result[0] for result in results])
+
+        if logic_result:
+            condition_results = [result[1] for result in results if result[0]]
+
+    elif logic_type == DataConditionGroup.Type.ALL:
+        conditions_met = [result[0] for result in results]
+        logic_result = all(conditions_met)
+
+        if logic_result:
+            condition_results = [result[1] for result in results if result[0]]
+
+    return logic_result, condition_results


 def evaluate_condition_group(
     data_condition_group: DataConditionGroup,
     value: T,
-) -> ProcessedDataConditionResult:
+    is_fast: bool = True,
+) -> DataConditionGroupResult:
     """
     Evaluate the conditions for a given group and value.
     """
-    results = []
+    results: list[tuple[bool, DataConditionResult]] = []
     conditions = get_data_conditions_for_group(data_condition_group.id)

+    if is_fast:
+        conditions, remaining_conditions = split_conditions_by_speed(conditions)
+    else:
+        _, conditions = split_conditions_by_speed(conditions)
+        remaining_conditions = []
+
     if len(conditions) == 0:
         # if we don't have any conditions, always return True
-        return True, []
+        return True, [], remaining_conditions

     for condition in conditions:
         evaluation_result = condition.evaluate_value(value)

@@ -39,46 +77,41 @@ def evaluate_condition_group(
         if is_condition_triggered:
             # Check for short-circuiting evaluations
             if data_condition_group.logic_type == data_condition_group.Type.ANY_SHORT_CIRCUIT:
-                return is_condition_triggered, [evaluation_result]
+                return is_condition_triggered, [evaluation_result], []

             if data_condition_group.logic_type == data_condition_group.Type.NONE:
-                return False, []
+                return False, [], []

         results.append((is_condition_triggered, evaluation_result))

-    if data_condition_group.logic_type == data_condition_group.Type.NONE:
-        # if we get to this point, no conditions were met
-        return True, []
-
-    elif data_condition_group.logic_type == data_condition_group.Type.ANY:
-        is_any_condition_met = any([result[0] for result in results])
-
-        if is_any_condition_met:
-            condition_results = [result[1] for result in results if result[0]]
-            return is_any_condition_met, condition_results
-
-    elif data_condition_group.logic_type == data_condition_group.Type.ALL:
-        conditions_met = [result[0] for result in results]
-        is_all_conditions_met = all(conditions_met)
-
-        if is_all_conditions_met:
-            condition_results = [result[1] for result in results if result[0]]
-            return is_all_conditions_met, condition_results
-
-    return False, []
+    logic_type = data_condition_group.logic_type
+    logic_result, condition_results = process_condition_group_results(
+        results,
+        logic_type,
+    )
+
+    if (not logic_result and logic_type == DataConditionGroup.Type.ALL) or (
+        logic_result and logic_type == DataConditionGroup.Type.ANY
+    ):
+        # if we have a logic type of all and a False result,
+        # or if we have a logic type of any and a True result,
+        # then we can short-circuit any remaining conditions since we have a completed logic result
+        remaining_conditions = []
+
+    return logic_result, condition_results, remaining_conditions


 def process_data_condition_group(
     data_condition_group_id: int,
     value: Any,
-) -> ProcessedDataConditionResult:
+) -> DataConditionGroupResult:
     try:
         group = DataConditionGroup.objects.get_from_cache(id=data_condition_group_id)
     except DataConditionGroup.DoesNotExist:
         logger.exception(
             "DataConditionGroup does not exist",
             extra={"id": data_condition_group_id},
         )
-        return False, []
+        return False, [], []

     return evaluate_condition_group(group, value)
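Taken together, the group evaluator can now be driven in two phases: a synchronous pass over fast conditions, then a deferred pass over the slow ones. A rough sketch of how a caller might exercise both phases (hypothetical `group` and `job` objects, not code from this PR):

    # Phase 1: inline, fast conditions only (is_fast defaults to True).
    triggered, results, remaining = evaluate_condition_group(group, job)

    if remaining:
        # Phase 2: later, from the delayed-processing worker, evaluate only
        # the slow conditions for the same group.
        triggered, results, _ = evaluate_condition_group(group, job, is_fast=False)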
63 changes: 53 additions & 10 deletions src/sentry/workflow_engine/processors/workflow.py
@@ -4,10 +4,17 @@
 import sentry_sdk

 from sentry import buffer
+from sentry.db.models.manager.base_query_set import BaseQuerySet
 from sentry.utils import json, metrics
-from sentry.workflow_engine.models import Detector, Workflow, WorkflowDataConditionGroup
-from sentry.workflow_engine.models.workflow import get_slow_conditions
-from sentry.workflow_engine.processors.action import evaluate_workflow_action_filters
+from sentry.workflow_engine.models import (
+    Action,
+    DataCondition,
+    DataConditionGroup,
+    Detector,
+    Workflow,
+    WorkflowDataConditionGroup,
+)
+from sentry.workflow_engine.processors.action import filter_recently_fired_workflow_actions
 from sentry.workflow_engine.processors.data_condition_group import evaluate_condition_group
 from sentry.workflow_engine.processors.detector import get_detector_by_event
 from sentry.workflow_engine.types import WorkflowJob
@@ -17,6 +24,7 @@

 WORKFLOW_ENGINE_BUFFER_LIST_KEY = "workflow_engine_delayed_processing_buffer"


+# TODO remove this method
 def get_data_condition_groups_to_fire(
     workflows: set[Workflow], job: WorkflowJob
 ) -> dict[int, list[int]]:
@@ -30,7 +38,7 @@ def get_data_condition_groups_to_fire(

     for workflow_dcg in workflow_dcgs:
         action_condition = workflow_dcg.condition_group
-        evaluation, result = evaluate_condition_group(action_condition, job)
+        evaluation, result, _ = evaluate_condition_group(action_condition, job)

         if evaluation:
             workflow_action_groups[workflow_dcg.workflow_id].append(action_condition.id)
@@ -69,19 +77,54 @@ def evaluate_workflow_triggers(workflows: set[Workflow], job: WorkflowJob) -> se
     workflows_to_enqueue: set[Workflow] = set()

     for workflow in workflows:
-        if workflow.evaluate_trigger_conditions(job):
-            triggered_workflows.add(workflow)
+        evaluation, remaining_conditions = workflow.evaluate_trigger_conditions(job)
+        if remaining_conditions:
+            workflows_to_enqueue.add(workflow)
[Review thread]
Member: i think we should enqueue the same thing in the buffer as in evaluate_workflows_action_filters, but i assume this is happening in a follow-up PR

Contributor (author): Yep! Rather than trying to create a data structure here and then enqueue a bunch at the end, I ended up just enqueuing directly in the other PR. That way we don't need an additional query to match the workflows to the data condition groups when enqueuing them.
         else:
-            if get_slow_conditions(workflow):
-                # enqueue to be evaluated later
-                workflows_to_enqueue.add(workflow)
+            if evaluation:
+                # Only add workflows that have no remaining conditions to check
+                triggered_workflows.add(workflow)

     if workflows_to_enqueue:
         enqueue_workflows(workflows_to_enqueue, job)

     return triggered_workflows
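Per workflow, the loop now resolves to one of three outcomes. A compact restatement of the branch logic above (comment-only sketch, hypothetical values):

    # (evaluation, remaining_conditions) -> outcome, mirroring the loop above:
    # (True,  [])      -> triggered_workflows   (fires now)
    # (False, [])      -> neither               (dropped)
    # (_,     [slow])  -> workflows_to_enqueue  (deferred until slow conditions run)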


+def evaluate_workflows_action_filters(
+    workflows: set[Workflow],
+    job: WorkflowJob,
+) -> BaseQuerySet[Action]:
+    filtered_action_groups: set[DataConditionGroup] = set()
+    enqueued_conditions: list[DataCondition] = []
+
+    # gets the list of the workflow ids, and then get the workflow_data_condition_groups for those workflows
+    workflow_ids = {workflow.id for workflow in workflows}
+
+    action_conditions = DataConditionGroup.objects.filter(
+        workflowdataconditiongroup__workflow_id__in=workflow_ids
+    ).distinct()
+
+    for action_condition in action_conditions:
+        evaluation, result, remaining_conditions = evaluate_condition_group(action_condition, job)
+
+        if remaining_conditions:
+            # If there are remaining conditions for the action filter to evaluate,
+            # then return the list of conditions to enqueue
+            enqueued_conditions.extend(remaining_conditions)
[Review thread]
Member: i had planned to enqueue the DataConditionGroup. i think we can still do that here, because the remaining_conditions can only be slow conditions, or am i missing something about why it's better to enqueue the conditions themselves? or is this more generic, in case remaining_conditions might not only be slow conditions in the future?

Member: on second thought, i probably need to rethink the enqueuing logic, since i am planning to have the key be {workflow_id}:{group_id}:{dcg_id,...,dcg_id}

Contributor (author): yeah, i ended up changing the enqueue method a bit:

    def enqueue_workflow(
        workflow: Workflow,
        delayed_conditions: list[DataCondition],
        event: GroupEvent,
        source: WorkflowDataConditionGroupType,
    ) -> None:
        project_id = event.group.project.id
        buffer.backend.push_to_sorted_set(key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=project_id)
        condition_groups = ",".join(
            str(condition.condition_group_id) for condition in delayed_conditions
        )
        value = json.dumps({"event_id": event.event_id, "occurrence_id": event.occurrence_id})
        buffer.backend.push_to_hash(
            model=Workflow,
            filters={"project": project_id},
            field=f"{workflow.id}:{event.group.id}:{condition_groups}:{source}",
            value=value,
        )

Since each condition knows its condition group, I just made the enqueue method signature a little more ergonomic.

Member: yeah this works. will have to refactor my PR using this 😅

Contributor (author): out of curiosity, why's that? aren't you just reading from the buffer, and isn't that the format we chatted about this mornin'?

Member: oh jk i need to learn to read

Member: i think i was misled that we are passing the conditions to be enqueued when we're still actually enqueuing the DataConditionGroup. do we need to collect all the conditions in that case?

Contributor (author): eh, probably don't need them all. we could probably move that list coercion up and out of this method and just take a list of ids in the method signature, but we can figure that out on the other PR :)
+        else:
+            # if we don't have any other conditions to evaluate, add the action to the list
+            if evaluation:
+                filtered_action_groups.add(action_condition)
+
+    # get the actions for any of the triggered data condition groups
+    actions = Action.objects.filter(
+        dataconditiongroupaction__condition_group__in=filtered_action_groups
+    ).distinct()
+
+    return filter_recently_fired_workflow_actions(actions, job["event"].group)


 def process_workflows(job: WorkflowJob) -> set[Workflow]:
     """
     This method will get the detector based on the event, and then gather the associated workflows.
@@ -101,7 +144,7 @@ def process_workflows(job: WorkflowJob) -> set[Workflow]:
     # Get the workflows, evaluate the when_condition_group, finally evaluate the actions for workflows that are triggered
     workflows = set(Workflow.objects.filter(detectorworkflow__detector_id=detector.id).distinct())
     triggered_workflows = evaluate_workflow_triggers(workflows, job)
-    actions = evaluate_workflow_action_filters(triggered_workflows, job)
+    actions = evaluate_workflows_action_filters(triggered_workflows, job)

     with sentry_sdk.start_span(op="workflow_engine.process_workflows.trigger_actions"):
         for action in actions:
20 changes: 17 additions & 3 deletions tests/sentry/workflow_engine/models/test_workflow.py
@@ -1,3 +1,4 @@
+from sentry.workflow_engine.models.data_condition import Condition
 from sentry.workflow_engine.types import WorkflowJob
 from tests.sentry.workflow_engine.test_base import BaseWorkflowTest

@@ -12,19 +13,32 @@ def setUp(self):
         self.job = WorkflowJob({"event": self.group_event})

     def test_evaluate_trigger_conditions__condition_new_event__True(self):
-        evaluation = self.workflow.evaluate_trigger_conditions(self.job)
+        evaluation, _ = self.workflow.evaluate_trigger_conditions(self.job)
         assert evaluation is True

     def test_evaluate_trigger_conditions__condition_new_event__False(self):
         # Update event to have been seen before
         self.group_event.group.times_seen = 5

-        evaluation = self.workflow.evaluate_trigger_conditions(self.job)
+        evaluation, _ = self.workflow.evaluate_trigger_conditions(self.job)
         assert evaluation is False

     def test_evaluate_trigger_conditions__no_conditions(self):
         self.workflow.when_condition_group = None
         self.workflow.save()

-        evaluation = self.workflow.evaluate_trigger_conditions(self.job)
+        evaluation, _ = self.workflow.evaluate_trigger_conditions(self.job)
         assert evaluation is True
+
+    def test_evaluate_trigger_conditions__slow_condition(self):
+        # Update group to _all_, since the fast condition is met
+        self.data_condition_group.update(logic_type="all")
+
+        slow_condition = self.create_data_condition(
+            type=Condition.EVENT_FREQUENCY_COUNT, comparison={"interval": "1d", "value": 7}
+        )
+        self.data_condition_group.conditions.add(slow_condition)
+        evaluation, remaining_conditions = self.workflow.evaluate_trigger_conditions(self.job)
+
+        assert evaluation is True
+        assert remaining_conditions == [slow_condition]