diff --git a/README.md b/README.md index 5a55d00..8556a3f 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,17 @@ Job.objects.create(name='critical_job', priority=2) Jobs will be ordered by their `priority` (highest to lowest) and then the time which they were created (oldest to newest) and processed in that order. +### Scheduling jobs +If you'd like to create a job but have it run at some time in the future, you can use the `run_after` field on the Job model: + +```python +Job.objects.create(name='scheduled_job', run_after=timezone.now() + timedelta(minutes=10)) +``` + +Of course, the scheduled job will only be run if your `python manage.py worker` process is running at the time when the job is scheduled to run. Otherwise, it will run the next time you start your worker process after that time has passed. + +It's also worth noting that, by default, scheduled jobs run as part of the same queue as all other jobs, and so if a job is already being processed at the time when your scheduled job is due to run, it won't run until that job has finished. If increased precision is important, you might consider using the `queue_name` feature to run a separate worker dedicated to only running scheduled jobs. + ## Terminology ### Job diff --git a/django_dbq/migrations/0005_job_run_after.py b/django_dbq/migrations/0005_job_run_after.py new file mode 100644 index 0000000..67a2c0d --- /dev/null +++ b/django_dbq/migrations/0005_job_run_after.py @@ -0,0 +1,18 @@ +# Generated by Django 3.2rc1 on 2021-11-04 03:32 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("django_dbq", "0004_auto_20210818_0247"), + ] + + operations = [ + migrations.AddField( + model_name="job", + name="run_after", + field=models.DateTimeField(db_index=True, null=True), + ), + ] diff --git a/django_dbq/models.py b/django_dbq/models.py index 2f51a78..5669861 100644 --- a/django_dbq/models.py +++ b/django_dbq/models.py @@ -68,7 +68,12 @@ def delete_old(self): def to_process(self, queue_name): return self.select_for_update().filter( - queue_name=queue_name, state__in=(Job.STATES.READY, Job.STATES.NEW) + models.Q(queue_name=queue_name) + & models.Q(state__in=(Job.STATES.READY, Job.STATES.NEW)) + & models.Q( + models.Q(run_after__isnull=True) + | models.Q(run_after__lte=timezone.now()) + ) ) @@ -91,6 +96,7 @@ class STATES(TextChoices): workspace = JSONField(null=True) queue_name = models.CharField(max_length=20, default="default", db_index=True) priority = models.SmallIntegerField(default=0, db_index=True) + run_after = models.DateTimeField(null=True, db_index=True) class Meta: ordering = ["-priority", "created"] diff --git a/django_dbq/tests.py b/django_dbq/tests.py index 33df2f5..2c8e0e9 100644 --- a/django_dbq/tests.py +++ b/django_dbq/tests.py @@ -210,6 +210,20 @@ def test_gets_jobs_in_priority_and_date_order(self): self.assertEqual(Job.objects.get_ready_or_none("default"), job_1) self.assertFalse(Job.objects.to_process("default").filter(id=job_2.id).exists()) + def test_ignores_jobs_until_run_after_is_in_the_past(self): + job_1 = Job.objects.create(name="testjob") + job_2 = Job.objects.create(name="testjob", run_after=datetime(2021, 11, 4, 8)) + + with freezegun.freeze_time(datetime(2021, 11, 4, 7)): + self.assertEqual( + {job for job in Job.objects.to_process("default")}, {job_1} + ) + + with freezegun.freeze_time(datetime(2021, 11, 4, 9)): + self.assertEqual( + {job for job in Job.objects.to_process("default")}, {job_1, job_2} + ) + def test_get_next_ready_job_created(self): """ Created jobs should be picked too.