Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
shopfloor_base: improve lock for update action
Browse files Browse the repository at this point in the history
Add the skip locked option.
TDu committed Nov 8, 2023
1 parent 622d496 commit d855784
Showing 3 changed files with 51 additions and 4 deletions.
25 changes: 21 additions & 4 deletions shopfloor_base/actions/lock.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright 2022 Michael Tietz (MT Software) <mtietz@mt-software.de>
# Copyright 2023 Camptocamp SA (http://www.camptocamp.com)
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html).
import hashlib
import struct
@@ -26,7 +27,23 @@ def advisory(self, name):
self.env.cr.execute("SELECT pg_advisory_xact_lock(%s);", (int_lock,))
self.env.cr.fetchone()[0]

def for_update(self, records, log_exceptions=False):
"""Lock a table FOR UPDATE"""
sql = "SELECT id FROM %s WHERE ID IN %%s FOR UPDATE" % records._table
self.env.cr.execute(sql, (tuple(records.ids),), log_exceptions=False)
def for_update(self, records, log_exceptions=False, skip_locked=False):
"""Lock rows for update on a specific table.
This function will try to obtain a lock on the rows (records parameter) and
wait until they are available for update.
Using the SKIP LOCKED parameter (better used with only one record), it will
not wait for the row to be available but return False if the lock could not
be obtained.
"""
query = "SELECT id FROM %s WHERE ID IN %%s FOR UPDATE"
if skip_locked:
query += " SKIP LOCKED"
sql = query % records._table
self.env.cr.execute(sql, (tuple(records.ids),), log_exceptions=log_exceptions)
if skip_locked:
rows = self.env.cr.fetchall()
return len(rows) == len(records)
return True
1 change: 1 addition & 0 deletions shopfloor_base/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from . import test_actions_data
from . import test_actions_lock
from . import test_menu_service
from . import test_profile_service
from . import test_scan_anything_service
29 changes: 29 additions & 0 deletions shopfloor_base/tests/test_actions_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2023 Camptocamp SA (http://www.camptocamp.com)
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html).
from contextlib import closing

from .common import CommonCase


class ActionsLockCase(CommonCase):
@classmethod
def setUpClassBaseData(cls):
super().setUpClassBaseData()
cls.partner = cls.env.ref("base.res_partner_12")
with cls.work_on_actions(cls) as work:
cls.lock = work.component(usage="lock")

def test_select_for_update_skip_locked_ok(self):
"""Check the lock is obtained and True is returned."""
result = self.lock.for_update(self.partner, skip_locked=True)
self.assertTrue(result)

def test_select_for_update_skip_locked_not_ok(self):
"""Check the lock is NOT obtained and False is returned."""
with closing(self.registry.cursor()) as cr:
# Simulate another user locked a row
cr.execute(
"SELECT id FROM res_partner WHERE id=%s FOR UPDATE", (self.partner.id,)
)
result = self.lock.for_update(self.partner, skip_locked=True)
self.assertFalse(result)

0 comments on commit d855784

Please sign in to comment.