Refactor towards using a database mutex #351

Open
wants to merge 12 commits into master
1 change: 1 addition & 0 deletions .gitignore
@@ -37,5 +37,6 @@ ghostdriver.log
*~
*.lock
.mypy_cache
*.sqlite3

post_office_attachments
169 changes: 169 additions & 0 deletions post_office/dblock.py
@@ -0,0 +1,169 @@
import atexit
from datetime import timedelta
import signal
import time
from uuid import uuid4

from django.core.exceptions import ImproperlyConfigured
from django.db import IntegrityError, DatabaseError
from django.utils.timezone import now

from post_office.models import DBMutex


class LockedException(Exception):
pass


class TimeoutException(Exception):
pass


class db_lock:
"""
An entity that locks a shared resource and releases it again, using the database as the locking backend.
Locks can be acquired from different hosts, since the database acts as the single source of truth.

Usage:

```
# Lock a critical section of code
with db_lock('my_lock', timedelta(seconds=30)):
do_something()
```
The section inside the ``with`` statement will run for a maximum of 30 seconds. If that
time elapses before leaving the block, a ``TimeoutException`` is raised.

If another process attempts to run a critical section of code using the same resource
identifier while the above block is running, a ``LockedException`` is raised.

```
# Blocking variant of the above
with db_lock('my_lock', wait=True):
do_something()
```
With the parameter ``wait=True``, the critical section of code is entered only after
mutexes held by other processes have been released; a ``LockedException`` will not be raised.

```
# Running critical section until it expires
with db_lock('my_lock') as lock:
while lock.remaining_time > timedelta(seconds=1):
do_something() # never requires more than one second
```
By querying the remaining time left on the acquired lock, one can avoid a
``TimeoutException`` from being raised.

```
# Use a decorator to mark a whole function as critical
@db_lock('my_lock')
def do_something():
# do something here
```
This function may raise a ``LockedException`` or a ``TimeoutException``.

```
# Lock critical section of code explicitly
lock = db_lock('my_lock')
lock.acquire()
do_something()
lock.release()
```
A lock can also be acquired and released explicitly. This is more error-prone, because it
relies on the caller to release the lock under all circumstances.
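One way to guard against a missed release is to pair the explicit calls with ``try`` / ``finally``:

```
# Explicit locking with a guaranteed release
lock = db_lock('my_lock')
lock.acquire()
try:
    do_something()
finally:
    lock.release()
```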
"""
GRANULARITY = timedelta(milliseconds=100)
locked_by = uuid4()

def __init__(self, lock_id, timeout=timedelta(minutes=1), wait=False):
self.lock_id = lock_id[:50]
if not isinstance(timeout, timedelta):
raise ValueError("DB lock timeout must be of type timedelta.")
if timeout < self.GRANULARITY:
raise ImproperlyConfigured("DB lock timeout must be at least {}.".format(self.GRANULARITY))
self.timeout = timeout
self.wait = wait
self._mutex = None

def __call__(self, func):
return self._decorate(func)

def __enter__(self):
self.acquire()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.release()

@property
def remaining_time(self):
"""
:return: The remaining time until this lock expires.
"""
return self._mutex.expires_at - now()

def acquire(self):
"""
Acquires a mutually exclusive lock in the database.
"""
def stop_on_alarm(*args):
raise TimeoutException()

signal.signal(signal.SIGALRM, stop_on_alarm)
granularity = self.GRANULARITY.total_seconds()
while self.wait:
# the following loop may block until locks held by other processes are released
mutex = DBMutex.objects.filter(lock_id=self.lock_id, expires_at__gt=now()).first()
while mutex:
remaining = mutex.expires_at - now()
time.sleep(remaining.total_seconds() if remaining < self.GRANULARITY else granularity)
try:
mutex.refresh_from_db()
except DBMutex.DoesNotExist:
mutex = None
try:
self._mutex = DBMutex.objects.create(
lock_id=self.lock_id, locked_by=self.locked_by, expires_at=now() + self.timeout)
break
except IntegrityError: # NOQA
# very rare: another process acquired the lock between exiting the inner loop
# and creating the DBMutex object
continue
else:
try:
self._mutex = DBMutex.objects.create(
lock_id=self.lock_id, locked_by=self.locked_by, expires_at=now() + self.timeout)
except IntegrityError:
raise LockedException("DB mutex for {} is locked.".format(self.lock_id))

# install a timeout handler, in case the lock expires before being released
signal.setitimer(signal.ITIMER_REAL, (self._mutex.expires_at - now()).total_seconds())

def release(self):
"""
Releases a previously acquired lock.
"""
if self._mutex:
signal.setitimer(signal.ITIMER_REAL, 0)
self._mutex.delete()
self._mutex = None

def _decorate(self, func):
def wrapper(*args, **kwargs):
with self:
result = func(*args, **kwargs)
return result
return wrapper

@classmethod
def _release_all_locks(cls):
"""
Release all locks assigned to the running instance.
"""
try:
DBMutex.objects.filter(locked_by=cls.locked_by).delete()
except DatabaseError:
pass


atexit.register(db_lock._release_all_locks)
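
For reference, this module imports a ``DBMutex`` model from ``post_office.models`` whose definition is not part of this diff. Below is a minimal sketch of what such a model could look like, with field names and types inferred from how ``dblock.py`` uses them; the unique ``lock_id`` is what turns concurrent acquires into an ``IntegrityError``:

```
from django.db import models


class DBMutex(models.Model):
    # Inferred sketch -- the actual definition lives in post_office/models.py
    lock_id = models.CharField(max_length=50, unique=True)  # unique constraint makes concurrent create() raise IntegrityError
    locked_by = models.UUIDField(db_index=True)              # per-process UUID taken from db_lock.locked_by
    expires_at = models.DateTimeField()                       # locks past this point are treated as stale
```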
153 changes: 0 additions & 153 deletions post_office/lockfile.py

This file was deleted.

12 changes: 7 additions & 5 deletions post_office/mail.py
@@ -9,7 +9,7 @@
from multiprocessing.dummy import Pool as ThreadPool

from .connections import connections
from .lockfile import default_lockfile, FileLock, FileLocked
from .dblock import db_lock, TimeoutException, LockedException
from .logutils import setup_loghandlers
from .models import Email, EmailTemplate, Log, PRIORITY, STATUS
from .settings import (
@@ -444,13 +444,13 @@ def send(email):
return len(sent_emails), num_failed, num_requeued


def send_queued_mail_until_done(lockfile=default_lockfile, processes=1, log_level=None):
def send_queued_mail_until_done(processes=1, log_level=None):
"""
Send mail in queue batch by batch, until all emails have been processed.
"""
try:
with FileLock(lockfile):
logger.info('Acquired lock for sending queued emails at %s.lock', lockfile)
with db_lock('send_queued_mail_until_done'):
logger.info('Acquired lock for sending queued emails')
while True:
try:
send_queued(processes, log_level)
@@ -463,5 +463,7 @@ def send_queued_mail_until_done(lockfile=default_lockfile, processes=1, log_level=None):

if not get_queued().exists():
break
except FileLocked:
except TimeoutException:
logger.info('Sending queued mail took too long, terminating now.')
except LockedException:
logger.info('Failed to acquire lock, terminating now.')
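
After this change, callers invoke the function without a lockfile argument; locking happens internally under the ``send_queued_mail_until_done`` mutex id. A minimal sketch of the updated call, assuming it is driven from a periodic task or the ``send_queued_mail`` management command:

```
from post_office.mail import send_queued_mail_until_done

# Locking is now handled internally via db_lock('send_queued_mail_until_done');
# the former lockfile parameter is no longer accepted.
send_queued_mail_until_done(processes=2)
```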
31 changes: 31 additions & 0 deletions post_office/management/commands/dblocks.py
@@ -0,0 +1,31 @@
from django.core.management.base import BaseCommand
from django.utils.timezone import now, localtime

from post_office.models import DBMutex


class Command(BaseCommand):
help = "Manage DB locks."

def add_arguments(self, parser):
parser.add_argument('-d', '--delete', dest='delete_expired', action='store_true',
help="Delete expired locks.")

parser.add_argument('--delete-all', action='store_true',
help="Delete all locks.")

def handle(self, verbosity, delete_expired, delete_all, **options):
num_locks = 0
if delete_all:
num_locks, _ = DBMutex.objects.all().delete()
elif delete_expired:
num_locks, _ = DBMutex.objects.filter(expires_at__lt=now()).delete()
if num_locks > 0:
self.stdout.write("Deleted {} lock(s).".format(num_locks))
msg = "Lock: '{lock_id}'{expire}."
for entry in DBMutex.objects.all():
if entry.expires_at < now():
expire = " (expired)"
else:
expire = " (expires at {})".format(localtime(entry.expires_at.replace(microsecond=0)))
self.stdout.write(msg.format(lock_id=entry.lock_id, expire=expire))
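
A quick way to exercise the new command programmatically, equivalent to running ``manage.py dblocks``, ``manage.py dblocks --delete`` and ``manage.py dblocks --delete-all``; the keyword names follow the ``dest`` values of the arguments defined above:

```
from django.core.management import call_command

call_command('dblocks')                       # list all locks with their expiry state
call_command('dblocks', delete_expired=True)  # delete expired locks, then list the remainder
call_command('dblocks', delete_all=True)      # delete every lock
```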