From e03625172872c27c5a0e1ee3d06fca22b33252ec Mon Sep 17 00:00:00 2001 From: Dale Cannon Date: Wed, 17 Jul 2024 16:53:33 +0100 Subject: [PATCH 1/9] Acquire non-blocking lock on packaged workbasket rows while updating their positions --- publishing/models/packaged_workbasket.py | 18 +++++++++---- publishing/views.py | 32 +++++++++++++++++++----- 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/publishing/models/packaged_workbasket.py b/publishing/models/packaged_workbasket.py index 6289ecd2c..f5699c230 100644 --- a/publishing/models/packaged_workbasket.py +++ b/publishing/models/packaged_workbasket.py @@ -621,7 +621,9 @@ def pop_top(self) -> "PackagedWorkBasket": "because it is not at position 1.", ) - PackagedWorkBasket.objects.filter(position__gt=0).update( + PackagedWorkBasket.objects.select_for_update(nowait=True).filter( + position__gt=0, + ).update( position=F("position") - 1, ) self.refresh_from_db() @@ -648,7 +650,9 @@ def remove_from_queue(self) -> "PackagedWorkBasket": self.position = 0 self.save() - PackagedWorkBasket.objects.filter(position__gt=current_position).update( + PackagedWorkBasket.objects.select_for_update(nowait=True).filter( + position__gt=current_position, + ).update( position=F("position") - 1, ) self.refresh_from_db() @@ -666,7 +670,7 @@ def promote_to_top_position(self) -> "PackagedWorkBasket": position = self.position - PackagedWorkBasket.objects.filter( + PackagedWorkBasket.objects.select_for_update(nowait=True).filter( Q(position__gte=1) & Q(position__lt=position), ).update(position=F("position") + 1) @@ -684,7 +688,9 @@ def promote_position(self) -> "PackagedWorkBasket": if self.position == 1: return - obj_to_swap = PackagedWorkBasket.objects.get(position=self.position - 1) + obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get( + position=self.position - 1, + ) obj_to_swap.position += 1 self.position -= 1 PackagedWorkBasket.objects.bulk_update( @@ -704,7 +710,9 @@ def demote_position(self) -> "PackagedWorkBasket": if self.position == PackagedWorkBasket.objects.max_position(): return - obj_to_swap = PackagedWorkBasket.objects.get(position=self.position + 1) + obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get( + position=self.position + 1, + ) obj_to_swap.position -= 1 self.position += 1 PackagedWorkBasket.objects.bulk_update( diff --git a/publishing/views.py b/publishing/views.py index 6d8bef358..dac0e7ec5 100644 --- a/publishing/views.py +++ b/publishing/views.py @@ -3,6 +3,7 @@ from django.conf import settings from django.contrib.auth.mixins import PermissionRequiredMixin from django.core.exceptions import ValidationError +from django.db import OperationalError from django.db.transaction import atomic from django.http import HttpResponse from django.shortcuts import redirect @@ -94,45 +95,60 @@ def _unpause_queue(self, request): OperationalStatus.unpause_queue(user=request.user) return self.view_url + @atomic def _promote_position(self, pk): try: - packaged_work_basket = PackagedWorkBasket.objects.get(pk=pk) + packaged_work_basket = PackagedWorkBasket.objects.select_for_update( + nowait=True, + ).get(pk=pk) packaged_work_basket.promote_position() except ( PackagedWorkBasket.DoesNotExist, PackagedWorkBasketInvalidQueueOperation, + OperationalError, ): # Nothing to do in the case of these exceptions. pass return self.view_url + @atomic def _demote_position(self, pk): try: - packaged_work_basket = PackagedWorkBasket.objects.get(pk=pk) + packaged_work_basket = PackagedWorkBasket.objects.select_for_update( + nowait=True, + ).get(pk=pk) packaged_work_basket.demote_position() except ( PackagedWorkBasket.DoesNotExist, PackagedWorkBasketInvalidQueueOperation, + OperationalError, ): # Nothing to do in the case of these exceptions. pass return self.view_url + @atomic def _promote_to_top_position(self, pk): try: - packaged_work_basket = PackagedWorkBasket.objects.get(pk=pk) + packaged_work_basket = PackagedWorkBasket.objects.select_for_update( + nowait=True, + ).get(pk=pk) packaged_work_basket.promote_to_top_position() except ( PackagedWorkBasket.DoesNotExist, PackagedWorkBasketInvalidQueueOperation, + OperationalError, ): # Nothing to do in the case of these exceptions. pass return self.view_url + @atomic def _remove_from_queue(self, pk): try: - packaged_work_basket = PackagedWorkBasket.objects.get(pk=pk) + packaged_work_basket = PackagedWorkBasket.objects.select_for_update( + nowait=True, + ).get(pk=pk) packaged_work_basket.abandon() return reverse( "workbaskets:workbasket-ui-changes", @@ -142,6 +158,7 @@ def _remove_from_queue(self, pk): PackagedWorkBasket.DoesNotExist, PackagedWorkBasketInvalidQueueOperation, TransitionNotAllowed, + OperationalError, ): # Nothing to do in the case of these exceptions. return self.view_url @@ -184,12 +201,15 @@ def post(self, request, *args, **kwargs): return redirect(reverse("publishing:envelope-queue-ui-list")) + @atomic def _process_envelope(self, pk): if not OperationalStatus.is_queue_paused(): - packaged_work_basket = PackagedWorkBasket.objects.get(pk=pk) + packaged_work_basket = PackagedWorkBasket.objects.select_for_update( + nowait=True, + ).get(pk=pk) try: packaged_work_basket.begin_processing() - except TransitionNotAllowed: + except (TransitionNotAllowed, OperationalError): # No error page right now, just reshow the list view. pass From cccc663bbbcb0d9934a1290dded0cd73cf196581 Mon Sep 17 00:00:00 2001 From: Dale Cannon Date: Wed, 17 Jul 2024 16:54:32 +0100 Subject: [PATCH 2/9] Add double-click prevention attribute to packaging queue buttons --- publishing/jinja2/includes/envelope-queue.jinja | 1 + publishing/jinja2/includes/packaged-workbasket-queue.jinja | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/publishing/jinja2/includes/envelope-queue.jinja b/publishing/jinja2/includes/envelope-queue.jinja index 991688258..b4409068f 100644 --- a/publishing/jinja2/includes/envelope-queue.jinja +++ b/publishing/jinja2/includes/envelope-queue.jinja @@ -30,6 +30,7 @@ class="govuk-link fake-link process-envelope" name="process_envelope" value="{{ obj.pk }}" + data-prevent-double-click="true" > Start processing diff --git a/publishing/jinja2/includes/packaged-workbasket-queue.jinja b/publishing/jinja2/includes/packaged-workbasket-queue.jinja index 8f28a2244..b9ea93da8 100644 --- a/publishing/jinja2/includes/packaged-workbasket-queue.jinja +++ b/publishing/jinja2/includes/packaged-workbasket-queue.jinja @@ -6,6 +6,7 @@ class="govuk-link fake-link" name="promote_position" value="{{ obj.pk }}" + data-prevent-double-click="true" > Move down @@ -115,6 +118,7 @@ class="govuk-link fake-link" name="promote_to_top_position" value="{{ obj.pk }}" + data-prevent-double-click="true" > Move to top @@ -128,6 +132,7 @@ class="govuk-link fake-link" name="remove_from_queue" value="{{ obj.pk }}" + data-prevent-double-click="true" > Remove From 82f9719be223396e85f4bab90ba33d2c133103d3 Mon Sep 17 00:00:00 2001 From: Dale Cannon Date: Wed, 17 Jul 2024 16:58:52 +0100 Subject: [PATCH 3/9] Acquire blocking lock on all packaged workbasket rows while aggregating the highest queue position --- publishing/models/packaged_workbasket.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/publishing/models/packaged_workbasket.py b/publishing/models/packaged_workbasket.py index f5699c230..a30303e0c 100644 --- a/publishing/models/packaged_workbasket.py +++ b/publishing/models/packaged_workbasket.py @@ -142,15 +142,19 @@ def create(self, workbasket: WorkBasket, **kwargs): f"{packaged_work_baskets}.", ) - position = ( - PackagedWorkBasket.objects.aggregate( - out=Coalesce( - Max("position"), - Value(0), - ), - )["out"] - + 1 - ) + packaged_workbaskets = PackagedWorkBasket.objects.select_for_update().all() + if packaged_workbaskets: + position = ( + packaged_workbaskets.aggregate( + out=Coalesce( + Max("position"), + Value(0), + ), + )["out"] + + 1 + ) + else: + position = 1 new_obj = super().create(workbasket=workbasket, position=position, **kwargs) From ce5ca591ecc5ecc041f6106a1d319bab9b107526 Mon Sep 17 00:00:00 2001 From: Dale Cannon Date: Fri, 19 Jul 2024 12:00:39 +0100 Subject: [PATCH 4/9] Check that the packaged workbasket is still a queue member before updating queue positions --- publishing/models/packaged_workbasket.py | 11 +++++---- publishing/tests/test_models.py | 31 ++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/publishing/models/packaged_workbasket.py b/publishing/models/packaged_workbasket.py index a30303e0c..9410d0b16 100644 --- a/publishing/models/packaged_workbasket.py +++ b/publishing/models/packaged_workbasket.py @@ -669,7 +669,7 @@ def promote_to_top_position(self) -> "PackagedWorkBasket": """Promote the instance to the top position of the package processing queue so that it occupies position 1.""" - if self.position == 1: + if self.position <= 1: return self position = self.position @@ -680,6 +680,7 @@ def promote_to_top_position(self) -> "PackagedWorkBasket": self.position = 1 self.save() + self.refresh_from_db() return self @@ -689,8 +690,8 @@ def promote_position(self) -> "PackagedWorkBasket": """Promote the instance by one position up the package processing queue.""" - if self.position == 1: - return + if self.position <= 1: + return self obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get( position=self.position - 1, @@ -711,8 +712,8 @@ def demote_position(self) -> "PackagedWorkBasket": """Demote the instance by one position down the package processing queue.""" - if self.position == PackagedWorkBasket.objects.max_position(): - return + if self.position in {0, PackagedWorkBasket.objects.max_position()}: + return self obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get( position=self.position + 1, diff --git a/publishing/tests/test_models.py b/publishing/tests/test_models.py index 5db6427fa..dd7371d6f 100644 --- a/publishing/tests/test_models.py +++ b/publishing/tests/test_models.py @@ -333,6 +333,37 @@ def test_demote_position(): assert initially_second.position == 1 +def test_cannot_promote_or_demote_removed_packaged_workbasket(): + """Tests that packaged workbasket positions remain unchanged after + attempting to reposition a packaged workbasket that has since been removed + from the queue.""" + with patch( + "publishing.tasks.create_xml_envelope_file.apply_async", + return_value=MagicMock(id=factory.Faker("uuid4")), + ): + factories.PackagedWorkBasketFactory() + + with patch( + "publishing.tasks.create_xml_envelope_file.apply_async", + return_value=MagicMock(id=factory.Faker("uuid4")), + ): + factories.PackagedWorkBasketFactory() + + queued_pwb = PackagedWorkBasket.objects.get(position=1) + removed_pwb = PackagedWorkBasket.objects.get(position=2) + removed_pwb.abandon() + + removed_pwb = removed_pwb.promote_to_top_position() + assert removed_pwb.position == 0 + queued_pwb.refresh_from_db() + assert queued_pwb.position == 1 + + removed_pwb = removed_pwb.demote_position() + assert removed_pwb.position == 0 + queued_pwb.refresh_from_db() + assert queued_pwb.position == 1 + + def test_pause_and_unpause_queue(unpause_queue): assert not OperationalStatus.is_queue_paused() OperationalStatus.pause_queue(user=None) From 639282e9d62218d253fef00d2b9b39bb083aa5c2 Mon Sep 17 00:00:00 2001 From: Dale Cannon Date: Wed, 24 Jul 2024 12:54:56 +0100 Subject: [PATCH 5/9] Bump pre-commit hook version --- .pre-commit-config.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d8d39eb78..ae284714a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -4,7 +4,7 @@ repos: hooks: - id: add-trailing-comma - repo: https://github.com/myint/autoflake.git - rev: v2.2.1 + rev: v2.3.1 hooks: - id: autoflake args: [ @@ -13,7 +13,7 @@ repos: "--remove-unused-variable", ] - repo: https://github.com/pycqa/isort - rev: 5.12.0 + rev: 5.13.2 hooks: - id: isort args: [--force-single-line-imports] @@ -28,7 +28,7 @@ repos: "--pre-summary-newline", ] - repo: https://github.com/psf/black - rev: 24.3.0 + rev: 24.4.2 hooks: - id: black - repo: https://github.com/ikamensh/flynt/ @@ -36,7 +36,7 @@ repos: hooks: - id: flynt - repo: https://github.com/uktrade/pii-secret-check-hooks - rev: 0.0.0.35 + rev: 0.0.0.36 hooks: - id: pii_secret_filename files: '' From 420a197ee00929ccd39f0dad5818983840b43cfb Mon Sep 17 00:00:00 2001 From: Dale Cannon Date: Wed, 24 Jul 2024 12:56:30 +0100 Subject: [PATCH 6/9] Apply given lock mode when locking a table --- common/util.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/common/util.py b/common/util.py index d7db72e75..1c559638f 100644 --- a/common/util.py +++ b/common/util.py @@ -23,6 +23,7 @@ import magic import wrapt from defusedxml.common import DTDForbidden +from django.apps import apps from django.conf import settings from django.db import transaction from django.db.models import F @@ -444,7 +445,11 @@ def wrapper(wrapped, instance, args, kwargs): with atomic(): with transaction.get_connection().cursor() as cursor: for model in models: - cursor.execute(f"LOCK TABLE {model._meta.db_table}") + if isinstance(model, str): + model = apps.get_model(model) + cursor.execute( + f"LOCK TABLE {model._meta.db_table} in {lock} MODE", + ) return wrapped(*args, **kwargs) From f43e51045db49fd34f57c39d10015448a4aa34a9 Mon Sep 17 00:00:00 2001 From: Dale Cannon Date: Wed, 24 Jul 2024 13:07:28 +0100 Subject: [PATCH 7/9] Use TableLock decorator on PackagedWorkBasketManager.create() --- publishing/models/packaged_workbasket.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/publishing/models/packaged_workbasket.py b/publishing/models/packaged_workbasket.py index 9410d0b16..de96f43f0 100644 --- a/publishing/models/packaged_workbasket.py +++ b/publishing/models/packaged_workbasket.py @@ -24,6 +24,7 @@ from django_fsm import transition from common.models.mixins import TimestampedMixin +from common.util import TableLock from notifications.models import EnvelopeAcceptedNotification from notifications.models import EnvelopeReadyForProcessingNotification from notifications.models import EnvelopeRejectedNotification @@ -124,6 +125,7 @@ class PackagedWorkBasketInvalidQueueOperation(Exception): class PackagedWorkBasketManager(Manager): @atomic + @TableLock.acquire_lock("publishing.PackagedWorkBasket", lock="EXCLUSIVE") def create(self, workbasket: WorkBasket, **kwargs): """Create a new instance, associating with workbasket.""" if workbasket.status in WorkflowStatus.unchecked_statuses(): @@ -142,19 +144,15 @@ def create(self, workbasket: WorkBasket, **kwargs): f"{packaged_work_baskets}.", ) - packaged_workbaskets = PackagedWorkBasket.objects.select_for_update().all() - if packaged_workbaskets: - position = ( - packaged_workbaskets.aggregate( - out=Coalesce( - Max("position"), - Value(0), - ), - )["out"] - + 1 - ) - else: - position = 1 + position = ( + PackagedWorkBasket.objects.aggregate( + out=Coalesce( + Max("position"), + Value(0), + ), + )["out"] + + 1 + ) new_obj = super().create(workbasket=workbasket, position=position, **kwargs) From 64e3221a252f8cfa9c8ddfa5f9d851330b83f712 Mon Sep 17 00:00:00 2001 From: Dale Cannon Date: Wed, 24 Jul 2024 13:03:00 +0100 Subject: [PATCH 8/9] Lock rows from inside respective instance methods --- publishing/models/packaged_workbasket.py | 15 ++++++++++++++ publishing/views.py | 25 +++++------------------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/publishing/models/packaged_workbasket.py b/publishing/models/packaged_workbasket.py index de96f43f0..afec4f7c5 100644 --- a/publishing/models/packaged_workbasket.py +++ b/publishing/models/packaged_workbasket.py @@ -439,6 +439,7 @@ def begin_processing_condition_no_instances_currently_processing(self) -> bool: return not PackagedWorkBasket.objects.currently_processing() + @atomic @pop_top_after @save_after @transition( @@ -468,6 +469,7 @@ def begin_processing(self): multiple instances it's necessary for this method to perform a save() operation upon successful transitions. """ + PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) self.processing_started_at = datetime.now() self.save() @@ -642,6 +644,10 @@ def remove_from_queue(self) -> "PackagedWorkBasket": Management of the queued instance's `processing_state` is not altered by this function and should be managed separately by the caller. """ + + PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) + self.refresh_from_db() + if self.position == 0: raise PackagedWorkBasketInvalidQueueOperation( "Unable to remove instance with a position value of 0 from " @@ -667,6 +673,9 @@ def promote_to_top_position(self) -> "PackagedWorkBasket": """Promote the instance to the top position of the package processing queue so that it occupies position 1.""" + PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) + self.refresh_from_db() + if self.position <= 1: return self @@ -688,6 +697,9 @@ def promote_position(self) -> "PackagedWorkBasket": """Promote the instance by one position up the package processing queue.""" + PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) + self.refresh_from_db() + if self.position <= 1: return self @@ -710,6 +722,9 @@ def demote_position(self) -> "PackagedWorkBasket": """Demote the instance by one position down the package processing queue.""" + PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) + self.refresh_from_db() + if self.position in {0, PackagedWorkBasket.objects.max_position()}: return self diff --git a/publishing/views.py b/publishing/views.py index dac0e7ec5..d8b590246 100644 --- a/publishing/views.py +++ b/publishing/views.py @@ -95,12 +95,9 @@ def _unpause_queue(self, request): OperationalStatus.unpause_queue(user=request.user) return self.view_url - @atomic def _promote_position(self, pk): try: - packaged_work_basket = PackagedWorkBasket.objects.select_for_update( - nowait=True, - ).get(pk=pk) + packaged_work_basket = PackagedWorkBasket.objects.get(pk=pk) packaged_work_basket.promote_position() except ( PackagedWorkBasket.DoesNotExist, @@ -111,12 +108,9 @@ def _promote_position(self, pk): pass return self.view_url - @atomic def _demote_position(self, pk): try: - packaged_work_basket = PackagedWorkBasket.objects.select_for_update( - nowait=True, - ).get(pk=pk) + packaged_work_basket = PackagedWorkBasket.objects.get(pk=pk) packaged_work_basket.demote_position() except ( PackagedWorkBasket.DoesNotExist, @@ -127,12 +121,9 @@ def _demote_position(self, pk): pass return self.view_url - @atomic def _promote_to_top_position(self, pk): try: - packaged_work_basket = PackagedWorkBasket.objects.select_for_update( - nowait=True, - ).get(pk=pk) + packaged_work_basket = PackagedWorkBasket.objects.get(pk=pk) packaged_work_basket.promote_to_top_position() except ( PackagedWorkBasket.DoesNotExist, @@ -143,12 +134,9 @@ def _promote_to_top_position(self, pk): pass return self.view_url - @atomic def _remove_from_queue(self, pk): try: - packaged_work_basket = PackagedWorkBasket.objects.select_for_update( - nowait=True, - ).get(pk=pk) + packaged_work_basket = PackagedWorkBasket.objects.get(pk=pk) packaged_work_basket.abandon() return reverse( "workbaskets:workbasket-ui-changes", @@ -201,12 +189,9 @@ def post(self, request, *args, **kwargs): return redirect(reverse("publishing:envelope-queue-ui-list")) - @atomic def _process_envelope(self, pk): if not OperationalStatus.is_queue_paused(): - packaged_work_basket = PackagedWorkBasket.objects.select_for_update( - nowait=True, - ).get(pk=pk) + packaged_work_basket = PackagedWorkBasket.objects.get(pk=pk) try: packaged_work_basket.begin_processing() except (TransitionNotAllowed, OperationalError): From 70cce1a0db64af48b5ff8ac2a2011599e1aa64a6 Mon Sep 17 00:00:00 2001 From: Dale Cannon Date: Thu, 25 Jul 2024 11:15:44 +0100 Subject: [PATCH 9/9] Capitalise SQL keyword for consistency --- common/util.py | 2 +- publishing/models/packaged_workbasket.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/util.py b/common/util.py index 5057f28ae..7f06fb9c1 100644 --- a/common/util.py +++ b/common/util.py @@ -439,7 +439,7 @@ def wrapper(wrapped, instance, args, kwargs): if isinstance(model, str): model = apps.get_model(model) cursor.execute( - f"LOCK TABLE {model._meta.db_table} in {lock} MODE", + f"LOCK TABLE {model._meta.db_table} IN {lock} MODE", ) return wrapped(*args, **kwargs) diff --git a/publishing/models/packaged_workbasket.py b/publishing/models/packaged_workbasket.py index afec4f7c5..67ff040d3 100644 --- a/publishing/models/packaged_workbasket.py +++ b/publishing/models/packaged_workbasket.py @@ -125,7 +125,7 @@ class PackagedWorkBasketInvalidQueueOperation(Exception): class PackagedWorkBasketManager(Manager): @atomic - @TableLock.acquire_lock("publishing.PackagedWorkBasket", lock="EXCLUSIVE") + @TableLock.acquire_lock("publishing.PackagedWorkBasket", lock=TableLock.EXCLUSIVE) def create(self, workbasket: WorkBasket, **kwargs): """Create a new instance, associating with workbasket.""" if workbasket.status in WorkflowStatus.unchecked_statuses():