From 32c46eeb49ca1c677ce10014586a9d7bf7edd1ff Mon Sep 17 00:00:00 2001
From: Cathy Teng
Date: Fri, 17 Jan 2025 11:41:13 -0800
Subject: [PATCH 1/5] refactor delayed processing batching logic to prepare for workflows

---
 .../rules/processing/buffer_processing.py     | 156 ++++++++++++++++++
 .../rules/processing/delayed_processing.py    |  99 ++---------
 .../processing/test_delayed_processing.py     |  18 +-
 3 files changed, 183 insertions(+), 90 deletions(-)
 create mode 100644 src/sentry/rules/processing/buffer_processing.py
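
Note for reviewers (below the cut, not applied by git am): the batching
scheme the new process_project_alerts_in_batches docstring describes
boils down to draining one dict iterator with islice and keying each
chunk by a fresh UUID. A minimal standalone sketch of that pattern, with
simplified stand-in names:

    import uuid
    from itertools import islice

    def chunk_into_batches(data: dict[str, str], batch_size: int) -> dict[str, dict[str, str]]:
        # Walk the dict exactly once: islice drains the shared iterator,
        # so each pass takes the next batch_size items, and the walrus
        # loop stops when the slice comes back empty.
        batches: dict[str, dict[str, str]] = {}
        items = iter(data.items())
        while batch := dict(islice(items, batch_size)):
            # A fresh UUID per chunk mirrors the batch_key written back
            # into Redis, keeping batch keys unique across processes.
            batches[str(uuid.uuid4())] = batch
        return batches

    # chunk_into_batches({f"rule:{i}": f"event:{i}" for i in range(10)}, 4)
    # -> two batches of 4 items and one of 2, each under its own UUID key.
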
diff --git a/src/sentry/rules/processing/buffer_processing.py b/src/sentry/rules/processing/buffer_processing.py
new file mode 100644
index 00000000000000..a1b91bec317d9b
--- /dev/null
+++ b/src/sentry/rules/processing/buffer_processing.py
@@ -0,0 +1,156 @@
+import logging
+import math
+import uuid
+from abc import ABC, abstractmethod
+from collections.abc import Callable
+from datetime import datetime, timezone
+from itertools import islice
+from typing import NotRequired, TypedDict
+
+from sentry import buffer, options
+from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry
+from sentry.db import models
+from sentry.rules.processing.processor import PROJECT_ID_BUFFER_LIST_KEY
+from sentry.utils import metrics
+from sentry.utils.registry import NoRegistrationExistsError, Registry
+
+logger = logging.getLogger("sentry.delayed_processing")
+
+
+class FilterKeys(TypedDict):
+    project_id: int
+    batch_key: NotRequired[str]
+
+
+class BufferHashKeys(TypedDict):
+    model: type[models.Model]
+    filters: FilterKeys
+
+
+class DelayedProcessingBase(ABC):
+    def __init__(self, project_id: int):
+        self.project_id = project_id
+
+    @property
+    @abstractmethod
+    def hash_args(self) -> BufferHashKeys:
+        raise NotImplementedError
+
+    @property
+    @abstractmethod
+    def processing_task(self) -> Callable[[int], None]:
+        raise NotImplementedError
+
+
+delayed_processing_registry = Registry[type[DelayedProcessingBase]]()
+
+
+def fetch_alertgroup_to_event_data(
+    project_id: int, model: type[models.Model], batch_key: str | None = None
+) -> dict[str, str]:
+    field: dict[str, models.Model | int | str] = {
+        "project_id": project_id,
+    }
+
+    if batch_key:
+        field["batch_key"] = batch_key
+
+    return buffer.backend.get_hash(model=model, field=field)
+
+
+def bucket_num_groups(num_groups: int) -> str:
+    if num_groups > 1:
+        magnitude = 10 ** int(math.log10(num_groups))
+        return f">{magnitude}"
+    return "1"
+
+
+def process_project_alerts_in_batches(project_id: int, processing_type: str) -> None:
+    """
+    This will check the number of alertgroup_to_event_data items in the Redis buffer for a project.
+
+    If the number is larger than the batch size, it will chunk the items and process them in batches.
+
+    The batches are replicated into a new redis hash with a unique filter (a uuid) to identify the batch.
+    We need to use a UUID because these batches can be created in multiple processes and we need to ensure
+    uniqueness across all of them for the centralized redis buffer. The batches are stored in redis because
+    we shouldn't pass objects that need to be pickled, and passing 10k items to a celery task as
+    arguments could be problematic. Finally, we can't use a pagination system on the data because
+    redis doesn't maintain the sort order of the hash keys.
+
+    `processing_task` will fetch the batch from redis and process the rules.
+    """
+    batch_size = options.get("delayed_processing.batch_size")
+    log_format = "{}.{}"
+
+    try:
+        processing_info = delayed_processing_registry.get(processing_type)(project_id)
+    except NoRegistrationExistsError:
+        logger.exception(log_format.format(processing_type, "no_registration"))
+        return
+
+    hash_args = processing_info.hash_args
+    task = processing_info.processing_task
+
+    event_count = buffer.backend.get_hash_length(
+        model=hash_args["model"], field=hash_args["filters"]
+    )
+    metrics.incr(
+        f"{processing_type}.num_groups", tags={"num_groups": bucket_num_groups(event_count)}
+    )
+
+    if event_count < batch_size:
+        return task.delay(project_id)
+
+    logger.info(
+        log_format.format(processing_type, "process_large_batch"),
+        extra={"project_id": project_id, "count": event_count},
+    )
+
+    # if the dictionary is large, get the items and chunk them.
+    alertgroup_to_event_data = fetch_alertgroup_to_event_data(project_id, hash_args["model"])
+
+    with metrics.timer(f"{processing_type}.process_batch.duration"):
+        items = iter(alertgroup_to_event_data.items())
+
+        while batch := dict(islice(items, batch_size)):
+            batch_key = str(uuid.uuid4())
+
+            buffer.backend.push_to_hash_bulk(
+                model=hash_args["model"],
+                filters={**hash_args["filters"], "batch_key": batch_key},
+                data=batch,
+            )
+
+            # remove the batched items from the project alertgroup_to_event_data
+            buffer.backend.delete_hash(**hash_args, fields=list(batch.keys()))
+
+            task.delay(project_id, batch_key)
+
+
+def process_project_ids(
+    fetch_time: datetime, buffer_list_key: str, processing_type: str
+) -> list[int]:
+    project_ids = buffer.backend.get_sorted_set(buffer_list_key, min=0, max=fetch_time.timestamp())
+    log_str = ", ".join(f"{project_id}: {timestamp}" for project_id, timestamp in project_ids)
+    log_name = f"{processing_type}.project_id_list"
+    logger.info(log_name, extra={"project_ids": log_str})
+
+    for project_id, _ in project_ids:
+        process_project_alerts_in_batches(project_id, processing_type)
+
+    buffer.backend.delete_key(buffer_list_key, min=0, max=fetch_time.timestamp())
+
+
+def process_delayed_alert_conditions() -> None:
+    fetch_time = datetime.now(tz=timezone.utc)
+
+    with metrics.timer("delayed_processing.process_all_conditions.duration"):
+        process_project_ids(fetch_time, PROJECT_ID_BUFFER_LIST_KEY, "delayed_processing")
+
+    # with metrics.timer("delayed_workflow.process_all_conditions.duration"):
+    #     process_project_ids(fetch_time, WORKFLOW_ENGINE_PROJECT_ID_BUFFER_LIST_KEY, "delayed_workflow")
+
+
+if not redis_buffer_registry.has(BufferHookEvent.FLUSH):
+    redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_delayed_alert_conditions)
diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py
index 4f15030c02f4d4..18c9184ec3e02c 100644
--- a/src/sentry/rules/processing/delayed_processing.py
+++ b/src/sentry/rules/processing/delayed_processing.py
@@ -1,17 +1,14 @@
 import logging
-import math
 import random
 import uuid
 from collections import defaultdict
-from collections.abc import Sequence
+from collections.abc import Callable, Sequence
 from datetime import datetime, timedelta, timezone
-from itertools import islice
 from typing import Any, DefaultDict, NamedTuple
 
 from django.db.models import OuterRef, Subquery
 
-from sentry import buffer, nodestore, options
-from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry
+from sentry import buffer, nodestore
 from sentry.db import models
 from sentry.eventstore.models import Event, GroupEvent
 from sentry.issues.issue_occurrence import IssueOccurrence
@@ -29,8 +26,12 @@
     EventFrequencyConditionData,
     percent_increase,
 )
+from sentry.rules.processing.buffer_processing import (
+    BufferHashKeys,
+    DelayedProcessingBase,
+    delayed_processing_registry,
+)
 from sentry.rules.processing.processor import (
-    PROJECT_ID_BUFFER_LIST_KEY,
     activate_downstream_actions,
     bulk_get_rule_status,
     is_condition_slow,
@@ -90,6 +91,7 @@ def fetch_project(project_id: int) -> Project | None:
         return None
 
 
+# TODO: replace with fetch_alertgroup_to_event_data
 def fetch_rulegroup_to_event_data(project_id: int, batch_key: str | None = None) -> dict[str, str]:
     field: dict[str, models.Model | int | str] = {
         "project_id": project_id,
@@ -477,80 +479,6 @@ def cleanup_redis_buffer(
     buffer.backend.delete_hash(model=Project, filters=filters, fields=hashes_to_delete)
 
 
-def bucket_num_groups(num_groups: int) -> str:
-    if num_groups > 1:
-        magnitude = 10 ** int(math.log10(num_groups))
-        return f">{magnitude}"
-    return "1"
-
-
-def process_rulegroups_in_batches(project_id: int):
-    """
-    This will check the number of rulegroup_to_event_data items in the Redis buffer for a project.
-
-    If the number is larger than the batch size, it will chunk the items and process them in batches.
-
-    The batches are replicated into a new redis hash with a unique filter (a uuid) to identify the batch.
-    We need to use a UUID because these batches can be created in multiple processes and we need to ensure
-    uniqueness across all of them for the centralized redis buffer. The batches are stored in redis because
-    we shouldn't pass objects that need to be pickled and 10k items could be problematic in the celery tasks
-    as arguments could be problematic. Finally, we can't use a pagination system on the data because
-    redis doesn't maintain the sort order of the hash keys.
-
-    `apply_delayed` will fetch the batch from redis and process the rules.
-    """
-    batch_size = options.get("delayed_processing.batch_size")
-    event_count = buffer.backend.get_hash_length(Project, {"project_id": project_id})
-    metrics.incr(
-        "delayed_processing.num_groups", tags={"num_groups": bucket_num_groups(event_count)}
-    )
-
-    if event_count < batch_size:
-        return apply_delayed.delay(project_id)
-
-    logger.info(
-        "delayed_processing.process_large_batch",
-        extra={"project_id": project_id, "count": event_count},
-    )
-
-    # if the dictionary is large, get the items and chunk them.
-    rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id)
-
-    with metrics.timer("delayed_processing.process_batch.duration"):
-        items = iter(rulegroup_to_event_data.items())
-
-        while batch := dict(islice(items, batch_size)):
-            batch_key = str(uuid.uuid4())
-
-            buffer.backend.push_to_hash_bulk(
-                model=Project,
-                filters={"project_id": project_id, "batch_key": batch_key},
-                data=batch,
-            )
-
-            # remove the batched items from the project rulegroup_to_event_data
-            buffer.backend.delete_hash(
-                model=Project, filters={"project_id": project_id}, fields=list(batch.keys())
-            )
-
-            apply_delayed.delay(project_id, batch_key)
-
-
-def process_delayed_alert_conditions() -> None:
-    with metrics.timer("delayed_processing.process_all_conditions.duration"):
-        fetch_time = datetime.now(tz=timezone.utc)
-        project_ids = buffer.backend.get_sorted_set(
-            PROJECT_ID_BUFFER_LIST_KEY, min=0, max=fetch_time.timestamp()
-        )
-        log_str = ", ".join(f"{project_id}: {timestamp}" for project_id, timestamp in project_ids)
-        logger.info("delayed_processing.project_id_list", extra={"project_ids": log_str})
-
-        for project_id, _ in project_ids:
-            process_rulegroups_in_batches(project_id)
-
-        buffer.backend.delete_key(PROJECT_ID_BUFFER_LIST_KEY, min=0, max=fetch_time.timestamp())
-
-
 @instrumented_task(
     name="sentry.rules.processing.delayed_processing",
     queue="delayed_rules",
@@ -602,5 +530,12 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k
     cleanup_redis_buffer(project_id, rules_to_groups, batch_key)
 
 
-if not redis_buffer_registry.has(BufferHookEvent.FLUSH):
-    redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_delayed_alert_conditions)
+@delayed_processing_registry.register("delayed_processing")  # default delayed processing
+class DelayedRule(DelayedProcessingBase):
+    @property
+    def hash_args(self) -> BufferHashKeys:
+        return BufferHashKeys({"model": Project, "filters": {"project_id": self.project_id}})
+
+    @property
+    def processing_task(self) -> Callable[[int], None]:
+        return apply_delayed
diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py
index b65d5d9a248198..5ecc3f01cbe9b2 100644
--- a/tests/sentry/rules/processing/test_delayed_processing.py
+++ b/tests/sentry/rules/processing/test_delayed_processing.py
@@ -19,11 +19,15 @@
     EventFrequencyCondition,
     EventFrequencyConditionData,
 )
+from sentry.rules.processing.buffer_processing import (
+    bucket_num_groups,
+    process_delayed_alert_conditions,
+    process_project_alerts_in_batches,
+)
 from sentry.rules.processing.delayed_processing import (
     DataAndGroups,
     UniqueConditionQuery,
     apply_delayed,
-    bucket_num_groups,
     bulk_fetch_events,
     cleanup_redis_buffer,
     generate_unique_queries,
@@ -34,8 +38,6 @@
     get_rules_to_groups,
     get_slow_conditions,
     parse_rulegroup_to_event_data,
-    process_delayed_alert_conditions,
-    process_rulegroups_in_batches,
 )
 from sentry.rules.processing.processor import PROJECT_ID_BUFFER_LIST_KEY, RuleProcessor
 from sentry.testutils.cases import PerformanceIssueTestCase, RuleTestCase, TestCase
@@ -761,7 +763,7 @@ def _push_base_events(self) -> None:
         self.push_to_hash(self.project_two.id, self.rule3.id, self.group3.id, self.event3.event_id)
         self.push_to_hash(self.project_two.id, self.rule4.id, self.group4.id, self.event4.event_id)
 
-    @patch("sentry.rules.processing.delayed_processing.process_rulegroups_in_batches")
+    @patch("sentry.rules.processing.buffer_processing.process_project_alerts_in_batches")
     def test_fetches_from_buffer_and_executes(self, mock_process_in_batches):
         self._push_base_events()
         # To get the correct mapping, we need to return the correct
@@ -1502,7 +1504,7 @@ def setUp(self):
 
     @patch("sentry.rules.processing.delayed_processing.apply_delayed.delay")
     def test_no_redis_data(self, mock_apply_delayed):
-        process_rulegroups_in_batches(self.project.id)
+        process_project_alerts_in_batches(self.project.id, "delayed_processing")
         mock_apply_delayed.assert_called_once_with(self.project.id)
 
     @patch("sentry.rules.processing.delayed_processing.apply_delayed.delay")
@@ -1511,7 +1513,7 @@ def test_basic(self, mock_apply_delayed):
         self.push_to_hash(self.project.id, self.rule.id, self.group_two.id)
         self.push_to_hash(self.project.id, self.rule.id, self.group_three.id)
 
-        process_rulegroups_in_batches(self.project.id)
+        process_project_alerts_in_batches(self.project.id, "delayed_processing")
         mock_apply_delayed.assert_called_once_with(self.project.id)
 
     @override_options({"delayed_processing.batch_size": 2})
@@ -1521,7 +1523,7 @@ def test_batch(self, mock_apply_delayed):
         self.push_to_hash(self.project.id, self.rule.id, self.group_two.id)
         self.push_to_hash(self.project.id, self.rule.id, self.group_three.id)
 
-        process_rulegroups_in_batches(self.project.id)
+        process_project_alerts_in_batches(self.project.id, "delayed_processing")
         assert mock_apply_delayed.call_count == 2
 
         # Validate the batches are created correctly
@@ -1601,7 +1603,7 @@ def test_batched_cleanup(self, mock_apply_delayed):
         rules_to_groups[self.rule.id].add(group_two.id)
         rules_to_groups[self.rule.id].add(group_three.id)
 
-        process_rulegroups_in_batches(self.project.id)
+        process_project_alerts_in_batches(self.project.id, "delayed_processing")
 
         batch_one_key = mock_apply_delayed.call_args_list[0][0][1]
         batch_two_key = mock_apply_delayed.call_args_list[1][0][1]

From 250ba3cebcc070414840461a8cd5441af9dc23e1 Mon Sep 17 00:00:00 2001
From: Cathy Teng
Date: Fri, 17 Jan 2025 12:56:27 -0800
Subject: [PATCH 2/5] fix tests

---
 .../rules/processing/buffer_processing.py     | 31 +++++++++----------
 .../rules/processing/delayed_processing.py    |  8 +++--
 tests/sentry/buffer/test_redis.py             |  2 +-
 3 files changed, 21 insertions(+), 20 deletions(-)
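
Note for reviewers: this commit swaps the TypedDicts for dataclasses, so
attribute access (hash_args.model) is typed while the buffer backend
still receives plain dicts via dataclasses.asdict, which recurses into
nested dataclasses. A minimal sketch of that round trip (stand-in types,
not part of the diff):

    from dataclasses import asdict, dataclass

    @dataclass
    class FilterKeys:
        project_id: int

    @dataclass
    class BufferHashKeys:
        model: type  # stand-in for type[models.Model]
        filters: FilterKeys

    keys = BufferHashKeys(model=object, filters=FilterKeys(project_id=42))
    assert asdict(keys.filters) == {"project_id": 42}
    # asdict recurses through nested dataclasses, so **asdict(keys)
    # expands to model=object, filters={"project_id": 42} -- the shape
    # that delete_hash(**asdict(hash_args), ...) relies on.
    assert asdict(keys)["filters"] == {"project_id": 42}
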
diff --git a/src/sentry/rules/processing/buffer_processing.py b/src/sentry/rules/processing/buffer_processing.py
index a1b91bec317d9b..7c8be43f758227 100644
--- a/src/sentry/rules/processing/buffer_processing.py
+++ b/src/sentry/rules/processing/buffer_processing.py
@@ -2,10 +2,11 @@
 import math
 import uuid
 from abc import ABC, abstractmethod
-from collections.abc import Callable
+from dataclasses import asdict, dataclass
 from datetime import datetime, timezone
 from itertools import islice
-from typing import NotRequired, TypedDict
+
+from celery import Task
 
 from sentry import buffer, options
 from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry
@@ -17,12 +18,13 @@
 logger = logging.getLogger("sentry.delayed_processing")
 
 
-class FilterKeys(TypedDict):
+@dataclass
+class FilterKeys:
     project_id: int
-    batch_key: NotRequired[str]
 
 
-class BufferHashKeys(TypedDict):
+@dataclass
+class BufferHashKeys:
     model: type[models.Model]
     filters: FilterKeys
 
@@ -38,7 +40,7 @@ def hash_args(self) -> BufferHashKeys:
 
     @property
     @abstractmethod
-    def processing_task(self) -> Callable[[int], None]:
+    def processing_task(self) -> Task:
         raise NotImplementedError
 
 
@@ -91,10 +93,9 @@ def process_project_alerts_in_batches(project_id: int, processing_type: str) ->
 
     hash_args = processing_info.hash_args
     task = processing_info.processing_task
+    filters: dict[str, models.Model | str | int] = asdict(hash_args.filters)
 
-    event_count = buffer.backend.get_hash_length(
-        model=hash_args["model"], field=hash_args["filters"]
-    )
+    event_count = buffer.backend.get_hash_length(model=hash_args.model, field=filters)
     metrics.incr(
         f"{processing_type}.num_groups", tags={"num_groups": bucket_num_groups(event_count)}
     )
@@ -108,7 +109,7 @@ def process_project_alerts_in_batches(project_id: int, processing_type: str) ->
     )
 
     # if the dictionary is large, get the items and chunk them.
-    alertgroup_to_event_data = fetch_alertgroup_to_event_data(project_id, hash_args["model"])
+    alertgroup_to_event_data = fetch_alertgroup_to_event_data(project_id, hash_args.model)
 
     with metrics.timer(f"{processing_type}.process_batch.duration"):
         items = iter(alertgroup_to_event_data.items())
@@ -117,20 +118,18 @@ def process_project_alerts_in_batches(project_id: int, processing_type: str) ->
             batch_key = str(uuid.uuid4())
 
             buffer.backend.push_to_hash_bulk(
-                model=hash_args["model"],
-                filters={**hash_args["filters"], "batch_key": batch_key},
+                model=hash_args.model,
+                filters={**filters, "batch_key": batch_key},
                 data=batch,
             )
 
             # remove the batched items from the project alertgroup_to_event_data
-            buffer.backend.delete_hash(**hash_args, fields=list(batch.keys()))
+            buffer.backend.delete_hash(**asdict(hash_args), fields=list(batch.keys()))
 
             task.delay(project_id, batch_key)
 
 
-def process_project_ids(
-    fetch_time: datetime, buffer_list_key: str, processing_type: str
-) -> list[int]:
+def process_project_ids(fetch_time: datetime, buffer_list_key: str, processing_type: str) -> None:
     project_ids = buffer.backend.get_sorted_set(buffer_list_key, min=0, max=fetch_time.timestamp())
     log_str = ", ".join(f"{project_id}: {timestamp}" for project_id, timestamp in project_ids)
     log_name = f"{processing_type}.project_id_list"
diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py
index 18c9184ec3e02c..f8c8737475d969 100644
--- a/src/sentry/rules/processing/delayed_processing.py
+++ b/src/sentry/rules/processing/delayed_processing.py
@@ -2,10 +2,11 @@
 import random
 import uuid
 from collections import defaultdict
-from collections.abc import Callable, Sequence
+from collections.abc import Sequence
 from datetime import datetime, timedelta, timezone
 from typing import Any, DefaultDict, NamedTuple
 
+from celery import Task
 from django.db.models import OuterRef, Subquery
 
 from sentry import buffer, nodestore
@@ -29,6 +30,7 @@
 from sentry.rules.processing.buffer_processing import (
     BufferHashKeys,
     DelayedProcessingBase,
+    FilterKeys,
     delayed_processing_registry,
 )
 from sentry.rules.processing.processor import (
@@ -534,8 +536,8 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k
 class DelayedRule(DelayedProcessingBase):
     @property
     def hash_args(self) -> BufferHashKeys:
-        return BufferHashKeys({"model": Project, "filters": {"project_id": self.project_id}})
+        return BufferHashKeys(model=Project, filters=FilterKeys(project_id=self.project_id))
 
     @property
-    def processing_task(self) -> Callable[[int], None]:
+    def processing_task(self) -> Task:
         return apply_delayed
diff --git a/tests/sentry/buffer/test_redis.py b/tests/sentry/buffer/test_redis.py
index f9fea38675c931..ba573894ac4773 100644
--- a/tests/sentry/buffer/test_redis.py
+++ b/tests/sentry/buffer/test_redis.py
@@ -18,7 +18,7 @@
 )
 from sentry.models.group import Group
 from sentry.models.project import Project
-from sentry.rules.processing.delayed_processing import process_delayed_alert_conditions
+from sentry.rules.processing.buffer_processing import process_delayed_alert_conditions
 from sentry.rules.processing.processor import PROJECT_ID_BUFFER_LIST_KEY
 from sentry.testutils.helpers.datetime import freeze_time
 from sentry.testutils.pytest.fixtures import django_db_all

From e98e919485cb895dca19f634eb0598ae59d72847 Mon Sep 17 00:00:00 2001
From: Cathy Teng
Date: Tue, 28 Jan 2025 10:29:42 -0800
Subject: [PATCH 3/5] generalize the logic

---
 .../rules/processing/buffer_processing.py     | 40 +++++++++----------
 .../rules/processing/delayed_processing.py    |  3 ++
 .../processing/test_delayed_processing.py     | 16 ++++----
 3 files changed, 31 insertions(+), 28 deletions(-)
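
Note for reviewers: process_buffer no longer hard-codes one buffer key;
it walks every registration, so adding e.g. a workflow handler later
means registering a class rather than editing the loop. A reduced
sketch of the dispatch shape (Registry below is a simplified stand-in
for sentry.utils.registry.Registry):

    from abc import ABC
    from typing import ClassVar

    class Registry:
        def __init__(self) -> None:
            self.registrations: dict[str, type] = {}

        def register(self, key: str):
            def inner(cls: type) -> type:
                self.registrations[key] = cls  # record the handler class
                return cls
            return inner

    class DelayedProcessingBase(ABC):
        buffer_key: ClassVar[str]  # each handler names its own sorted set

    registry = Registry()

    @registry.register("delayed_processing")
    class DelayedRule(DelayedProcessingBase):
        buffer_key = "project_id_buffer_list"  # stand-in key name

    for processing_type, handler in registry.registrations.items():
        print(processing_type, handler.buffer_key)
    # -> delayed_processing project_id_buffer_list
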
diff --git a/src/sentry/rules/processing/buffer_processing.py b/src/sentry/rules/processing/buffer_processing.py
index 7c8be43f758227..0386ed95ff5012 100644
--- a/src/sentry/rules/processing/buffer_processing.py
+++ b/src/sentry/rules/processing/buffer_processing.py
@@ -5,13 +5,13 @@
 from dataclasses import asdict, dataclass
 from datetime import datetime, timezone
 from itertools import islice
+from typing import ClassVar
 
 from celery import Task
 
 from sentry import buffer, options
 from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry
 from sentry.db import models
-from sentry.rules.processing.processor import PROJECT_ID_BUFFER_LIST_KEY
 from sentry.utils import metrics
 from sentry.utils.registry import NoRegistrationExistsError, Registry
@@ -30,6 +30,8 @@ class BufferHashKeys:
 
 
 class DelayedProcessingBase(ABC):
+    buffer_key: ClassVar[str]
+
     def __init__(self, project_id: int):
         self.project_id = project_id
 
@@ -67,7 +69,7 @@ def bucket_num_groups(num_groups: int) -> str:
     return "1"
 
 
-def process_project_alerts_in_batches(project_id: int, processing_type: str) -> None:
+def process_in_batches(project_id: int, processing_type: str) -> None:
     """
     This will check the number of alertgroup_to_event_data items in the Redis buffer for a project.
 
@@ -129,27 +131,25 @@ def process_project_alerts_in_batches(project_id: int, processing_type: str) ->
             task.delay(project_id, batch_key)
 
 
-def process_project_ids(fetch_time: datetime, buffer_list_key: str, processing_type: str) -> None:
-    project_ids = buffer.backend.get_sorted_set(buffer_list_key, min=0, max=fetch_time.timestamp())
-    log_str = ", ".join(f"{project_id}: {timestamp}" for project_id, timestamp in project_ids)
-    log_name = f"{processing_type}.project_id_list"
-    logger.info(log_name, extra={"project_ids": log_str})
-
-    for project_id, _ in project_ids:
-        process_project_alerts_in_batches(project_id, processing_type)
-
-    buffer.backend.delete_key(buffer_list_key, min=0, max=fetch_time.timestamp())
-
-
-def process_delayed_alert_conditions() -> None:
+def process_buffer() -> None:
     fetch_time = datetime.now(tz=timezone.utc)
 
-    with metrics.timer("delayed_processing.process_all_conditions.duration"):
-        process_project_ids(fetch_time, PROJECT_ID_BUFFER_LIST_KEY, "delayed_processing")
+    for processing_type, handler in delayed_processing_registry.registrations.items():
+        with metrics.timer(f"{processing_type}.process_all_conditions.duration"):
+            project_ids = buffer.backend.get_sorted_set(
+                handler.buffer_key, min=0, max=fetch_time.timestamp()
+            )
+            log_str = ", ".join(
+                f"{project_id}: {timestamp}" for project_id, timestamp in project_ids
+            )
+            log_name = f"{processing_type}.project_id_list"
+            logger.info(log_name, extra={"project_ids": log_str})
+
+            for project_id, _ in project_ids:
+                process_in_batches(project_id, processing_type)
 
-    # with metrics.timer("delayed_workflow.process_all_conditions.duration"):
-    #     process_project_ids(fetch_time, WORKFLOW_ENGINE_PROJECT_ID_BUFFER_LIST_KEY, "delayed_workflow")
+            buffer.backend.delete_key(handler.buffer_key, min=0, max=fetch_time.timestamp())
 
 
 if not redis_buffer_registry.has(BufferHookEvent.FLUSH):
-    redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_delayed_alert_conditions)
+    redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_buffer)
diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py
index f8c8737475d969..314f31cd846466 100644
--- a/src/sentry/rules/processing/delayed_processing.py
+++ b/src/sentry/rules/processing/delayed_processing.py
@@ -34,6 +34,7 @@
     delayed_processing_registry,
 )
 from sentry.rules.processing.processor import (
+    PROJECT_ID_BUFFER_LIST_KEY,
     activate_downstream_actions,
     bulk_get_rule_status,
     is_condition_slow,
@@ -534,6 +535,8 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k
 
 @delayed_processing_registry.register("delayed_processing")  # default delayed processing
 class DelayedRule(DelayedProcessingBase):
+    buffer_key = PROJECT_ID_BUFFER_LIST_KEY
+
     @property
     def hash_args(self) -> BufferHashKeys:
         return BufferHashKeys(model=Project, filters=FilterKeys(project_id=self.project_id))
diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py
index 5ecc3f01cbe9b2..c4da150e07d0e4 100644
--- a/tests/sentry/rules/processing/test_delayed_processing.py
+++ b/tests/sentry/rules/processing/test_delayed_processing.py
@@ -21,8 +21,8 @@
 )
 from sentry.rules.processing.buffer_processing import (
     bucket_num_groups,
-    process_delayed_alert_conditions,
-    process_project_alerts_in_batches,
+    process_buffer,
+    process_in_batches,
 )
 from sentry.rules.processing.delayed_processing import (
     DataAndGroups,
@@ -763,12 +763,12 @@ def _push_base_events(self) -> None:
         self.push_to_hash(self.project_two.id, self.rule3.id, self.group3.id, self.event3.event_id)
         self.push_to_hash(self.project_two.id, self.rule4.id, self.group4.id, self.event4.event_id)
 
-    @patch("sentry.rules.processing.buffer_processing.process_project_alerts_in_batches")
+    @patch("sentry.rules.processing.buffer_processing.process_in_batches")
     def test_fetches_from_buffer_and_executes(self, mock_process_in_batches):
         self._push_base_events()
         # To get the correct mapping, we need to return the correct
         # rulegroup_event mapping based on the project_id input
-        process_delayed_alert_conditions()
+        process_buffer()
 
         for project, rule_group_event_mapping in (
             (self.project, self.rulegroup_event_mapping_one),
@@ -1504,7 +1504,7 @@ def setUp(self):
 
     @patch("sentry.rules.processing.delayed_processing.apply_delayed.delay")
     def test_no_redis_data(self, mock_apply_delayed):
-        process_project_alerts_in_batches(self.project.id, "delayed_processing")
+        process_in_batches(self.project.id, "delayed_processing")
         mock_apply_delayed.assert_called_once_with(self.project.id)
 
     @patch("sentry.rules.processing.delayed_processing.apply_delayed.delay")
@@ -1513,7 +1513,7 @@ def test_basic(self, mock_apply_delayed):
         self.push_to_hash(self.project.id, self.rule.id, self.group_two.id)
         self.push_to_hash(self.project.id, self.rule.id, self.group_three.id)
 
-        process_project_alerts_in_batches(self.project.id, "delayed_processing")
+        process_in_batches(self.project.id, "delayed_processing")
         mock_apply_delayed.assert_called_once_with(self.project.id)
 
     @override_options({"delayed_processing.batch_size": 2})
@@ -1523,7 +1523,7 @@ def test_batch(self, mock_apply_delayed):
         self.push_to_hash(self.project.id, self.rule.id, self.group_two.id)
         self.push_to_hash(self.project.id, self.rule.id, self.group_three.id)
 
-        process_project_alerts_in_batches(self.project.id, "delayed_processing")
+        process_in_batches(self.project.id, "delayed_processing")
         assert mock_apply_delayed.call_count == 2
 
         # Validate the batches are created correctly
@@ -1603,7 +1603,7 @@ def test_batched_cleanup(self, mock_apply_delayed):
         rules_to_groups[self.rule.id].add(group_two.id)
         rules_to_groups[self.rule.id].add(group_three.id)
 
-        process_project_alerts_in_batches(self.project.id, "delayed_processing")
+        process_in_batches(self.project.id, "delayed_processing")
 
         batch_one_key = mock_apply_delayed.call_args_list[0][0][1]
         batch_two_key = mock_apply_delayed.call_args_list[1][0][1]

From 314a4dcbf53ac4c98848dc53b14fb09d46228b65 Mon Sep 17 00:00:00 2001
From: Cathy Teng
Date: Tue, 28 Jan 2025 12:19:08 -0800
Subject: [PATCH 4/5] fix test

---
 tests/sentry/buffer/test_redis.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/sentry/buffer/test_redis.py b/tests/sentry/buffer/test_redis.py
index ba573894ac4773..ed783fef0b9d23 100644
--- a/tests/sentry/buffer/test_redis.py
+++ b/tests/sentry/buffer/test_redis.py
@@ -18,7 +18,7 @@
 )
 from sentry.models.group import Group
 from sentry.models.project import Project
-from sentry.rules.processing.buffer_processing import process_delayed_alert_conditions
+from sentry.rules.processing.buffer_processing import process_buffer
 from sentry.rules.processing.processor import PROJECT_ID_BUFFER_LIST_KEY
 from sentry.testutils.helpers.datetime import freeze_time
 from sentry.testutils.pytest.fixtures import django_db_all
@@ -290,7 +290,7 @@ def test_buffer_hook_registry(self):
 
     @mock.patch("sentry.rules.processing.delayed_processing.metrics.timer")
     def test_callback(self, mock_metrics_timer):
-        redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_delayed_alert_conditions)
+        redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_buffer)
         self.buf.process_batch()
         assert mock_metrics_timer.call_count == 1
 

From 5612d3e6ce3ce72af88c85c230ecd06d501735e5 Mon Sep 17 00:00:00 2001
From: Cathy Teng
Date: Wed, 29 Jan 2025 11:25:46 -0800
Subject: [PATCH 5/5] address nits

---
 src/sentry/buffer/base.py                     | 18 +++++-------
 src/sentry/buffer/redis.py                    | 20 +++++--------
 src/sentry/options/defaults.py                |  6 ++++
 .../rules/processing/buffer_processing.py     | 29 +++++++++++--------
 .../rules/processing/delayed_processing.py    |  7 +++--
 tests/sentry/buffer/test_base.py              |  5 ++--
 tests/sentry/buffer/test_redis.py             |  1 +
 7 files changed, 46 insertions(+), 40 deletions(-)
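
Note for reviewers: the new delayed_processing.emit_logs option defaults
to False, so the per-project log lines (which can join thousands of ids)
are skipped unless explicitly enabled. A reduced sketch of the gating
pattern (get_option stands in for sentry's options.get):

    import logging

    logger = logging.getLogger("sentry.delayed_processing")

    def get_option(key: str) -> bool:
        # stand-in for options.get; the real option is registered in
        # src/sentry/options/defaults.py with default=False
        return {"delayed_processing.emit_logs": False}[key]

    def log_project_ids(processing_type: str, project_ids: list[tuple[int, float]]) -> None:
        if not get_option("delayed_processing.emit_logs"):
            return  # skip building the potentially huge join entirely
        log_str = ", ".join(f"{pid}: {ts}" for pid, ts in project_ids)
        logger.info("%s.project_id_list", processing_type, extra={"project_ids": log_str})
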
diff --git a/src/sentry/buffer/base.py b/src/sentry/buffer/base.py
index a9cde77448f9e3..88a1305b83bcdc 100644
--- a/src/sentry/buffer/base.py
+++ b/src/sentry/buffer/base.py
@@ -8,6 +8,8 @@
 from sentry.tasks.process_buffer import process_incr
 from sentry.utils.services import Service
 
+BufferField = models.Model | str | int
+
 
 class Buffer(Service):
     """
@@ -50,14 +52,10 @@ def get(
         """
         return {col: 0 for col in columns}
 
-    def get_hash(
-        self, model: type[models.Model], field: dict[str, models.Model | str | int]
-    ) -> dict[str, str]:
+    def get_hash(self, model: type[models.Model], field: dict[str, BufferField]) -> dict[str, str]:
         return {}
 
-    def get_hash_length(
-        self, model: type[models.Model], field: dict[str, models.Model | str | int]
-    ) -> int:
+    def get_hash_length(self, model: type[models.Model], field: dict[str, BufferField]) -> int:
         raise NotImplementedError
 
     def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, datetime]]:
@@ -69,7 +67,7 @@ def push_to_sorted_set(self, key: str, value: list[int] | int) -> None:
     def push_to_hash(
         self,
         model: type[models.Model],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         field: str,
         value: str,
     ) -> None:
@@ -78,7 +76,7 @@ def push_to_hash(
     def push_to_hash_bulk(
         self,
         model: type[models.Model],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         data: dict[str, str],
     ) -> None:
         raise NotImplementedError
@@ -86,7 +84,7 @@ def push_to_hash_bulk(
     def delete_hash(
         self,
         model: type[models.Model],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         fields: list[str],
     ) -> None:
         return None
@@ -98,7 +96,7 @@ def delete_hash(
         self,
         model: type[models.Model],
         columns: dict[str, int],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         extra: dict[str, Any] | None = None,
         signal_only: bool | None = None,
     ) -> None:
diff --git a/src/sentry/buffer/redis.py b/src/sentry/buffer/redis.py
index 5b0ffbd01aa252..d3f5e1c5ca0f7f 100644
--- a/src/sentry/buffer/redis.py
+++ b/src/sentry/buffer/redis.py
@@ -13,7 +13,7 @@
 from django.utils.encoding import force_bytes, force_str
 from rediscluster import RedisCluster
 
-from sentry.buffer.base import Buffer
+from sentry.buffer.base import Buffer, BufferField
 from sentry.db import models
 from sentry.tasks.process_buffer import process_incr
 from sentry.utils import json, metrics
@@ -235,7 +235,7 @@ def __init__(self, incr_batch_size: int = 2, **options: object):
     def validate(self) -> None:
         validate_dynamic_cluster(self.is_redis_cluster, self.cluster)
 
-    def _coerce_val(self, value: models.Model | str | int) -> bytes:
+    def _coerce_val(self, value: BufferField) -> bytes:
         if isinstance(value, models.Model):
             value = value.pk
         return force_bytes(value, errors="replace")
@@ -395,7 +395,7 @@ def delete_key(self, key: str, min: float, max: float) -> None:
     def delete_hash(
         self,
         model: type[models.Model],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         fields: list[str],
     ) -> None:
         key = self._make_key(model, filters)
@@ -408,7 +408,7 @@ def delete_hash(
     def push_to_hash(
         self,
         model: type[models.Model],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         field: str,
         value: str,
     ) -> None:
@@ -418,15 +418,13 @@ def push_to_hash(
     def push_to_hash_bulk(
         self,
         model: type[models.Model],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         data: dict[str, str],
     ) -> None:
         key = self._make_key(model, filters)
         self._execute_redis_operation(key, RedisOperation.HASH_ADD_BULK, data)
 
-    def get_hash(
-        self, model: type[models.Model], field: dict[str, models.Model | str | int]
-    ) -> dict[str, str]:
+    def get_hash(self, model: type[models.Model], field: dict[str, BufferField]) -> dict[str, str]:
         key = self._make_key(model, field)
         redis_hash = self._execute_redis_operation(key, RedisOperation.HASH_GET_ALL)
         decoded_hash = {}
@@ -439,9 +437,7 @@ def get_hash(
 
         return decoded_hash
 
-    def get_hash_length(
-        self, model: type[models.Model], field: dict[str, models.Model | str | int]
-    ) -> int:
+    def get_hash_length(self, model: type[models.Model], field: dict[str, BufferField]) -> int:
         key = self._make_key(model, field)
         return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH)
 
@@ -455,7 +451,7 @@ def incr(
         self,
         model: type[models.Model],
         columns: dict[str, int],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         extra: dict[str, Any] | None = None,
         signal_only: bool | None = None,
     ) -> None:
diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py
index 45b25bc5a2298d..1220eb444b5140 100644
--- a/src/sentry/options/defaults.py
+++ b/src/sentry/options/defaults.py
@@ -2820,6 +2820,12 @@
     default=10000,
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
+register(
+    "delayed_processing.emit_logs",
+    type=Bool,
+    default=False,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)
 register(
     "celery_split_queue_task_rollout",
     default={},
""" batch_size = options.get("delayed_processing.batch_size") + should_emit_logs = options.get("delayed_processing.emit_logs") log_format = "{}.{}" try: @@ -95,7 +97,7 @@ def process_in_batches(project_id: int, processing_type: str) -> None: hash_args = processing_info.hash_args task = processing_info.processing_task - filters: dict[str, models.Model | str | int] = asdict(hash_args.filters) + filters: dict[str, BufferField] = asdict(hash_args.filters) event_count = buffer.backend.get_hash_length(model=hash_args.model, field=filters) metrics.incr( @@ -105,13 +107,14 @@ def process_in_batches(project_id: int, processing_type: str) -> None: if event_count < batch_size: return task.delay(project_id) - logger.info( - log_format.format(processing_type, "process_large_batch"), - extra={"project_id": project_id, "count": event_count}, - ) + if should_emit_logs: + logger.info( + log_format.format(processing_type, "process_large_batch"), + extra={"project_id": project_id, "count": event_count}, + ) # if the dictionary is large, get the items and chunk them. - alertgroup_to_event_data = fetch_alertgroup_to_event_data(project_id, hash_args.model) + alertgroup_to_event_data = fetch_group_to_event_data(project_id, hash_args.model) with metrics.timer(f"{processing_type}.process_batch.duration"): items = iter(alertgroup_to_event_data.items()) @@ -133,17 +136,19 @@ def process_in_batches(project_id: int, processing_type: str) -> None: def process_buffer() -> None: fetch_time = datetime.now(tz=timezone.utc) + should_emit_logs = options.get("delayed_processing.emit_logs") for processing_type, handler in delayed_processing_registry.registrations.items(): with metrics.timer(f"{processing_type}.process_all_conditions.duration"): project_ids = buffer.backend.get_sorted_set( handler.buffer_key, min=0, max=fetch_time.timestamp() ) - log_str = ", ".join( - f"{project_id}: {timestamp}" for project_id, timestamp in project_ids - ) - log_name = f"{processing_type}.project_id_list" - logger.info(log_name, extra={"project_ids": log_str}) + if should_emit_logs: + log_str = ", ".join( + f"{project_id}: {timestamp}" for project_id, timestamp in project_ids + ) + log_name = f"{processing_type}.project_id_list" + logger.info(log_name, extra={"project_ids": log_str}) for project_id, _ in project_ids: process_in_batches(project_id, processing_type) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 314f31cd846466..eb0b4165280121 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -10,6 +10,7 @@ from django.db.models import OuterRef, Subquery from sentry import buffer, nodestore +from sentry.buffer.base import BufferField from sentry.db import models from sentry.eventstore.models import Event, GroupEvent from sentry.issues.issue_occurrence import IssueOccurrence @@ -94,7 +95,7 @@ def fetch_project(project_id: int) -> Project | None: return None -# TODO: replace with fetch_alertgroup_to_event_data +# TODO: replace with fetch_group_to_event_data def fetch_rulegroup_to_event_data(project_id: int, batch_key: str | None = None) -> dict[str, str]: field: dict[str, models.Model | int | str] = { "project_id": project_id, @@ -475,7 +476,7 @@ def cleanup_redis_buffer( hashes_to_delete = [ f"{rule}:{group}" for rule, groups in rules_to_groups.items() for group in groups ] - filters: dict[str, models.Model | str | int] = {"project_id": project_id} + filters: dict[str, BufferField] = {"project_id": 
project_id} if batch_key: filters["batch_key"] = batch_key @@ -505,7 +506,7 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k condition_groups = get_condition_query_groups(alert_rules, rules_to_groups) logger.info( "delayed_processing.condition_groups", - extra={"condition_groups": condition_groups, "project_id": project_id}, + extra={"condition_groups": len(condition_groups), "project_id": project_id}, ) with metrics.timer("delayed_processing.get_condition_group_results.duration"): diff --git a/tests/sentry/buffer/test_base.py b/tests/sentry/buffer/test_base.py index 6fd955b6f99f2b..6a5e8a9430d0b8 100644 --- a/tests/sentry/buffer/test_base.py +++ b/tests/sentry/buffer/test_base.py @@ -4,8 +4,7 @@ from django.utils import timezone from pytest import raises -from sentry.buffer.base import Buffer -from sentry.db import models +from sentry.buffer.base import Buffer, BufferField from sentry.models.group import Group from sentry.models.organization import Organization from sentry.models.project import Project @@ -25,7 +24,7 @@ def setUp(self): def test_incr_delays_task(self, process_incr): model = mock.Mock() columns = {"times_seen": 1} - filters: dict[str, models.Model | str | int] = {"id": 1} + filters: dict[str, BufferField] = {"id": 1} self.buf.incr(model, columns, filters) kwargs = dict(model=model, columns=columns, filters=filters, extra=None, signal_only=None) process_incr.apply_async.assert_called_once_with(kwargs=kwargs, headers=mock.ANY) diff --git a/tests/sentry/buffer/test_redis.py b/tests/sentry/buffer/test_redis.py index ed783fef0b9d23..8fad884f63e8c5 100644 --- a/tests/sentry/buffer/test_redis.py +++ b/tests/sentry/buffer/test_redis.py @@ -34,6 +34,7 @@ def _hgetall_decode_keys(client, key, is_redis_cluster): return ret +@pytest.mark.django_db class TestRedisBuffer: @pytest.fixture(params=["cluster", "blaster"]) def buffer(self, set_sentry_option, request):