From f0bae5480f1818c2b71d9c44c8b2a7cf36152704 Mon Sep 17 00:00:00 2001 From: Jacob Rief - Django-Migration Date: Tue, 1 Dec 2020 16:42:19 +0100 Subject: [PATCH 1/9] work in progress on dblocks --- post_office/dblock.py | 120 +++++++++++++++ post_office/lockfile.py | 151 ------------------- post_office/migrations/0012_dbmutex.py | 23 +++ post_office/models.py | 20 +++ post_office/test_settings.py | 4 + post_office/tests/test_dblock.py | 201 +++++++++++++++++++++++++ post_office/tests/test_lockfile.py | 75 --------- 7 files changed, 368 insertions(+), 226 deletions(-) create mode 100644 post_office/dblock.py delete mode 100644 post_office/lockfile.py create mode 100644 post_office/migrations/0012_dbmutex.py create mode 100644 post_office/tests/test_dblock.py delete mode 100644 post_office/tests/test_lockfile.py diff --git a/post_office/dblock.py b/post_office/dblock.py new file mode 100644 index 00000000..ca59d920 --- /dev/null +++ b/post_office/dblock.py @@ -0,0 +1,120 @@ +import atexit + +from datetime import timedelta +import signal +import time +from uuid import uuid4 + +from django.core.exceptions import ImproperlyConfigured +from django.db import IntegrityError, DatabaseError, transaction +from django.utils.timezone import now + +from post_office.models import DBMutex + + +class LockedException(Exception): + pass + + +class TimeoutException(Exception): + pass + + +class db_lock: + """ + An entity that can lock a named resource and release it through database locking. + """ + GRANULARITY = timedelta(milliseconds=100) + locked_by = uuid4() + + def __init__(self, lock_id, timeout=timedelta(minutes=1), wait=False): + self.lock_id = lock_id[:50] + if not isinstance(timeout, timedelta): + raise ValueError("DB lock timeout must be of type timedelta.") + if timeout.total_seconds() < 1: + raise ImproperlyConfigured("DB lock timeout must be at least one second.") + self.timeout = timeout + self.wait = wait + self._mutex = None + + def __call__(self, func): + return self._decorate(func) + + def __enter__(self): + self.acquire() + return self + + def __exit__(self, type, value, traceback): + self.release() + + @property + def remaining_time(self): + """ + @:return: The remaining time until this lock expires. + """ + return self._mutex.expires_at - now() + + def acquire(self): + """ + Aquires a mutual exclusive lock in the database. 
+ """ + def stop_on_alarm(*args): + raise TimeoutException() + + signal.signal(signal.SIGALRM, stop_on_alarm) + if self.wait: + # the following call may block, until lock is released by another process + mutex = DBMutex.objects.select_for_update().filter(lock_id=self.lock_id, expires_at__gt=now()).first() + print(f"1 in transaction: {mutex}") + while mutex: + remaining_time = mutex.expires_at - now() + print(f'Remain: {remaining_time}') + time.sleep(remaining_time.total_seconds() if remaining_time < self.GRANULARITY else self.GRANULARITY.total_seconds()) + try: + mutex.refresh_from_db() + except DBMutex.DoesNotExist: + mutex = None + print(f"2 in transaction: {mutex}") + self._mutex = DBMutex.objects.create( + lock_id=self.lock_id, locked_by=self.locked_by, expires_at=now() + self.timeout) + else: + try: + print(f"Creating mutex: ") + self._mutex = DBMutex.objects.create( + lock_id=self.lock_id, locked_by=self.locked_by, expires_at=now() + self.timeout) + print(self._mutex) + except IntegrityError: + raise LockedException("DB mutex for {} is locked.".format(self.lock_id)) + + # install a timeout handler, in case the lock expires without being released + signal.setitimer(signal.ITIMER_REAL, (self._mutex.expires_at - now()).total_seconds()) + + def release(self): + """ + Release a lock previously acquired. + """ + if self._mutex: + signal.setitimer(signal.ITIMER_REAL, 0) + print(f"Releasing: {self._mutex}") + self._mutex.delete() + self._mutex = None + + def _decorate(self, func): + def wrapper(*args, **kwargs): + with self: + result = func(*args, **kwargs) + return result + return wrapper + + @classmethod + def _release_all_locks(cls): + """ + Release all locks assigned to the running instance. + """ + try: + DBMutex.objects.filter(locked_by=cls.locked_by).delete() + except DatabaseError: + pass + + +atexit.register(db_lock._release_all_locks) diff --git a/post_office/lockfile.py b/post_office/lockfile.py deleted file mode 100644 index 4cf71318..00000000 --- a/post_office/lockfile.py +++ /dev/null @@ -1,151 +0,0 @@ -# This module is taken from https://gist.github.com/ionrock/3015700 - -# A file lock implementation that tries to avoid platform specific -# issues. It is inspired by a whole bunch of different implementations -# listed below. - -# - https://bitbucket.org/jaraco/yg.lockfile/src/6c448dcbf6e5/yg/lockfile/__init__.py -# - http://svn.zope.org/zc.lockfile/trunk/src/zc/lockfile/__init__.py?rev=121133&view=markup -# - http://stackoverflow.com/questions/489861/locking-a-file-in-python -# - http://www.evanfosmark.com/2009/01/cross-platform-file-locking-support-in-python/ -# - http://packages.python.org/lockfile/lockfile.html - -# There are some tests below and a blog posting conceptually the -# problems I wanted to try and solve. The tests reflect these ideas. - -# - http://ionrock.wordpress.com/2012/06/28/file-locking-in-python/ - -# I'm not advocating using this package. But if you do happen to try it -# out and have suggestions please let me know. 
- -import os -import time -import platform - - -class FileLocked(Exception): - pass - - -class FileLock: - - def __init__(self, lock_filename, timeout=None, force=False): - self.lock_filename = '%s.lock' % lock_filename - self.timeout = timeout - self.force = force - self._pid = str(os.getpid()) - # Store pid in a file in the same directory as desired lockname - self.pid_filename = os.path.join( - os.path.dirname(self.lock_filename), - self._pid, - ) + '.lock' - - def get_lock_pid(self): - try: - return int(open(self.lock_filename).read()) - except IOError: - # If we can't read symbolic link, there are two possibilities: - # 1. The symbolic link is dead (point to non existing file) - # 2. Symbolic link is not there - # In either case, we can safely release the lock - self.release() - except ValueError: - # most likely an empty or otherwise invalid lock file - self.release() - - def valid_lock(self): - """ - See if the lock exists and is left over from an old process. - """ - - lock_pid = self.get_lock_pid() - - # If we're unable to get lock_pid - if lock_pid is None: - return False - - # this is our process - if self._pid == lock_pid: - return True - - # it is/was another process - # see if it is running - try: - os.kill(lock_pid, 0) - except OSError: - self.release() - return False - - # it is running - return True - - def is_locked(self, force=False): - # We aren't locked - if not self.valid_lock(): - return False - - # We are locked, but we want to force it without waiting - if not self.timeout: - if self.force: - self.release() - return False - else: - # We're not waiting or forcing the lock - raise FileLocked() - - # Locked, but want to wait for an unlock - interval = .1 - intervals = int(self.timeout / interval) - - while intervals: - if self.valid_lock(): - intervals -= 1 - time.sleep(interval) - #print('stopping %s' % intervals) - else: - return True - - # check one last time - if self.valid_lock(): - if self.force: - self.release() - else: - # still locked :( - raise FileLocked() - - def acquire(self): - """Create a pid filename and create a symlink (the actual lock file) - across platforms that points to it. Symlink is used because it's an - atomic operation across platforms. - """ - - pid_file = os.open(self.pid_filename, os.O_CREAT | os.O_EXCL | os.O_RDWR) - os.write(pid_file, str(os.getpid()).encode('utf-8')) - os.close(pid_file) - - if hasattr(os, 'symlink') and platform.system() != 'Windows': - os.symlink(self.pid_filename, self.lock_filename) - else: - # Windows platforms doesn't support symlinks, at least not through the os API - self.lock_filename = self.pid_filename - - def release(self): - """Try to delete the lock files. 
Doesn't matter if we fail""" - if self.lock_filename != self.pid_filename: - try: - os.unlink(self.lock_filename) - except OSError: - pass - - try: - os.remove(self.pid_filename) - except OSError: - pass - - def __enter__(self): - if not self.is_locked(): - self.acquire() - return self - - def __exit__(self, type, value, traceback): - self.release() diff --git a/post_office/migrations/0012_dbmutex.py b/post_office/migrations/0012_dbmutex.py new file mode 100644 index 00000000..ba33911d --- /dev/null +++ b/post_office/migrations/0012_dbmutex.py @@ -0,0 +1,23 @@ +# Generated by Django 3.0.10 on 2020-11-28 21:18 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('post_office', '0011_models_help_text'), + ] + + operations = [ + migrations.CreateModel( + name='DBMutex', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('lock_id', models.CharField(max_length=50, unique=True)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('expires_at', models.DateTimeField()), + ('locked_by', models.UUIDField(db_index=True)), + ], + ), + ] diff --git a/post_office/models.py b/post_office/models.py index 0fccaea8..01f36873 100644 --- a/post_office/models.py +++ b/post_office/models.py @@ -316,3 +316,23 @@ class Meta: def __str__(self): return self.name + + +class DBMutex(models.Model): + lock_id = models.CharField( + max_length=50, + unique=True, + ) + + created_at = models.DateTimeField( + auto_now_add=True, + ) + + expires_at = models.DateTimeField() + + locked_by = models.UUIDField( + db_index=True, + ) + + def __str__(self): + return f"" \ No newline at end of file diff --git a/post_office/test_settings.py b/post_office/test_settings.py index fca94fa3..c2ece944 100644 --- a/post_office/test_settings.py +++ b/post_office/test_settings.py @@ -10,7 +10,11 @@ DATABASES = { 'default': { + 'NAME': 'empty.sqlite3', # os.path.join(BASE_DIR, 'tests/db.sqlite3'), 'ENGINE': 'django.db.backends.sqlite3', + 'TEST': { + 'NAME': os.path.join(BASE_DIR, 'tests/db.sqlite3'), + } }, } diff --git a/post_office/tests/test_dblock.py b/post_office/tests/test_dblock.py new file mode 100644 index 00000000..484799f7 --- /dev/null +++ b/post_office/tests/test_dblock.py @@ -0,0 +1,201 @@ +import time +from datetime import timedelta +from multiprocessing import Process, Value + +from django.db import connection +from django.test import TransactionTestCase + +from post_office.dblock import db_lock, TimeoutException, LockedException +from post_office.models import DBMutex + + +class LockTest(TransactionTestCase): + def setUp(self): + if connection.vendor == 'sqlite': + cursor = connection.cursor() + cursor.execute('PRAGMA journal_mode=WAL;') + cursor.execute('PRAGMA synchronous=FULL;') + + def t_est_lock_expires_by_itself(self): + with self.assertRaises(TimeoutException): + with db_lock('test_dblock', timedelta(seconds=1)) as lock: + self.assertTrue(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) + time.sleep(1.001) # task runs too long + self.assertFalse(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) + + def t_est_aquire_and_release_locks(self): + lock1 = db_lock('test_dblock', timedelta(seconds=1)) + self.assertFalse(DBMutex.objects.filter(locked_by=lock1.locked_by, lock_id=lock1.lock_id).exists()) + lock1.acquire() + self.assertTrue(DBMutex.objects.filter(locked_by=lock1.locked_by, lock_id=lock1.lock_id).exists()) + lock2 = 
db_lock('test_dblock', timedelta(seconds=1)) + with self.assertRaises(LockedException): + lock2.acquire() + lock1.release() + self.assertFalse(DBMutex.objects.filter(locked_by=lock1.locked_by, lock_id=lock1.lock_id).exists()) + lock2.acquire() + lock3 = db_lock('test_dblock3', timedelta(seconds=60)) + lock3.acquire() + self.assertTrue(DBMutex.objects.filter(locked_by=lock3.locked_by, lock_id=lock3.lock_id).exists()) + self.assertTrue(DBMutex.objects.filter(locked_by=lock2.locked_by, lock_id=lock2.lock_id).exists()) + lock2.release() + self.assertTrue(DBMutex.objects.filter(locked_by=db_lock.locked_by).exists()) + lock3.release() + self.assertFalse(DBMutex.objects.filter(locked_by=db_lock.locked_by).exists()) + + def t_est_lock_using_decorator(self): + @db_lock('test_dblock', timedelta(seconds=1)) + def func(sleep_time): + time.sleep(sleep_time) + return 'some result' + + self.assertEqual(func(0.2), 'some result') + with self.assertRaises(TimeoutException): + func(2.0) + + def t_est_refuse_to_lock_from_concurrent_task(self): + def concurrent_task(): + print(f"Locking concurrent {time.monotonic() - time_stamp}") + with self.assertRaises(LockedException): + with db_lock('test_dblock', timedelta(seconds=1)): + pass + + time_stamp = time.monotonic() + proc = Process(target=concurrent_task) + proc.start() + print(f"Locking main {time.monotonic() - time_stamp}") + with db_lock('test_dblock', timedelta(seconds=1)) as lock: + print(f"Locked main {time.monotonic() - time_stamp}") + self.assertTrue(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) + time.sleep(0.5) + print(f"Unlocked main {time.monotonic() - time_stamp}") + proc.join() + + def t_est_wait_for_lock_in_concurrent_task(self): + def concurrent_task(mutex_id): + print(f"Locking concurrent {time.monotonic() - time_stamp}") + with db_lock('test_dblock', timedelta(seconds=1), wait=True) as lock: + print(f"Locked concurrent {time.monotonic() - time_stamp}") + self.assertGreater(time.monotonic() - time_stamp, 0.5) + self.assertLess(time.monotonic() - time_stamp, 1.5) + mutex_id.value = DBMutex.objects.get(locked_by=lock.locked_by, lock_id=lock.lock_id).id + + print("===================================") + time_stamp = time.monotonic() + mutex_id = Value('i', 0) + proc = Process(target=concurrent_task, args=(mutex_id,)) + proc.start() + print(f"Locking main {time.monotonic() - time_stamp}") + with db_lock('test_dblock', timedelta(seconds=1)) as lock: + print(f"Locked main {time.monotonic() - time_stamp}") + time.sleep(0.5) + main_mutex_id = DBMutex.objects.get(locked_by=lock.locked_by, lock_id=lock.lock_id).id + self.assertFalse(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) + print(f"Unlocked main {time.monotonic() - time_stamp}") + print(f"Before joining: {mutex_id.value}") + proc.join() + print(f"After joining: {mutex_id.value}") + self.assertNotEqual(mutex_id.value, main_mutex_id) + + def test_lock_timeout_in_concurrent_task(self): + def concurrent_task(): + print(f"Locking concurrent {time.monotonic() - time_stamp}") + with self.assertRaises(TimeoutException): + with db_lock('test_dblock', timedelta(seconds=1)): + print(f"Locked concurrent {time.monotonic() - time_stamp}") + time.sleep(2) + self.assertGreater(time.monotonic() - time_stamp, 1) + self.assertLess(time.monotonic() - time_stamp, 2) + print(f"Unlocked concurrent {time.monotonic() - time_stamp}") + + print("===================================") + time_stamp = time.monotonic() + proc = Process(target=concurrent_task) 
+ proc.start() + time.sleep(0.5) + print(f"Running main {time.monotonic() - time_stamp}") + with self.assertRaises(LockedException): + db_lock('test_dblock', timedelta(seconds=1)).acquire() + print(f"Locked main {time.monotonic() - time_stamp}") + time.sleep(1) + with db_lock('test_dblock', timedelta(seconds=1)) as lock: + print(f"Locked main {time.monotonic() - time_stamp}") + self.assertTrue(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) + self.assertFalse(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) + print(f"Unlocked main {time.monotonic() - time_stamp}") + proc.join() + + # def t_est_allow_to_lock_again(self): + # with db_lock('test_dblock', timedelta(seconds=1)) as lock: + # self.assertTrue(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) + # time.sleep(0.25) + # self.assertFalse(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) + # with db_lock('test_dblock', timedelta(seconds=1)) as lock: + # self.assertLess(lock.remaining_time, timedelta(seconds=1)) + # self.assertGreater(lock.remaining_time, timedelta(milliseconds=900)) + # self.assertTrue(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) + # self.assertFalse(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) + # + # def t_est_locks_not_interfering(self): + # with db_lock('test_dblock1', timedelta(seconds=1)) as lock1: + # self.assertTrue(DBMutex.objects.filter(locked_by=lock1.locked_by, lock_id=lock1.lock_id).exists()) + # with db_lock('test_dblock2', timedelta(seconds=1)) as lock2: + # self.assertTrue(DBMutex.objects.filter(locked_by=lock2.locked_by, lock_id=lock2.lock_id).exists()) + # self.assertFalse(DBMutex.objects.filter(locked_by=lock1.locked_by, lock_id=lock1.lock_id).exists()) + # self.assertFalse(DBMutex.objects.filter(locked_by=lock2.locked_by, lock_id=lock2.lock_id).exists()) + + # def test_process_killed_force_unlock(self): + # pid = os.getpid() + # lockfile = '%s.lock' % pid + # setup_fake_lock('test.lock') + # + # with open(lockfile, 'w+') as f: + # f.write('9999999') + # assert os.path.exists(lockfile) + # with FileLock('test'): + # assert True + # + # def test_force_unlock_in_same_process(self): + # pid = os.getpid() + # lockfile = '%s.lock' % pid + # os.symlink(lockfile, 'test.lock') + # + # with open(lockfile, 'w+') as f: + # f.write(str(os.getpid())) + # + # with FileLock('test', force=True): + # assert True + # + # def test_exception_after_timeout(self): + # pid = os.getpid() + # lockfile = '%s.lock' % pid + # setup_fake_lock('test.lock') + # + # with open(lockfile, 'w+') as f: + # f.write(str(os.getpid())) + # + # try: + # with FileLock('test', timeout=1): + # assert False + # except FileLocked: + # assert True + # + # def test_force_after_timeout(self): + # pid = os.getpid() + # lockfile = '%s.lock' % pid + # setup_fake_lock('test.lock') + # + # with open(lockfile, 'w+') as f: + # f.write(str(os.getpid())) + # + # timeout = 1 + # start = time.time() + # with FileLock('test', timeout=timeout, force=True): + # assert True + # end = time.time() + # assert end - start > timeout + # + # def test_get_lock_pid(self): + # """Ensure get_lock_pid() works properly""" + # with FileLock('test', timeout=1, force=True) as lock: + # self.assertEqual(lock.get_lock_pid(), int(os.getpid())) diff --git a/post_office/tests/test_lockfile.py b/post_office/tests/test_lockfile.py deleted file mode 100644 index 852c2361..00000000 --- 
a/post_office/tests/test_lockfile.py +++ /dev/null @@ -1,75 +0,0 @@ -import time -import os - -from django.test import TestCase - -from ..lockfile import FileLock, FileLocked - - -def setup_fake_lock(lock_file_name): - pid = os.getpid() - lockfile = '%s.lock' % pid - try: - os.remove(lock_file_name) - except OSError: - pass - os.symlink(lockfile, lock_file_name) - - -class LockTest(TestCase): - - def test_process_killed_force_unlock(self): - pid = os.getpid() - lockfile = '%s.lock' % pid - setup_fake_lock('test.lock') - - with open(lockfile, 'w+') as f: - f.write('9999999') - assert os.path.exists(lockfile) - with FileLock('test'): - assert True - - def test_force_unlock_in_same_process(self): - pid = os.getpid() - lockfile = '%s.lock' % pid - os.symlink(lockfile, 'test.lock') - - with open(lockfile, 'w+') as f: - f.write(str(os.getpid())) - - with FileLock('test', force=True): - assert True - - def test_exception_after_timeout(self): - pid = os.getpid() - lockfile = '%s.lock' % pid - setup_fake_lock('test.lock') - - with open(lockfile, 'w+') as f: - f.write(str(os.getpid())) - - try: - with FileLock('test', timeout=1): - assert False - except FileLocked: - assert True - - def test_force_after_timeout(self): - pid = os.getpid() - lockfile = '%s.lock' % pid - setup_fake_lock('test.lock') - - with open(lockfile, 'w+') as f: - f.write(str(os.getpid())) - - timeout = 1 - start = time.time() - with FileLock('test', timeout=timeout, force=True): - assert True - end = time.time() - assert end - start > timeout - - def test_get_lock_pid(self): - """Ensure get_lock_pid() works properly""" - with FileLock('test', timeout=1, force=True) as lock: - self.assertEqual(lock.get_lock_pid(), int(os.getpid())) From 3611b86c9646ebc681187960daa5c75cffa64975 Mon Sep 17 00:00:00 2001 From: Jacob Rief - Django-Migration Date: Tue, 1 Dec 2020 17:11:44 +0100 Subject: [PATCH 2/9] cleanup the code --- post_office/dblock.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/post_office/dblock.py b/post_office/dblock.py index ca59d920..8604c170 100644 --- a/post_office/dblock.py +++ b/post_office/dblock.py @@ -1,12 +1,11 @@ import atexit - from datetime import timedelta import signal import time from uuid import uuid4 from django.core.exceptions import ImproperlyConfigured -from django.db import IntegrityError, DatabaseError, transaction +from django.db import IntegrityError, DatabaseError from django.utils.timezone import now from post_office.models import DBMutex @@ -64,29 +63,25 @@ def stop_on_alarm(*args): signal.signal(signal.SIGALRM, stop_on_alarm) if self.wait: # the following call may block, until lock is released by another process + granularity = self.GRANULARITY.total_seconds() mutex = DBMutex.objects.select_for_update().filter(lock_id=self.lock_id, expires_at__gt=now()).first() - print(f"1 in transaction: {mutex}") while mutex: - remaining_time = mutex.expires_at - now() - print(f'Remain: {remaining_time}') - time.sleep(remaining_time.total_seconds() if remaining_time < self.GRANULARITY else self.GRANULARITY.total_seconds()) + remaining = mutex.expires_at - now() + time.sleep(remaining.total_seconds() if remaining < self.GRANULARITY else granularity) try: mutex.refresh_from_db() except DBMutex.DoesNotExist: mutex = None - print(f"2 in transaction: {mutex}") self._mutex = DBMutex.objects.create( lock_id=self.lock_id, locked_by=self.locked_by, expires_at=now() + self.timeout) else: try: - print(f"Creating mutex: ") self._mutex = DBMutex.objects.create( lock_id=self.lock_id, 
locked_by=self.locked_by, expires_at=now() + self.timeout) - print(self._mutex) except IntegrityError: raise LockedException("DB mutex for {} is locked.".format(self.lock_id)) - # install a timeout handler, in case the lock expires without being released + # install a timeout handler, in case the lock expires before being released signal.setitimer(signal.ITIMER_REAL, (self._mutex.expires_at - now()).total_seconds()) def release(self): @@ -95,7 +90,6 @@ def release(self): """ if self._mutex: signal.setitimer(signal.ITIMER_REAL, 0) - print(f"Releasing: {self._mutex}") self._mutex.delete() self._mutex = None From 98a511cfd058ff9902b8772114128a04c2e42384 Mon Sep 17 00:00:00 2001 From: Jacob Rief - Django-Migration Date: Tue, 1 Dec 2020 17:12:28 +0100 Subject: [PATCH 3/9] enable all 6 tests to check locking --- post_office/tests/test_dblock.py | 106 ++++++------------------------- 1 file changed, 18 insertions(+), 88 deletions(-) diff --git a/post_office/tests/test_dblock.py b/post_office/tests/test_dblock.py index 484799f7..8b4fbec9 100644 --- a/post_office/tests/test_dblock.py +++ b/post_office/tests/test_dblock.py @@ -1,6 +1,6 @@ import time from datetime import timedelta -from multiprocessing import Process, Value +from multiprocessing import Process, Queue, Value from django.db import connection from django.test import TransactionTestCase @@ -11,19 +11,19 @@ class LockTest(TransactionTestCase): def setUp(self): - if connection.vendor == 'sqlite': + if connection.vendor == 'XXXsqlite': cursor = connection.cursor() cursor.execute('PRAGMA journal_mode=WAL;') cursor.execute('PRAGMA synchronous=FULL;') - def t_est_lock_expires_by_itself(self): + def test_lock_expires_by_itself(self): with self.assertRaises(TimeoutException): with db_lock('test_dblock', timedelta(seconds=1)) as lock: self.assertTrue(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) time.sleep(1.001) # task runs too long self.assertFalse(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) - def t_est_aquire_and_release_locks(self): + def test_aquire_and_release_locks(self): lock1 = db_lock('test_dblock', timedelta(seconds=1)) self.assertFalse(DBMutex.objects.filter(locked_by=lock1.locked_by, lock_id=lock1.lock_id).exists()) lock1.acquire() @@ -43,7 +43,7 @@ def t_est_aquire_and_release_locks(self): lock3.release() self.assertFalse(DBMutex.objects.filter(locked_by=db_lock.locked_by).exists()) - def t_est_lock_using_decorator(self): + def test_lock_using_decorator(self): @db_lock('test_dblock', timedelta(seconds=1)) def func(sleep_time): time.sleep(sleep_time) @@ -53,7 +53,7 @@ def func(sleep_time): with self.assertRaises(TimeoutException): func(2.0) - def t_est_refuse_to_lock_from_concurrent_task(self): + def test_refuse_to_lock_from_concurrent_task(self): def concurrent_task(): print(f"Locking concurrent {time.monotonic() - time_stamp}") with self.assertRaises(LockedException): @@ -71,7 +71,7 @@ def concurrent_task(): print(f"Unlocked main {time.monotonic() - time_stamp}") proc.join() - def t_est_wait_for_lock_in_concurrent_task(self): + def test_wait_for_lock_in_concurrent_task(self): def concurrent_task(mutex_id): print(f"Locking concurrent {time.monotonic() - time_stamp}") with db_lock('test_dblock', timedelta(seconds=1), wait=True) as lock: @@ -98,104 +98,34 @@ def concurrent_task(mutex_id): self.assertNotEqual(mutex_id.value, main_mutex_id) def test_lock_timeout_in_concurrent_task(self): - def concurrent_task(): + def concurrent_task(queue): print(f"Locking 
concurrent {time.monotonic() - time_stamp}") with self.assertRaises(TimeoutException): with db_lock('test_dblock', timedelta(seconds=1)): + queue.put('locked') print(f"Locked concurrent {time.monotonic() - time_stamp}") time.sleep(2) self.assertGreater(time.monotonic() - time_stamp, 1) self.assertLess(time.monotonic() - time_stamp, 2) print(f"Unlocked concurrent {time.monotonic() - time_stamp}") + queue.put('unlocked') print("===================================") time_stamp = time.monotonic() - proc = Process(target=concurrent_task) + queue = Queue() + proc = Process(target=concurrent_task, args=(queue,)) proc.start() - time.sleep(0.5) + while queue.get() != 'locked': + time.sleep(0.1) print(f"Running main {time.monotonic() - time_stamp}") with self.assertRaises(LockedException): db_lock('test_dblock', timedelta(seconds=1)).acquire() - print(f"Locked main {time.monotonic() - time_stamp}") - time.sleep(1) + while queue.get() != 'unlocked': + time.sleep(0.1) + print(f"Locking main {time.monotonic() - time_stamp}") with db_lock('test_dblock', timedelta(seconds=1)) as lock: print(f"Locked main {time.monotonic() - time_stamp}") self.assertTrue(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) - self.assertFalse(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) print(f"Unlocked main {time.monotonic() - time_stamp}") proc.join() - - # def t_est_allow_to_lock_again(self): - # with db_lock('test_dblock', timedelta(seconds=1)) as lock: - # self.assertTrue(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) - # time.sleep(0.25) - # self.assertFalse(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) - # with db_lock('test_dblock', timedelta(seconds=1)) as lock: - # self.assertLess(lock.remaining_time, timedelta(seconds=1)) - # self.assertGreater(lock.remaining_time, timedelta(milliseconds=900)) - # self.assertTrue(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) - # self.assertFalse(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) - # - # def t_est_locks_not_interfering(self): - # with db_lock('test_dblock1', timedelta(seconds=1)) as lock1: - # self.assertTrue(DBMutex.objects.filter(locked_by=lock1.locked_by, lock_id=lock1.lock_id).exists()) - # with db_lock('test_dblock2', timedelta(seconds=1)) as lock2: - # self.assertTrue(DBMutex.objects.filter(locked_by=lock2.locked_by, lock_id=lock2.lock_id).exists()) - # self.assertFalse(DBMutex.objects.filter(locked_by=lock1.locked_by, lock_id=lock1.lock_id).exists()) - # self.assertFalse(DBMutex.objects.filter(locked_by=lock2.locked_by, lock_id=lock2.lock_id).exists()) - - # def test_process_killed_force_unlock(self): - # pid = os.getpid() - # lockfile = '%s.lock' % pid - # setup_fake_lock('test.lock') - # - # with open(lockfile, 'w+') as f: - # f.write('9999999') - # assert os.path.exists(lockfile) - # with FileLock('test'): - # assert True - # - # def test_force_unlock_in_same_process(self): - # pid = os.getpid() - # lockfile = '%s.lock' % pid - # os.symlink(lockfile, 'test.lock') - # - # with open(lockfile, 'w+') as f: - # f.write(str(os.getpid())) - # - # with FileLock('test', force=True): - # assert True - # - # def test_exception_after_timeout(self): - # pid = os.getpid() - # lockfile = '%s.lock' % pid - # setup_fake_lock('test.lock') - # - # with open(lockfile, 'w+') as f: - # f.write(str(os.getpid())) - # - # try: - # with FileLock('test', timeout=1): - # assert False - 
# except FileLocked: - # assert True - # - # def test_force_after_timeout(self): - # pid = os.getpid() - # lockfile = '%s.lock' % pid - # setup_fake_lock('test.lock') - # - # with open(lockfile, 'w+') as f: - # f.write(str(os.getpid())) - # - # timeout = 1 - # start = time.time() - # with FileLock('test', timeout=timeout, force=True): - # assert True - # end = time.time() - # assert end - start > timeout - # - # def test_get_lock_pid(self): - # """Ensure get_lock_pid() works properly""" - # with FileLock('test', timeout=1, force=True) as lock: - # self.assertEqual(lock.get_lock_pid(), int(os.getpid())) + self.assertFalse(DBMutex.objects.filter(locked_by=db_lock.locked_by).exists()) From ddb48f5df8246d13bde70298d7ca1f6f2417600d Mon Sep 17 00:00:00 2001 From: Jacob Rief - Django-Migration Date: Tue, 1 Dec 2020 17:13:48 +0100 Subject: [PATCH 4/9] adopt imports --- post_office/tests/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/post_office/tests/__init__.py b/post_office/tests/__init__.py index f682b8bb..1b826368 100644 --- a/post_office/tests/__init__.py +++ b/post_office/tests/__init__.py @@ -1,6 +1,6 @@ from .test_backends import BackendTest from .test_commands import CommandTest -from .test_lockfile import LockTest +from .test_dblock import LockTest from .test_mail import MailTest from .test_models import ModelTest from .test_utils import UtilsTest From de4a90a656c33b4b6231b9c6ca72d080ee31da03 Mon Sep 17 00:00:00 2001 From: Jacob Rief - Django-Migration Date: Tue, 1 Dec 2020 17:18:41 +0100 Subject: [PATCH 5/9] remove all debug statements --- post_office/tests/test_dblock.py | 29 +---------------------------- 1 file changed, 1 insertion(+), 28 deletions(-) diff --git a/post_office/tests/test_dblock.py b/post_office/tests/test_dblock.py index 8b4fbec9..4185b1a3 100644 --- a/post_office/tests/test_dblock.py +++ b/post_office/tests/test_dblock.py @@ -2,7 +2,6 @@ from datetime import timedelta from multiprocessing import Process, Queue, Value -from django.db import connection from django.test import TransactionTestCase from post_office.dblock import db_lock, TimeoutException, LockedException @@ -10,12 +9,6 @@ class LockTest(TransactionTestCase): - def setUp(self): - if connection.vendor == 'XXXsqlite': - cursor = connection.cursor() - cursor.execute('PRAGMA journal_mode=WAL;') - cursor.execute('PRAGMA synchronous=FULL;') - def test_lock_expires_by_itself(self): with self.assertRaises(TimeoutException): with db_lock('test_dblock', timedelta(seconds=1)) as lock: @@ -55,7 +48,6 @@ def func(sleep_time): def test_refuse_to_lock_from_concurrent_task(self): def concurrent_task(): - print(f"Locking concurrent {time.monotonic() - time_stamp}") with self.assertRaises(LockedException): with db_lock('test_dblock', timedelta(seconds=1)): pass @@ -63,69 +55,50 @@ def concurrent_task(): time_stamp = time.monotonic() proc = Process(target=concurrent_task) proc.start() - print(f"Locking main {time.monotonic() - time_stamp}") with db_lock('test_dblock', timedelta(seconds=1)) as lock: - print(f"Locked main {time.monotonic() - time_stamp}") self.assertTrue(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) time.sleep(0.5) - print(f"Unlocked main {time.monotonic() - time_stamp}") proc.join() def test_wait_for_lock_in_concurrent_task(self): def concurrent_task(mutex_id): - print(f"Locking concurrent {time.monotonic() - time_stamp}") with db_lock('test_dblock', timedelta(seconds=1), wait=True) as lock: - print(f"Locked concurrent {time.monotonic() - 
time_stamp}") self.assertGreater(time.monotonic() - time_stamp, 0.5) self.assertLess(time.monotonic() - time_stamp, 1.5) mutex_id.value = DBMutex.objects.get(locked_by=lock.locked_by, lock_id=lock.lock_id).id - print("===================================") time_stamp = time.monotonic() mutex_id = Value('i', 0) proc = Process(target=concurrent_task, args=(mutex_id,)) proc.start() - print(f"Locking main {time.monotonic() - time_stamp}") with db_lock('test_dblock', timedelta(seconds=1)) as lock: - print(f"Locked main {time.monotonic() - time_stamp}") time.sleep(0.5) main_mutex_id = DBMutex.objects.get(locked_by=lock.locked_by, lock_id=lock.lock_id).id - self.assertFalse(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) - print(f"Unlocked main {time.monotonic() - time_stamp}") - print(f"Before joining: {mutex_id.value}") proc.join() - print(f"After joining: {mutex_id.value}") self.assertNotEqual(mutex_id.value, main_mutex_id) + self.assertFalse(DBMutex.objects.filter(locked_by=db_lock.locked_by).exists()) def test_lock_timeout_in_concurrent_task(self): def concurrent_task(queue): - print(f"Locking concurrent {time.monotonic() - time_stamp}") with self.assertRaises(TimeoutException): with db_lock('test_dblock', timedelta(seconds=1)): queue.put('locked') - print(f"Locked concurrent {time.monotonic() - time_stamp}") time.sleep(2) self.assertGreater(time.monotonic() - time_stamp, 1) self.assertLess(time.monotonic() - time_stamp, 2) - print(f"Unlocked concurrent {time.monotonic() - time_stamp}") queue.put('unlocked') - print("===================================") time_stamp = time.monotonic() queue = Queue() proc = Process(target=concurrent_task, args=(queue,)) proc.start() while queue.get() != 'locked': time.sleep(0.1) - print(f"Running main {time.monotonic() - time_stamp}") with self.assertRaises(LockedException): db_lock('test_dblock', timedelta(seconds=1)).acquire() while queue.get() != 'unlocked': time.sleep(0.1) - print(f"Locking main {time.monotonic() - time_stamp}") with db_lock('test_dblock', timedelta(seconds=1)) as lock: - print(f"Locked main {time.monotonic() - time_stamp}") self.assertTrue(DBMutex.objects.filter(locked_by=lock.locked_by, lock_id=lock.lock_id).exists()) - print(f"Unlocked main {time.monotonic() - time_stamp}") proc.join() self.assertFalse(DBMutex.objects.filter(locked_by=db_lock.locked_by).exists()) From 2ff7619df26062be1216c37773401b2d6eca5ee6 Mon Sep 17 00:00:00 2001 From: Jacob Rief - Django-Migration Date: Tue, 1 Dec 2020 20:09:37 +0100 Subject: [PATCH 6/9] added docstring to class db_lock --- post_office/dblock.py | 71 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 63 insertions(+), 8 deletions(-) diff --git a/post_office/dblock.py b/post_office/dblock.py index 8604c170..d7e1d9af 100644 --- a/post_office/dblock.py +++ b/post_office/dblock.py @@ -21,7 +21,56 @@ class TimeoutException(Exception): class db_lock: """ - An entity that can lock a named resource and release it through database locking. + An entity that can lock a shared resource and release it through database locking. + Locks can be acquired by different hosts, since the source of truth is handled by the database. + + Usage: + + ``` + # Lock a critical section of code + with db_lock('my_lock', timedelta(seconds=30)): + do_something() + ``` + The section inside the ``with`` statement will run for a maximum of 30 seconds. If that + time elapses before leaving the block, a ``TimeoutException`` is raised. 
+ + If another process attempts to run a critical section of code, using the same resource + identifier while the above block is running, a ``LockedException`` is raised. + + ``` + # Blocking variant of the above + with db_lock('my_lock', wait=True): + do_something() + ``` + By using the parameter ``wait=True``, the critical section of code is only entered after + mutexes from other processes have been released. A ``LockedException`` will not be raised. + + ``` + # Running critical section until it expires + with db_lock('my_lock') as lock: + while lock.remaining_time > timedelta(seconds=1): + do_something() # never requires more than one second + ``` + By querying the remaining time left over for the acquisition of the lock, one can + avoid a ``TimeoutException`` to be raised. + + ``` + # Use a decorator to mark a whole function as critical + @db_lock('my_lock') + def do_something(): + # do somethinge here + ``` + This function may raise a ``LockedException`` or a ``TimeoutException``. + + ``` + # Lock critical section of code explicitly + lock = db_lock('my_lock') + lock.acquire() + do_something() + lock.release() + ``` + A lock can also be acquired and released explicitly. This is error-prone, because it relies + upon releasing the lock. """ GRANULARITY = timedelta(milliseconds=100) locked_by = uuid4() @@ -30,8 +79,8 @@ def __init__(self, lock_id, timeout=timedelta(minutes=1), wait=False): self.lock_id = lock_id[:50] if not isinstance(timeout, timedelta): raise ValueError("DB lock timeout must be of type timedelta.") - if timeout.total_seconds() < 1: - raise ImproperlyConfigured("DB lock timeout must be at least one second.") + if timeout < self.GRANULARITY: + raise ImproperlyConfigured("DB lock timeout must be at least {}.".format(self.GRANULARITY)) self.timeout = timeout self.wait = wait self._mutex = None @@ -61,10 +110,10 @@ def stop_on_alarm(*args): raise TimeoutException() signal.signal(signal.SIGALRM, stop_on_alarm) - if self.wait: + granularity = self.GRANULARITY.total_seconds() + while self.wait: # the following call may block, until lock is released by another process - granularity = self.GRANULARITY.total_seconds() - mutex = DBMutex.objects.select_for_update().filter(lock_id=self.lock_id, expires_at__gt=now()).first() + mutex = DBMutex.objects.filter(lock_id=self.lock_id, expires_at__gt=now()).first() while mutex: remaining = mutex.expires_at - now() time.sleep(remaining.total_seconds() if remaining < self.GRANULARITY else granularity) @@ -72,8 +121,14 @@ def stop_on_alarm(*args): mutex.refresh_from_db() except DBMutex.DoesNotExist: mutex = None - self._mutex = DBMutex.objects.create( - lock_id=self.lock_id, locked_by=self.locked_by, expires_at=now() + self.timeout) + try: + self._mutex = DBMutex.objects.create( + lock_id=self.lock_id, locked_by=self.locked_by, expires_at=now() + self.timeout) + break + except IntegrityError: # NOQA + # very rare: other process acquired a lock between exiting inner loop and + # creating DBMutex object + continue else: try: self._mutex = DBMutex.objects.create( From 135f088c2b500c41a8e042216faecd490940b945 Mon Sep 17 00:00:00 2001 From: Jacob Rief - Django-Migration Date: Tue, 1 Dec 2020 21:22:57 +0100 Subject: [PATCH 7/9] ignore empty file --- .gitignore | 1 + post_office/test_settings.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index eb36cfc2..63059bd8 100644 --- a/.gitignore +++ b/.gitignore @@ -36,5 +36,6 @@ ghostdriver.log *~ *.lock .mypy_cache +*.sqlite3 post_office_attachments diff 
--git a/post_office/test_settings.py b/post_office/test_settings.py index c2ece944..9635e44d 100644 --- a/post_office/test_settings.py +++ b/post_office/test_settings.py @@ -10,10 +10,10 @@ DATABASES = { 'default': { - 'NAME': 'empty.sqlite3', # os.path.join(BASE_DIR, 'tests/db.sqlite3'), + 'NAME': 'empty.sqlite3', # required, but unused 'ENGINE': 'django.db.backends.sqlite3', 'TEST': { - 'NAME': os.path.join(BASE_DIR, 'tests/db.sqlite3'), + 'NAME': 'testdb.sqlite3', } }, } From 070b34a3cda482f571f969eac5fda037a99fac91 Mon Sep 17 00:00:00 2001 From: Jacob Rief - Django-Migration Date: Wed, 2 Dec 2020 22:01:01 +0100 Subject: [PATCH 8/9] add command to manage dblocks --- post_office/management/commands/dblocks.py | 31 ++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 post_office/management/commands/dblocks.py diff --git a/post_office/management/commands/dblocks.py b/post_office/management/commands/dblocks.py new file mode 100644 index 00000000..a5d9d854 --- /dev/null +++ b/post_office/management/commands/dblocks.py @@ -0,0 +1,31 @@ +from django.core.management.base import BaseCommand +from django.utils.timezone import now, localtime + +from post_office.models import DBMutex + + +class Command(BaseCommand): + help = "Manage DB locks." + + def add_arguments(self, parser): + parser.add_argument('-d', '--delete', dest='delete_expired', action='store_true', + help="Delete expired locks.") + + parser.add_argument('--delete-all', action='store_true', + help="Delete all locks.") + + def handle(self, verbosity, delete_expired, delete_all, **options): + num_locks = 0 + if delete_all: + num_locks, _ = DBMutex.objects.all().delete() + elif delete_expired: + num_locks, _ = DBMutex.objects.filter(expires_at__lt=now()).delete() + if num_locks > 0: + self.stdout.write("Deleted {} lock(s).".format(num_locks)) + msg = "Lock: '{lock_id}'{expire}." + for entry in DBMutex.objects.all(): + if entry.expires_at < now(): + expire = " (expired)" + else: + expire = " (expires at {})".format(localtime(entry.expires_at.replace(microsecond=0))) + self.stdout.write(msg.format(lock_id=entry.lock_id, expire=expire)) From ebd654c623696a03ccf731b717871edd4e67bca7 Mon Sep 17 00:00:00 2001 From: Jacob Rief Date: Thu, 25 Jul 2024 16:54:28 +0200 Subject: [PATCH 9/9] fix typo --- post_office/dblock.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/post_office/dblock.py b/post_office/dblock.py index d7e1d9af..fbaedfb4 100644 --- a/post_office/dblock.py +++ b/post_office/dblock.py @@ -58,7 +58,7 @@ class db_lock: # Use a decorator to mark a whole function as critical @db_lock('my_lock') def do_something(): - # do somethinge here + # do something here ``` This function may raise a ``LockedException`` or a ``TimeoutException``.
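
Taken together, the series replaces the old file-based `FileLock` with the database-backed `db_lock` context manager, so workers on different hosts coordinate through the shared `DBMutex` table. The sketch below is illustrative only and is not part of any patch in the series: the function `process_mail_queue()`, the helper `send_one_batch()`, the lock id `'send_queued_mail'` and the five-minute timeout are assumptions invented for the example, while `db_lock`, `TimeoutException`, `wait=` and `remaining_time` are the API introduced in `post_office/dblock.py` above.

```python
# Illustrative sketch only -- not part of this patch series.
# Assumptions: process_mail_queue(), send_one_batch() and the lock id
# 'send_queued_mail' are invented here; db_lock, TimeoutException, wait=
# and remaining_time are the API added in post_office/dblock.py.
from datetime import timedelta

from post_office.dblock import db_lock, TimeoutException


def send_one_batch():
    # Placeholder for the real batch-sending logic; returns False once the
    # queue is empty so the loop below can stop.
    return False


def process_mail_queue():
    """Process the mail queue, allowing only one worker at a time across hosts."""
    try:
        # wait=True blocks until any concurrent holder releases its mutex (or
        # the mutex expires); the lock is then held for at most five minutes.
        with db_lock('send_queued_mail', timedelta(minutes=5), wait=True) as lock:
            # Stop early rather than letting the SIGALRM timer fire mid-batch.
            while lock.remaining_time > timedelta(seconds=10):
                if not send_one_batch():
                    break
    except TimeoutException:
        # The critical section outlived its timeout; the mutex row has expired
        # and another worker may already have taken over the queue.
        pass
```

Mutex rows left behind by crashed workers can be inspected and cleaned up with the `dblocks` management command added in patch 8: `python manage.py dblocks` lists current locks, `python manage.py dblocks --delete` removes expired ones, and `python manage.py dblocks --delete-all` removes all of them.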