diff --git a/src/sentry/buffer/base.py b/src/sentry/buffer/base.py
index a9cde77448f9e3..88a1305b83bcdc 100644
--- a/src/sentry/buffer/base.py
+++ b/src/sentry/buffer/base.py
@@ -8,6 +8,8 @@
 from sentry.tasks.process_buffer import process_incr
 from sentry.utils.services import Service

+BufferField = models.Model | str | int
+

 class Buffer(Service):
     """
@@ -50,14 +52,10 @@ def get(
         """
         return {col: 0 for col in columns}

-    def get_hash(
-        self, model: type[models.Model], field: dict[str, models.Model | str | int]
-    ) -> dict[str, str]:
+    def get_hash(self, model: type[models.Model], field: dict[str, BufferField]) -> dict[str, str]:
         return {}

-    def get_hash_length(
-        self, model: type[models.Model], field: dict[str, models.Model | str | int]
-    ) -> int:
+    def get_hash_length(self, model: type[models.Model], field: dict[str, BufferField]) -> int:
         raise NotImplementedError

     def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, datetime]]:
@@ -69,7 +67,7 @@ def push_to_sorted_set(self, key: str, value: list[int] | int) -> None:
     def push_to_hash(
         self,
         model: type[models.Model],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         field: str,
         value: str,
     ) -> None:
@@ -78,7 +76,7 @@ def push_to_hash(
     def push_to_hash_bulk(
         self,
         model: type[models.Model],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         data: dict[str, str],
     ) -> None:
         raise NotImplementedError
@@ -86,7 +84,7 @@ def push_to_hash_bulk(
     def delete_hash(
         self,
         model: type[models.Model],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         fields: list[str],
     ) -> None:
         return None
@@ -98,7 +96,7 @@ def incr(
         self,
         model: type[models.Model],
         columns: dict[str, int],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         extra: dict[str, Any] | None = None,
         signal_only: bool | None = None,
     ) -> None:
diff --git a/src/sentry/buffer/redis.py b/src/sentry/buffer/redis.py
index 5b0ffbd01aa252..d3f5e1c5ca0f7f 100644
--- a/src/sentry/buffer/redis.py
+++ b/src/sentry/buffer/redis.py
@@ -13,7 +13,7 @@
 from django.utils.encoding import force_bytes, force_str
 from rediscluster import RedisCluster

-from sentry.buffer.base import Buffer
+from sentry.buffer.base import Buffer, BufferField
 from sentry.db import models
 from sentry.tasks.process_buffer import process_incr
 from sentry.utils import json, metrics
@@ -235,7 +235,7 @@ def __init__(self, incr_batch_size: int = 2, **options: object):
     def validate(self) -> None:
         validate_dynamic_cluster(self.is_redis_cluster, self.cluster)

-    def _coerce_val(self, value: models.Model | str | int) -> bytes:
+    def _coerce_val(self, value: BufferField) -> bytes:
         if isinstance(value, models.Model):
             value = value.pk
         return force_bytes(value, errors="replace")
@@ -395,7 +395,7 @@ def delete_key(self, key: str, min: float, max: float) -> None:
     def delete_hash(
         self,
         model: type[models.Model],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         fields: list[str],
     ) -> None:
         key = self._make_key(model, filters)
@@ -408,7 +408,7 @@ def delete_hash(
     def push_to_hash(
         self,
         model: type[models.Model],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         field: str,
         value: str,
     ) -> None:
@@ -418,15 +418,13 @@ def push_to_hash(
     def push_to_hash_bulk(
         self,
         model: type[models.Model],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         data: dict[str, str],
     ) -> None:
         key = self._make_key(model, filters)
         self._execute_redis_operation(key, RedisOperation.HASH_ADD_BULK, data)

-    def get_hash(
-        self, model: type[models.Model], field: dict[str, models.Model | str | int]
-    ) -> dict[str, str]:
+    def get_hash(self, model: type[models.Model], field: dict[str, BufferField]) -> dict[str, str]:
         key = self._make_key(model, field)
         redis_hash = self._execute_redis_operation(key, RedisOperation.HASH_GET_ALL)
         decoded_hash = {}
@@ -439,9 +437,7 @@ def get_hash(

         return decoded_hash

-    def get_hash_length(
-        self, model: type[models.Model], field: dict[str, models.Model | str | int]
-    ) -> int:
+    def get_hash_length(self, model: type[models.Model], field: dict[str, BufferField]) -> int:
         key = self._make_key(model, field)
         return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH)

@@ -455,7 +451,7 @@ def incr(
         self,
         model: type[models.Model],
         columns: dict[str, int],
-        filters: dict[str, models.Model | str | int],
+        filters: dict[str, BufferField],
         extra: dict[str, Any] | None = None,
         signal_only: bool | None = None,
     ) -> None:
diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py
index 45b25bc5a2298d..1220eb444b5140 100644
--- a/src/sentry/options/defaults.py
+++ b/src/sentry/options/defaults.py
@@ -2820,6 +2820,12 @@
     default=10000,
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
+register(
+    "delayed_processing.emit_logs",
+    type=Bool,
+    default=False,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)
 register(
     "celery_split_queue_task_rollout",
     default={},
diff --git a/src/sentry/rules/processing/buffer_processing.py b/src/sentry/rules/processing/buffer_processing.py
new file mode 100644
index 00000000000000..dc676150777937
--- /dev/null
+++ b/src/sentry/rules/processing/buffer_processing.py
@@ -0,0 +1,160 @@
+import logging
+import math
+import uuid
+from abc import ABC, abstractmethod
+from dataclasses import asdict, dataclass
+from datetime import datetime, timezone
+from itertools import islice
+from typing import ClassVar
+
+from celery import Task
+
+from sentry import buffer, options
+from sentry.buffer.base import BufferField
+from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry
+from sentry.db import models
+from sentry.utils import metrics
+from sentry.utils.registry import NoRegistrationExistsError, Registry
+
+logger = logging.getLogger("sentry.delayed_processing")
+
+
+@dataclass
+class FilterKeys:
+    project_id: int
+
+
+@dataclass
+class BufferHashKeys:
+    model: type[models.Model]
+    filters: FilterKeys
+
+
+class DelayedProcessingBase(ABC):
+    buffer_key: ClassVar[str]
+
+    def __init__(self, project_id: int):
+        self.project_id = project_id
+
+    @property
+    @abstractmethod
+    def hash_args(self) -> BufferHashKeys:
+        raise NotImplementedError
+
+    @property
+    @abstractmethod
+    def processing_task(self) -> Task:
+        raise NotImplementedError
+
+
+delayed_processing_registry = Registry[type[DelayedProcessingBase]]()
+
+
+def fetch_group_to_event_data(
+    project_id: int, model: type[models.Model], batch_key: str | None = None
+) -> dict[str, str]:
+    field: dict[str, models.Model | int | str] = {
+        "project_id": project_id,
+    }
+
+    if batch_key:
+        field["batch_key"] = batch_key
+
+    return buffer.backend.get_hash(model=model, field=field)
+
+
+def bucket_num_groups(num_groups: int) -> str:
+    if num_groups > 1:
+        magnitude = 10 ** int(math.log10(num_groups))
+        return f">{magnitude}"
+    return "1"
+
+
+def process_in_batches(project_id: int, processing_type: str) -> None:
+    """
+    This will check the number of alertgroup_to_event_data items in the Redis buffer for a project.
+
+    If the number is larger than the batch size, it will chunk the items and process them in batches.
+
+    The batches are replicated into a new redis hash with a unique filter (a uuid) to identify the batch.
+    We need to use a UUID because these batches can be created in multiple processes and we need to ensure
+    uniqueness across all of them for the centralized redis buffer. The batches are stored in redis because
+    we shouldn't pass objects that need to be pickled, and passing 10k items as celery task arguments
+    could be problematic. Finally, we can't use a pagination system on the data because
+    redis doesn't maintain the sort order of the hash keys.
+
+    `processing_task` will fetch the batch from redis and process the rules.
+    """
+    batch_size = options.get("delayed_processing.batch_size")
+    should_emit_logs = options.get("delayed_processing.emit_logs")
+    log_format = "{}.{}"
+
+    try:
+        processing_info = delayed_processing_registry.get(processing_type)(project_id)
+    except NoRegistrationExistsError:
+        logger.exception(log_format.format(processing_type, "no_registration"))
+        return
+
+    hash_args = processing_info.hash_args
+    task = processing_info.processing_task
+    filters: dict[str, BufferField] = asdict(hash_args.filters)
+
+    event_count = buffer.backend.get_hash_length(model=hash_args.model, field=filters)
+    metrics.incr(
+        f"{processing_type}.num_groups", tags={"num_groups": bucket_num_groups(event_count)}
+    )
+
+    if event_count < batch_size:
+        return task.delay(project_id)
+
+    if should_emit_logs:
+        logger.info(
+            log_format.format(processing_type, "process_large_batch"),
+            extra={"project_id": project_id, "count": event_count},
+        )
+
+    # if the dictionary is large, get the items and chunk them.
+    alertgroup_to_event_data = fetch_group_to_event_data(project_id, hash_args.model)
+
+    with metrics.timer(f"{processing_type}.process_batch.duration"):
+        items = iter(alertgroup_to_event_data.items())
+
+        while batch := dict(islice(items, batch_size)):
+            batch_key = str(uuid.uuid4())
+
+            buffer.backend.push_to_hash_bulk(
+                model=hash_args.model,
+                filters={**filters, "batch_key": batch_key},
+                data=batch,
+            )
+
+            # remove the batched items from the project alertgroup_to_event_data
+            buffer.backend.delete_hash(**asdict(hash_args), fields=list(batch.keys()))
+
+            task.delay(project_id, batch_key)
+
+
+def process_buffer() -> None:
+    fetch_time = datetime.now(tz=timezone.utc)
+    should_emit_logs = options.get("delayed_processing.emit_logs")
+
+    for processing_type, handler in delayed_processing_registry.registrations.items():
+        with metrics.timer(f"{processing_type}.process_all_conditions.duration"):
+            project_ids = buffer.backend.get_sorted_set(
+                handler.buffer_key, min=0, max=fetch_time.timestamp()
+            )
+            if should_emit_logs:
+                log_str = ", ".join(
+                    f"{project_id}: {timestamp}" for project_id, timestamp in project_ids
+                )
+                log_name = f"{processing_type}.project_id_list"
+                logger.info(log_name, extra={"project_ids": log_str})
+
+            for project_id, _ in project_ids:
+                process_in_batches(project_id, processing_type)
+
+            buffer.backend.delete_key(handler.buffer_key, min=0, max=fetch_time.timestamp())
+
+
+if not redis_buffer_registry.has(BufferHookEvent.FLUSH):
+    redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_buffer)
diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py
index 4f15030c02f4d4..eb0b4165280121 100644
--- a/src/sentry/rules/processing/delayed_processing.py
+++ b/src/sentry/rules/processing/delayed_processing.py
@@ -1,17 +1,16 @@
 import logging
-import math
 import random
 import uuid
 from collections import defaultdict
 from collections.abc import Sequence
 from datetime import datetime, timedelta, timezone
-from itertools import islice
 from typing import Any, DefaultDict, NamedTuple

+from celery import Task
 from django.db.models import OuterRef, Subquery

-from sentry import buffer, nodestore, options
-from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry
+from sentry import buffer, nodestore
+from sentry.buffer.base import BufferField
 from sentry.db import models
 from sentry.eventstore.models import Event, GroupEvent
 from sentry.issues.issue_occurrence import IssueOccurrence
@@ -29,6 +28,12 @@
     EventFrequencyConditionData,
     percent_increase,
 )
+from sentry.rules.processing.buffer_processing import (
+    BufferHashKeys,
+    DelayedProcessingBase,
+    FilterKeys,
+    delayed_processing_registry,
+)
 from sentry.rules.processing.processor import (
     PROJECT_ID_BUFFER_LIST_KEY,
     activate_downstream_actions,
@@ -90,6 +95,7 @@ def fetch_project(project_id: int) -> Project | None:
         return None


+# TODO: replace with fetch_group_to_event_data
 def fetch_rulegroup_to_event_data(project_id: int, batch_key: str | None = None) -> dict[str, str]:
     field: dict[str, models.Model | int | str] = {
         "project_id": project_id,
@@ -470,87 +476,13 @@ def cleanup_redis_buffer(
     hashes_to_delete = [
         f"{rule}:{group}" for rule, groups in rules_to_groups.items() for group in groups
     ]
-    filters: dict[str, models.Model | str | int] = {"project_id": project_id}
+    filters: dict[str, BufferField] = {"project_id": project_id}
     if batch_key:
         filters["batch_key"] = batch_key

     buffer.backend.delete_hash(model=Project, filters=filters, fields=hashes_to_delete)


-def bucket_num_groups(num_groups: int) -> str:
-    if num_groups > 1:
-        magnitude = 10 ** int(math.log10(num_groups))
-        return f">{magnitude}"
-    return "1"
-
-
-def process_rulegroups_in_batches(project_id: int):
-    """
-    This will check the number of rulegroup_to_event_data items in the Redis buffer for a project.
-
-    If the number is larger than the batch size, it will chunk the items and process them in batches.
-
-    The batches are replicated into a new redis hash with a unique filter (a uuid) to identify the batch.
-    We need to use a UUID because these batches can be created in multiple processes and we need to ensure
-    uniqueness across all of them for the centralized redis buffer. The batches are stored in redis because
-    we shouldn't pass objects that need to be pickled and 10k items could be problematic in the celery tasks
-    as arguments could be problematic. Finally, we can't use a pagination system on the data because
-    redis doesn't maintain the sort order of the hash keys.
-
-    `apply_delayed` will fetch the batch from redis and process the rules.
-    """
-    batch_size = options.get("delayed_processing.batch_size")
-    event_count = buffer.backend.get_hash_length(Project, {"project_id": project_id})
-    metrics.incr(
-        "delayed_processing.num_groups", tags={"num_groups": bucket_num_groups(event_count)}
-    )
-
-    if event_count < batch_size:
-        return apply_delayed.delay(project_id)
-
-    logger.info(
-        "delayed_processing.process_large_batch",
-        extra={"project_id": project_id, "count": event_count},
-    )
-
-    # if the dictionary is large, get the items and chunk them.
-    rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id)
-
-    with metrics.timer("delayed_processing.process_batch.duration"):
-        items = iter(rulegroup_to_event_data.items())
-
-        while batch := dict(islice(items, batch_size)):
-            batch_key = str(uuid.uuid4())
-
-            buffer.backend.push_to_hash_bulk(
-                model=Project,
-                filters={"project_id": project_id, "batch_key": batch_key},
-                data=batch,
-            )
-
-            # remove the batched items from the project rulegroup_to_event_data
-            buffer.backend.delete_hash(
-                model=Project, filters={"project_id": project_id}, fields=list(batch.keys())
-            )
-
-            apply_delayed.delay(project_id, batch_key)
-
-
-def process_delayed_alert_conditions() -> None:
-    with metrics.timer("delayed_processing.process_all_conditions.duration"):
-        fetch_time = datetime.now(tz=timezone.utc)
-        project_ids = buffer.backend.get_sorted_set(
-            PROJECT_ID_BUFFER_LIST_KEY, min=0, max=fetch_time.timestamp()
-        )
-        log_str = ", ".join(f"{project_id}: {timestamp}" for project_id, timestamp in project_ids)
-        logger.info("delayed_processing.project_id_list", extra={"project_ids": log_str})
-
-        for project_id, _ in project_ids:
-            process_rulegroups_in_batches(project_id)
-
-        buffer.backend.delete_key(PROJECT_ID_BUFFER_LIST_KEY, min=0, max=fetch_time.timestamp())
-
-
 @instrumented_task(
     name="sentry.rules.processing.delayed_processing",
     queue="delayed_rules",
@@ -574,7 +506,7 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k
     condition_groups = get_condition_query_groups(alert_rules, rules_to_groups)
     logger.info(
         "delayed_processing.condition_groups",
-        extra={"condition_groups": condition_groups, "project_id": project_id},
+        extra={"condition_groups": len(condition_groups), "project_id": project_id},
     )

     with metrics.timer("delayed_processing.get_condition_group_results.duration"):
@@ -602,5 +534,14 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k
     cleanup_redis_buffer(project_id, rules_to_groups, batch_key)


-if not redis_buffer_registry.has(BufferHookEvent.FLUSH):
-    redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_delayed_alert_conditions)
+@delayed_processing_registry.register("delayed_processing")  # default delayed processing
+class DelayedRule(DelayedProcessingBase):
+    buffer_key = PROJECT_ID_BUFFER_LIST_KEY
+
+    @property
+    def hash_args(self) -> BufferHashKeys:
+        return BufferHashKeys(model=Project, filters=FilterKeys(project_id=self.project_id))
+
+    @property
+    def processing_task(self) -> Task:
+        return apply_delayed
diff --git a/tests/sentry/buffer/test_base.py b/tests/sentry/buffer/test_base.py
index 6fd955b6f99f2b..6a5e8a9430d0b8 100644
--- a/tests/sentry/buffer/test_base.py
+++ b/tests/sentry/buffer/test_base.py
@@ -4,8 +4,7 @@
 from django.utils import timezone
 from pytest import raises

-from sentry.buffer.base import Buffer
-from sentry.db import models
+from sentry.buffer.base import Buffer, BufferField
 from sentry.models.group import Group
 from sentry.models.organization import Organization
 from sentry.models.project import Project
@@ -25,7 +24,7 @@ def setUp(self):
     def test_incr_delays_task(self, process_incr):
         model = mock.Mock()
         columns = {"times_seen": 1}
-        filters: dict[str, models.Model | str | int] = {"id": 1}
+        filters: dict[str, BufferField] = {"id": 1}
         self.buf.incr(model, columns, filters)
         kwargs = dict(model=model, columns=columns, filters=filters, extra=None, signal_only=None)
         process_incr.apply_async.assert_called_once_with(kwargs=kwargs, headers=mock.ANY)
diff --git a/tests/sentry/buffer/test_redis.py b/tests/sentry/buffer/test_redis.py
index f9fea38675c931..8fad884f63e8c5 100644
--- a/tests/sentry/buffer/test_redis.py
+++ b/tests/sentry/buffer/test_redis.py
@@ -18,7 +18,7 @@
 )
 from sentry.models.group import Group
 from sentry.models.project import Project
-from sentry.rules.processing.delayed_processing import process_delayed_alert_conditions
+from sentry.rules.processing.buffer_processing import process_buffer
 from sentry.rules.processing.processor import PROJECT_ID_BUFFER_LIST_KEY
 from sentry.testutils.helpers.datetime import freeze_time
 from sentry.testutils.pytest.fixtures import django_db_all
@@ -34,6 +34,7 @@ def _hgetall_decode_keys(client, key, is_redis_cluster):
     return ret


+@pytest.mark.django_db
 class TestRedisBuffer:
     @pytest.fixture(params=["cluster", "blaster"])
     def buffer(self, set_sentry_option, request):
@@ -290,7 +291,7 @@ def test_buffer_hook_registry(self):

     @mock.patch("sentry.rules.processing.delayed_processing.metrics.timer")
     def test_callback(self, mock_metrics_timer):
-        redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_delayed_alert_conditions)
+        redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_buffer)
         self.buf.process_batch()
         assert mock_metrics_timer.call_count == 1

diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py
index b65d5d9a248198..c4da150e07d0e4 100644
--- a/tests/sentry/rules/processing/test_delayed_processing.py
+++ b/tests/sentry/rules/processing/test_delayed_processing.py
@@ -19,11 +19,15 @@
     EventFrequencyCondition,
     EventFrequencyConditionData,
 )
+from sentry.rules.processing.buffer_processing import (
+    bucket_num_groups,
+    process_buffer,
+    process_in_batches,
+)
 from sentry.rules.processing.delayed_processing import (
     DataAndGroups,
     UniqueConditionQuery,
     apply_delayed,
-    bucket_num_groups,
     bulk_fetch_events,
     cleanup_redis_buffer,
     generate_unique_queries,
@@ -34,8 +38,6 @@
     get_rules_to_groups,
     get_slow_conditions,
     parse_rulegroup_to_event_data,
-    process_delayed_alert_conditions,
-    process_rulegroups_in_batches,
 )
 from sentry.rules.processing.processor import PROJECT_ID_BUFFER_LIST_KEY, RuleProcessor
 from sentry.testutils.cases import PerformanceIssueTestCase, RuleTestCase, TestCase
@@ -761,12 +763,12 @@ def _push_base_events(self) -> None:
         self.push_to_hash(self.project_two.id, self.rule3.id, self.group3.id, self.event3.event_id)
         self.push_to_hash(self.project_two.id, self.rule4.id, self.group4.id, self.event4.event_id)

-    @patch("sentry.rules.processing.delayed_processing.process_rulegroups_in_batches")
+    @patch("sentry.rules.processing.buffer_processing.process_in_batches")
     def test_fetches_from_buffer_and_executes(self, mock_process_in_batches):
         self._push_base_events()
         # To get the correct mapping, we need to return the correct
         # rulegroup_event mapping based on the project_id input
-        process_delayed_alert_conditions()
+        process_buffer()

         for project, rule_group_event_mapping in (
             (self.project, self.rulegroup_event_mapping_one),
             (self.project_two, self.rulegroup_event_mapping_two),
@@ -1502,7 +1504,7 @@ def setUp(self):

     @patch("sentry.rules.processing.delayed_processing.apply_delayed.delay")
     def test_no_redis_data(self, mock_apply_delayed):
-        process_rulegroups_in_batches(self.project.id)
+        process_in_batches(self.project.id, "delayed_processing")
         mock_apply_delayed.assert_called_once_with(self.project.id)

     @patch("sentry.rules.processing.delayed_processing.apply_delayed.delay")
@@ -1511,7 +1513,7 @@ def test_basic(self, mock_apply_delayed):
         self.push_to_hash(self.project.id, self.rule.id, self.group_two.id)
         self.push_to_hash(self.project.id, self.rule.id, self.group_three.id)

-        process_rulegroups_in_batches(self.project.id)
+        process_in_batches(self.project.id, "delayed_processing")
         mock_apply_delayed.assert_called_once_with(self.project.id)

     @override_options({"delayed_processing.batch_size": 2})
@@ -1521,7 +1523,7 @@ def test_batch(self, mock_apply_delayed):
         self.push_to_hash(self.project.id, self.rule.id, self.group_two.id)
         self.push_to_hash(self.project.id, self.rule.id, self.group_three.id)

-        process_rulegroups_in_batches(self.project.id)
+        process_in_batches(self.project.id, "delayed_processing")
         assert mock_apply_delayed.call_count == 2

         # Validate the batches are created correctly
@@ -1601,7 +1603,7 @@ def test_batched_cleanup(self, mock_apply_delayed):
         rules_to_groups[self.rule.id].add(group_two.id)
         rules_to_groups[self.rule.id].add(group_three.id)

-        process_rulegroups_in_batches(self.project.id)
+        process_in_batches(self.project.id, "delayed_processing")

        batch_one_key = mock_apply_delayed.call_args_list[0][0][1]
        batch_two_key = mock_apply_delayed.call_args_list[1][0][1]
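
To illustrate the extension point this change introduces: the sketch below shows how an additional processing type could plug into `delayed_processing_registry`, mirroring the `DelayedRule` registration above. Everything in it is hypothetical and not part of this diff — `EXAMPLE_BUFFER_LIST_KEY`, `apply_example`, and the `"example_processing"` type name are invented for illustration, and a real Sentry task would use `@instrumented_task` (as `apply_delayed` does) rather than a bare celery `shared_task`.

```python
# Hypothetical sketch of a second delayed-processing handler (not part of this PR).
from celery import Task, shared_task

from sentry.models.project import Project
from sentry.rules.processing.buffer_processing import (
    BufferHashKeys,
    DelayedProcessingBase,
    FilterKeys,
    delayed_processing_registry,
)

# Hypothetical sorted-set key; a real handler would reuse a key that producers
# already push project ids into, the way PROJECT_ID_BUFFER_LIST_KEY is used above.
EXAMPLE_BUFFER_LIST_KEY = "example_project_id_buffer"


# Plain celery task only to keep the sketch self-contained; Sentry tasks use
# @instrumented_task with a queue, like apply_delayed.
@shared_task
def apply_example(project_id: int, batch_key: str | None = None) -> None:
    # Would read its (possibly batched) hash contents back out of the buffer,
    # e.g. via fetch_group_to_event_data(project_id, Project, batch_key).
    pass


@delayed_processing_registry.register("example_processing")  # hypothetical type name
class ExampleDelayedProcessing(DelayedProcessingBase):
    buffer_key = EXAMPLE_BUFFER_LIST_KEY

    @property
    def hash_args(self) -> BufferHashKeys:
        # process_in_batches uses this to measure, chunk, and delete the per-project hash.
        return BufferHashKeys(model=Project, filters=FilterKeys(project_id=self.project_id))

    @property
    def processing_task(self) -> Task:
        # Invoked as task.delay(project_id) for small hashes, or
        # task.delay(project_id, batch_key) once per re-bucketed batch.
        return apply_example
```

With a handler registered this way, `process_buffer` discovers it through the registry, scans its `buffer_key` sorted set for project ids on each flush, and `process_in_batches` either hands the whole hash to `processing_task` or splits it into `batch_key`-scoped hashes first, so new consumers need no changes to the flush hook itself.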