ref(rules): refactor delayed processing batching logic to prepare for workflows #83670
Conversation
🔍 Existing Issues For Review: your pull request is modifying functions with pre-existing issues in 📄 src/sentry/rules/processing/delayed_processing.py
```python
# with metrics.timer("delayed_workflow.process_all_conditions.duration"):
#     process_project_ids(fetch_time, WORKFLOW_ENGINE_PROJECT_ID_BUFFER_LIST_KEY, "delayed_workflow")
```
An example of how this will be done for the workflow engine.
```python
try:
    processing_info = delayed_processing_registry.get(processing_type)(project_id)
except NoRegistrationExistsError:
    logger.exception(log_format.format(processing_type, "no_registration"))
    return

hash_args = processing_info.hash_args
task = processing_info.processing_task
filters: dict[str, models.Model | str | int] = asdict(hash_args.filters)
```
This is new. Depending on the `processing_type` (basically, whether we're processing rules or workflows), we'll have different args to pass when getting things from the buffer or pushing to it, and a different task to kick off to query Snuba and evaluate slow conditions.
The rest of the logic below just uses this information and is the same as before.
Could we give the `models.Model | str | int` type a name to help illustrate the different args for this?
Force-pushed from 2a7b002 to a8e8717.
Overall i feel like this approach is great; it's mostly a matter of making this generic in a nicely composable way. The biggest callout is that we should take a look at how we're registering for the FLUSH event in the buffer. Right now it's a little split between two worlds, being generic and being specific. If the desire is to be highly generic, then i think we should rename some stuff or maybe move some code around. If the desire is to be specific, we could change the buffer registry to have multiple handlers for a single event.
Force-pushed from a8e8717 to e98e919.
@saponifi3d i made it more generic
i think this approach looks great, just the nits / little cleanups 🎉
🎉 looks great, thanks for all the clean up.
```python
should_emit_logs = options.get("delayed_processing.emit_logs")

for processing_type, handler in delayed_processing_registry.registrations.items():
    with metrics.timer(f"{processing_type}.process_all_conditions.duration"):
```
Should we have a consistent prefix on the metrics for `delayed_processing`?
Possibly, i can clean this up in a follow-up.
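A sketch of what the suggested consistent prefix could look like; the helper name `metric_key` and the prefix constant are hypothetical, not existing Sentry code.

```python
# Hypothetical shared prefix so all delayed-processing metrics group together.
METRIC_PREFIX = "delayed_processing"

def metric_key(processing_type: str, name: str) -> str:
    # e.g. "delayed_processing.delayed_workflow.process_all_conditions.duration"
    return f"{METRIC_PREFIX}.{processing_type}.{name}"
```

Keying every timer through one helper keeps dashboards and alerts able to match on a single `delayed_processing.*` pattern regardless of processing type.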
Workflow engine will be using the same logic as delayed processing for rules to process slow conditions. This PR refactors the shared bit, the batching logic, to prepare for adding processing for workflows.
There are two differences between the two kinds of delayed processing: the args passed when getting things from the buffer or pushing to it, and the task kicked off to query Snuba and evaluate slow conditions.
To encapsulate the two differences, I've added a registry. Depending on which kind of delayed processing we are doing, we fetch a handler that carries this information, and the shared batching logic uses it.
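The shared batching described above can be sketched as a dispatch through per-type handlers. This is a simplified illustration, not Sentry's implementation: the handler dicts and the string-returning tasks are stand-ins, though `WORKFLOW_ENGINE_PROJECT_ID_BUFFER_LIST_KEY` mirrors the buffer key named earlier in the thread.

```python
# Hypothetical handlers: each processing type supplies its own buffer key
# and its own task; everything else in the batching loop is shared.
HANDLERS = {
    "delayed_rules": {
        "buffer_key": "PROJECT_ID_BUFFER_LIST_KEY",
        "task": lambda pid: f"rules:{pid}",
    },
    "delayed_workflow": {
        "buffer_key": "WORKFLOW_ENGINE_PROJECT_ID_BUFFER_LIST_KEY",
        "task": lambda pid: f"workflow:{pid}",
    },
}

def process_project_ids(processing_type: str, project_ids: list[int]) -> list[str]:
    handler = HANDLERS[processing_type]
    # Identical batching loop for both kinds; only the task (and, in the real
    # code, the buffer args) differ per handler.
    return [handler["task"](pid) for pid in project_ids]
```

With this shape, adding workflow processing is just registering a new handler; the batching loop itself never changes.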