Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TP2000-1434 Prevent packaging queue race conditions #1267

Merged
merged 11 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ repos:
hooks:
- id: add-trailing-comma
- repo: https://github.com/myint/autoflake.git
rev: v2.2.1
rev: v2.3.1
hooks:
- id: autoflake
args: [
Expand All @@ -13,7 +13,7 @@ repos:
"--remove-unused-variable",
]
- repo: https://github.com/pycqa/isort
rev: 5.12.0
rev: 5.13.2
hooks:
- id: isort
args: [--force-single-line-imports]
Expand All @@ -28,7 +28,7 @@ repos:
"--pre-summary-newline",
]
- repo: https://github.com/psf/black
rev: 24.3.0
rev: 24.4.2
hooks:
- id: black
- repo: https://github.com/ikamensh/flynt/
Expand Down
7 changes: 6 additions & 1 deletion common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import magic
import wrapt
from defusedxml.common import DTDForbidden
from django.apps import apps
from django.conf import settings
from django.db import transaction
from django.db.models import F
Expand Down Expand Up @@ -435,7 +436,11 @@ def wrapper(wrapped, instance, args, kwargs):
with atomic():
with transaction.get_connection().cursor() as cursor:
for model in models:
cursor.execute(f"LOCK TABLE {model._meta.db_table}")
if isinstance(model, str):
model = apps.get_model(model)
cursor.execute(
f"LOCK TABLE {model._meta.db_table} IN {lock} MODE",
)

return wrapped(*args, **kwargs)

Expand Down
1 change: 1 addition & 0 deletions publishing/jinja2/includes/envelope-queue.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
class="govuk-link fake-link process-envelope"
name="process_envelope"
value="{{ obj.pk }}"
data-prevent-double-click="true"
>
Start processing <span class="right-arrow">&#9654;</span>
</button>
Expand Down
5 changes: 5 additions & 0 deletions publishing/jinja2/includes/packaged-workbasket-queue.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
class="govuk-link fake-link"
name="promote_position"
value="{{ obj.pk }}"
data-prevent-double-click="true"
>
<img
class="icon"
Expand All @@ -21,6 +22,7 @@
class="govuk-link fake-link"
name="demote_position"
value="{{ obj.pk }}"
data-prevent-double-click="true"
>
<img
class="icon"
Expand Down Expand Up @@ -106,6 +108,7 @@
class="govuk-link fake-link"
name="demote_position"
value="{{ obj.pk }}"
data-prevent-double-click="true"
>
Move down
</button>
Expand All @@ -115,6 +118,7 @@
class="govuk-link fake-link"
name="promote_to_top_position"
value="{{ obj.pk }}"
data-prevent-double-click="true"
>
Move to top
</button>
Expand All @@ -128,6 +132,7 @@
class="govuk-link fake-link"
name="remove_from_queue"
value="{{ obj.pk }}"
data-prevent-double-click="true"
>
Remove
</button>
Expand Down
46 changes: 36 additions & 10 deletions publishing/models/packaged_workbasket.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from django_fsm import transition

from common.models.mixins import TimestampedMixin
from common.util import TableLock
from notifications.models import EnvelopeAcceptedNotification
from notifications.models import EnvelopeReadyForProcessingNotification
from notifications.models import EnvelopeRejectedNotification
Expand Down Expand Up @@ -124,6 +125,7 @@ class PackagedWorkBasketInvalidQueueOperation(Exception):

class PackagedWorkBasketManager(Manager):
@atomic
@TableLock.acquire_lock("publishing.PackagedWorkBasket", lock=TableLock.EXCLUSIVE)
def create(self, workbasket: WorkBasket, **kwargs):
"""Create a new instance, associating with workbasket."""
if workbasket.status in WorkflowStatus.unchecked_statuses():
Expand Down Expand Up @@ -437,6 +439,7 @@ def begin_processing_condition_no_instances_currently_processing(self) -> bool:

return not PackagedWorkBasket.objects.currently_processing()

