diff --git a/README.md b/README.md index f8e2831..3cdef77 100644 --- a/README.md +++ b/README.md @@ -274,11 +274,12 @@ to ensure the jobs table remains at a reasonable size. To start a worker: ``` -manage.py worker [queue_name] [--rate_limit] +manage.py worker [queue_name] [--rate_limit] [--drain] ``` - `queue_name` is optional, and will default to `default` - The `--rate_limit` flag is optional, and will default to `1`. It is the minimum number of seconds that must have elapsed before a subsequent job can be run. +- The `--drain` flag, if provided, will cause the worker to drain all currently queued jobs and then exit. This is useful if your hosting environment does not support persistent background processes, but does support custom cron jobs. ##### manage.py queue_depth If you'd like to check your queue depth from the command line, you can run `manage.py queue_depth [queue_name [queue_name ...]]` and any diff --git a/django_dbq/management/commands/worker.py b/django_dbq/management/commands/worker.py index 9215aad..fb032c2 100644 --- a/django_dbq/management/commands/worker.py +++ b/django_dbq/management/commands/worker.py @@ -15,9 +15,10 @@ class Worker: - def __init__(self, name, rate_limit_in_seconds): + def __init__(self, name, rate_limit_in_seconds, drain=False): self.queue_name = name self.rate_limit_in_seconds = rate_limit_in_seconds + self.drain = drain self.alive = True self.last_job_finished = None self.current_job = None @@ -59,6 +60,8 @@ def _process_job(self): with transaction.atomic(): job = Job.objects.get_ready_or_none(self.queue_name) if not job: + if self.drain: + self.alive = False return logger.info( @@ -125,6 +128,13 @@ def add_arguments(self, parser): default=1, type=int, ) + parser.add_argument( + "--drain", + help="Process all jobs in the queue and then exit", + action="store_true", + dest="drain", + default=False, + ) parser.add_argument( "--dry-run", action="store_true", @@ -142,13 +152,14 @@ def handle(self, *args, **options): queue_name = options["queue_name"] rate_limit_in_seconds = options["rate_limit"] + drain = options["drain"] self.stdout.write( 'Starting job worker for queue "%s" with rate limit of one job per %s second(s)' % (queue_name, rate_limit_in_seconds) ) - worker = Worker(queue_name, rate_limit_in_seconds) + worker = Worker(queue_name, rate_limit_in_seconds, drain=drain) if options["dry_run"]: return diff --git a/django_dbq/tests.py b/django_dbq/tests.py index 3ae7ab9..3998268 100644 --- a/django_dbq/tests.py +++ b/django_dbq/tests.py @@ -293,6 +293,16 @@ def test_process_job_wrong_queue(self): self.assertEqual(job.state, Job.STATES.NEW) +@override_settings(JOBS={"testjob": {"tasks": ["django_dbq.tests.test_task"]}}) +class DrainTestCase(TestCase): + def test_drain(self): + jobs = [Job.objects.create(name="testjob") for _ in range(3)] + call_command("worker", drain=True) + for job in jobs: + job.refresh_from_db() + self.assertEqual(job.state, Job.STATES.COMPLETE) + + @override_settings( JOBS={ "testjob": {