Limit Celery Beat event updates on bulk trash-bin actions #5207

Open · wants to merge 2 commits into base: release/2.024.33
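In short (a hedged reading of the diff below): bulk trash-bin actions create or delete many `PeriodicTask` and `ClockedSchedule` rows at once, and django-celery-beat's receivers bump the shared `PeriodicTasks` "last changed" row for every one of them, making Celery Beat reload its schedule repeatedly. The changes wrap those bulk operations in a context manager that detaches the receivers and performs at most one throttled refresh afterwards. A minimal, framework-only sketch of that pattern, with illustrative names that are not taken from the PR:

```python
from contextlib import contextmanager

from django.dispatch import Signal

# Stand-ins (illustrative only): `row_deleted` plays the role of `pre_delete`
# on `PeriodicTask`, and `beat_refreshes` the role of writes to `PeriodicTasks`.
row_deleted = Signal()
beat_refreshes = []


def mark_schedule_changed(sender, **kwargs):
    # In django-celery-beat this would be `PeriodicTasks.changed`, which bumps
    # the shared last-update timestamp that Celery Beat polls.
    beat_refreshes.append(kwargs.get('row'))


row_deleted.connect(mark_schedule_changed)


@contextmanager
def receiver_detached(signal, receiver):
    """Detach `receiver` for the duration of a bulk operation, then refresh once."""
    signal.disconnect(receiver)
    try:
        yield
    finally:
        signal.connect(receiver)
        beat_refreshes.append('single refresh for the whole batch')


with receiver_detached(row_deleted, mark_schedule_changed):
    for row in range(1000):
        row_deleted.send(sender=None, row=row)  # no receiver fires per row

assert beat_refreshes == ['single refresh for the whole batch']
```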
61 changes: 31 additions & 30 deletions kobo/apps/trash_bin/tasks.py
@@ -9,7 +9,6 @@
from django_celery_beat.models import (
    ClockedSchedule,
    PeriodicTask,
    PeriodicTasks,
)
from requests.exceptions import HTTPError

@@ -25,7 +24,11 @@
from .models import TrashStatus
from .models.account import AccountTrash
from .models.project import ProjectTrash
from .utils import delete_asset, replace_user_with_placeholder
from .utils import (
    delete_asset,
    replace_user_with_placeholder,
    signals_temporarily_disconnected,
)


@celery_app.task(
@@ -205,8 +208,6 @@ def empty_project(project_trash_id: int):

@task_failure.connect(sender=empty_account)
def empty_account_failure(sender=None, **kwargs):
    # Force scheduler to refresh
    PeriodicTasks.update_changed()

    exception = kwargs['exception']
    account_trash_id = kwargs['args'][0]
@@ -234,8 +235,6 @@ def empty_account_retry(sender=None, **kwargs):

@task_failure.connect(sender=empty_project)
def empty_project_failure(sender=None, **kwargs):
    # Force scheduler to refresh
    PeriodicTasks.update_changed()

    exception = kwargs['exception']
    project_trash_id = kwargs['args'][0]
@@ -263,27 +262,29 @@ def empty_project_retry(sender=None, **kwargs):

@celery_app.task
def garbage_collector():
    with transaction.atomic():
        # Remove orphan periodic tasks
        PeriodicTask.objects.exclude(
            pk__in=AccountTrash.objects.values_list(
                'periodic_task_id', flat=True
            ),
        ).filter(
            name__startswith=DELETE_USER_STR_PREFIX, clocked__isnull=False
        ).delete()

        PeriodicTask.objects.exclude(
            pk__in=ProjectTrash.objects.values_list(
                'periodic_task_id', flat=True
            ),
        ).filter(
            name__startswith=DELETE_PROJECT_STR_PREFIX, clocked__isnull=False
        ).delete()

        # Then, remove clocked schedules
        ClockedSchedule.objects.exclude(
            pk__in=PeriodicTask.objects.filter(
                clocked__isnull=False
            ).values_list('clocked_id', flat=True),
        ).delete()

    with signals_temporarily_disconnected(delete=True):
        with transaction.atomic():
            # Remove orphan periodic tasks
            PeriodicTask.objects.exclude(
                pk__in=AccountTrash.objects.values_list(
                    'periodic_task_id', flat=True
                ),
            ).filter(
                name__startswith=DELETE_USER_STR_PREFIX, clocked__isnull=False
            ).delete()

            PeriodicTask.objects.exclude(
                pk__in=ProjectTrash.objects.values_list(
                    'periodic_task_id', flat=True
                ),
            ).filter(
                name__startswith=DELETE_PROJECT_STR_PREFIX, clocked__isnull=False
            ).delete()

            # Then, remove clocked schedules
            ClockedSchedule.objects.exclude(
                pk__in=PeriodicTask.objects.filter(
                    clocked__isnull=False
                ).values_list('clocked_id', flat=True),
            ).delete()
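For context on the change above (hedged; paraphrased from django-celery-beat rather than taken from this diff): the receivers that `signals_temporarily_disconnected` detaches are the ones django-celery-beat connects for the two senders the trash bin touches. Each receiver rewrites the single `PeriodicTasks` row, so a bulk `.delete()` previously meant one such write per deleted `PeriodicTask` or `ClockedSchedule`, and Celery Beat reloaded its schedule whenever it saw the newer timestamp. The equivalent wiring, shown only for reference (it assumes a configured Django project and is not code added by this PR):

```python
# Mirrors what the new context manager in utils.py disconnects and reconnects.
from django.db.models.signals import post_delete, post_save, pre_delete, pre_save
from django_celery_beat.models import ClockedSchedule, PeriodicTask, PeriodicTasks

pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask)
pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask)
post_save.connect(PeriodicTasks.update_changed, sender=ClockedSchedule)
post_delete.connect(PeriodicTasks.update_changed, sender=ClockedSchedule)
```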
90 changes: 58 additions & 32 deletions kobo/apps/trash_bin/utils.py
@@ -1,15 +1,16 @@
from __future__ import annotations

import json
from contextlib import contextmanager
from copy import deepcopy
from datetime import timedelta

from django.contrib.auth import get_user_model
from django.conf import settings
from django.db import IntegrityError, models, transaction
from django.db.models import F, Q
from django.db.models.signals import pre_delete
from django.utils.timezone import now
from django.db.models.signals import post_delete, post_save, pre_delete, pre_save
from django.utils import timezone
from django_celery_beat.models import (
    ClockedSchedule,
    PeriodicTask,
@@ -103,9 +104,6 @@ def move_to_trash(
    username and primary key is retained after deleting all other data.
    """

    clocked_time = now() + timedelta(days=grace_period)
    clocked = ClockedSchedule.objects.create(clocked_time=clocked_time)

    (
        trash_model,
        fk_field_name,
@@ -157,25 +155,27 @@ def move_to_trash(
            )
        )

    try:
    with signals_temporarily_disconnected(save=True):
        clocked_time = timezone.now() + timedelta(days=grace_period)
        clocked = ClockedSchedule.objects.create(clocked_time=clocked_time)
        trash_model.objects.bulk_create(trash_objects)
        try:
            periodic_tasks = PeriodicTask.objects.bulk_create(
                [
                    PeriodicTask(
                        clocked=clocked,
                        name=task_name_placeholder.format(**ato.metadata),
                        task=f'kobo.apps.trash_bin.tasks.{task}',
                        args=json.dumps([ato.id]),
                        one_off=True,
                        enabled=not empty_manually,
                    )
                    for ato in trash_objects
                ],
            )

        periodic_tasks = PeriodicTask.objects.bulk_create(
            [
                PeriodicTask(
                    clocked=clocked,
                    name=task_name_placeholder.format(**ato.metadata),
                    task=f'kobo.apps.trash_bin.tasks.{task}',
                    args=json.dumps([ato.id]),
                    one_off=True,
                    enabled=not empty_manually,
                )
                for ato in trash_objects
            ],
        )

    except IntegrityError as e:
        raise TrashIntegrityError
        except IntegrityError:
            raise TrashIntegrityError

    # Update relationships between periodic task and trash objects
    updated_trash_objects = []
@@ -245,17 +245,9 @@ def put_back(
            for obj_dict in objects_list
        ]
    )
    try:
        # Disconnect `PeriodicTasks` (plural) signal, until `PeriodicTask` (singular)
        # delete query finishes to avoid unnecessary DB queries.
        # see https://django-celery-beat.readthedocs.io/en/stable/reference/django-celery-beat.models.html#django_celery_beat.models.PeriodicTasks
        pre_delete.disconnect(PeriodicTasks.changed, sender=PeriodicTask)
        PeriodicTask.objects.only('pk').filter(pk__in=periodic_task_ids).delete()
    finally:
        pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask)

    # Force celery beat scheduler to refresh
    PeriodicTasks.update_changed()
    with signals_temporarily_disconnected(delete=True):
        PeriodicTask.objects.only('pk').filter(pk__in=periodic_task_ids).delete()


def replace_user_with_placeholder(
@@ -298,6 +290,40 @@ def replace_user_with_placeholder(
    return placeholder_user


@contextmanager
def signals_temporarily_disconnected(save=False, delete=False):
    """
    Temporarily disconnects `PeriodicTasks` signals to prevent accumulating
    update queries for Celery Beat while bulk operations are in progress.

    See https://django-celery-beat.readthedocs.io/en/stable/reference/django-celery-beat.models.html#django_celery_beat.models.PeriodicTasks
    """

    try:
        if delete:
            pre_delete.disconnect(PeriodicTasks.changed, sender=PeriodicTask)
            post_delete.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule)
        if save:
            pre_save.disconnect(PeriodicTasks.changed, sender=PeriodicTask)
            post_save.disconnect(PeriodicTasks.update_changed, sender=ClockedSchedule)
        yield
    finally:
        if delete:
            post_delete.connect(PeriodicTasks.update_changed, sender=ClockedSchedule)
            pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask)
        if save:
            pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask)
            post_save.connect(PeriodicTasks.update_changed, sender=ClockedSchedule)

        few_minutes_ago = timezone.now() - timedelta(minutes=5)

        # Limit the number of `update_changed()` calls to prevent table locking,
        # which can create a bottleneck.
        if PeriodicTasks.objects.filter(last_update__lt=few_minutes_ago).exists():
            # Force celery beat scheduler to refresh
            PeriodicTasks.update_changed()


def _delete_submissions(request_author: settings.AUTH_USER_MODEL, asset: 'kpi.Asset'):

(
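As a closing note, a small standalone sketch of the throttle at the end of `signals_temporarily_disconnected` (illustrative only; `should_force_refresh` is a hypothetical helper that restates the `last_update__lt=few_minutes_ago` guard from the diff): a refresh is forced only when the shared timestamp is older than the five-minute window, so a burst of bulk trash-bin actions collapses into at most one Celery Beat reload per window rather than one per modified row.

```python
from datetime import datetime, timedelta, timezone

REFRESH_WINDOW = timedelta(minutes=5)


def should_force_refresh(last_update: datetime, now: datetime) -> bool:
    """Restates `PeriodicTasks.objects.filter(last_update__lt=few_minutes_ago).exists()`."""
    return last_update < now - REFRESH_WINDOW


now = datetime(2024, 9, 30, 12, 0, tzinfo=timezone.utc)
assert should_force_refresh(now - timedelta(minutes=10), now)      # stale: force refresh
assert not should_force_refresh(now - timedelta(minutes=2), now)   # recent: skip refresh
```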