@atomic
@pop_top_after
@save_after
@transition(
Expand Down Expand Up @@ -466,6 +469,7 @@ def begin_processing(self):
multiple instances it's necessary for this method to perform a save()
operation upon successful transitions.
"""
PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.processing_started_at = datetime.now()
self.save()

Expand Down Expand Up @@ -621,7 +625,9 @@ def pop_top(self) -> "PackagedWorkBasket":
"because it is not at position 1.",
)

PackagedWorkBasket.objects.filter(position__gt=0).update(
PackagedWorkBasket.objects.select_for_update(nowait=True).filter(
position__gt=0,
).update(
position=F("position") - 1,
)
self.refresh_from_db()
Expand All @@ -638,6 +644,10 @@ def remove_from_queue(self) -> "PackagedWorkBasket":
Management of the queued instance's `processing_state` is not altered by
this function and should be managed separately by the caller.
"""

PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.refresh_from_db()

if self.position == 0:
raise PackagedWorkBasketInvalidQueueOperation(
"Unable to remove instance with a position value of 0 from "
Expand All @@ -648,7 +658,9 @@ def remove_from_queue(self) -> "PackagedWorkBasket":
self.position = 0
self.save()

PackagedWorkBasket.objects.filter(position__gt=current_position).update(
PackagedWorkBasket.objects.select_for_update(nowait=True).filter(
position__gt=current_position,
).update(
position=F("position") - 1,
)
self.refresh_from_db()
Expand All @@ -661,17 +673,21 @@ def promote_to_top_position(self) -> "PackagedWorkBasket":
"""Promote the instance to the top position of the package processing
queue so that it occupies position 1."""

if self.position == 1:
PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.refresh_from_db()

if self.position <= 1:
return self

position = self.position

PackagedWorkBasket.objects.filter(
PackagedWorkBasket.objects.select_for_update(nowait=True).filter(
Q(position__gte=1) & Q(position__lt=position),
).update(position=F("position") + 1)

self.position = 1
self.save()
self.refresh_from_db()

return self

Expand All @@ -681,10 +697,15 @@ def promote_position(self) -> "PackagedWorkBasket":
"""Promote the instance by one position up the package processing
queue."""

if self.position == 1:
return
PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.refresh_from_db()

obj_to_swap = PackagedWorkBasket.objects.get(position=self.position - 1)
if self.position <= 1:
return self

obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
dalecannon marked this conversation as resolved.
Show resolved Hide resolved
position=self.position - 1,
)
obj_to_swap.position += 1
self.position -= 1
PackagedWorkBasket.objects.bulk_update(
Expand All @@ -701,10 +722,15 @@ def demote_position(self) -> "PackagedWorkBasket":
"""Demote the instance by one position down the package processing
queue."""

if self.position == PackagedWorkBasket.objects.max_position():
return
PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.refresh_from_db()

obj_to_swap = PackagedWorkBasket.objects.get(position=self.position + 1)
if self.position in {0, PackagedWorkBasket.objects.max_position()}:
return self

obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
dalecannon marked this conversation as resolved.
Show resolved Hide resolved
position=self.position + 1,
)
obj_to_swap.position -= 1
self.position += 1
PackagedWorkBasket.objects.bulk_update(
Expand Down
31 changes: 31 additions & 0 deletions publishing/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,37 @@ def test_demote_position():
assert initially_second.position == 1


def test_cannot_promote_or_demote_removed_packaged_workbasket():
"""Tests that packaged workbasket positions remain unchanged after
attempting to reposition a packaged workbasket that has since been removed
from the queue."""
with patch(
"publishing.tasks.create_xml_envelope_file.apply_async",
return_value=MagicMock(id=factory.Faker("uuid4")),
):
factories.PackagedWorkBasketFactory()

with patch(
"publishing.tasks.create_xml_envelope_file.apply_async",
return_value=MagicMock(id=factory.Faker("uuid4")),
):
factories.PackagedWorkBasketFactory()

queued_pwb = PackagedWorkBasket.objects.get(position=1)
removed_pwb = PackagedWorkBasket.objects.get(position=2)
removed_pwb.abandon()

removed_pwb = removed_pwb.promote_to_top_position()
assert removed_pwb.position == 0
queued_pwb.refresh_from_db()
assert queued_pwb.position == 1

removed_pwb = removed_pwb.demote_position()
assert removed_pwb.position == 0
queued_pwb.refresh_from_db()
assert queued_pwb.position == 1


def test_pause_and_unpause_queue(unpause_queue):
assert not OperationalStatus.is_queue_paused()
OperationalStatus.pause_queue(user=None)
Expand Down
7 changes: 6 additions & 1 deletion publishing/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from django.conf import settings
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.core.exceptions import ValidationError
from django.db import OperationalError
from django.db.transaction import atomic
from django.http import HttpResponse
from django.shortcuts import redirect
Expand Down Expand Up @@ -101,6 +102,7 @@ def _promote_position(self, pk):
except (
PackagedWorkBasket.DoesNotExist,
PackagedWorkBasketInvalidQueueOperation,
OperationalError,
mattjamc marked this conversation as resolved.
Show resolved Hide resolved
):
# Nothing to do in the case of these exceptions.
pass
Expand All @@ -113,6 +115,7 @@ def _demote_position(self, pk):
except (
PackagedWorkBasket.DoesNotExist,
PackagedWorkBasketInvalidQueueOperation,
OperationalError,
):
# Nothing to do in the case of these exceptions.
pass
Expand All @@ -125,6 +128,7 @@ def _promote_to_top_position(self, pk):
except (
PackagedWorkBasket.DoesNotExist,
PackagedWorkBasketInvalidQueueOperation,
OperationalError,
):
# Nothing to do in the case of these exceptions.
pass
Expand All @@ -142,6 +146,7 @@ def _remove_from_queue(self, pk):
PackagedWorkBasket.DoesNotExist,
PackagedWorkBasketInvalidQueueOperation,
TransitionNotAllowed,
OperationalError,
):
# Nothing to do in the case of these exceptions.
return self.view_url
Expand Down Expand Up @@ -189,7 +194,7 @@ def _process_envelope(self, pk):
packaged_work_basket = PackagedWorkBasket.objects.get(pk=pk)
try:
packaged_work_basket.begin_processing()
except TransitionNotAllowed:
except (TransitionNotAllowed, OperationalError):
# No error page right now, just reshow the list view.
pass

Expand Down
Loading