diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 44f01fe..7be073c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,11 +9,12 @@ jobs: strategy: matrix: - python: [3.6, 3.7, 3.8] - django: [2.2] + python: [3.6, 3.7, 3.8, 3.9] + django: [3.1, 3.2] database_url: - postgres://runner:password@localhost/project - mysql://root:root@127.0.0.1/project + - 'sqlite:///:memory:' services: postgres: diff --git a/.github/workflows/pypi.yml b/.github/workflows/pypi.yml new file mode 100644 index 0000000..fd2d1c3 --- /dev/null +++ b/.github/workflows/pypi.yml @@ -0,0 +1,26 @@ +name: Upload Python Package + +on: + release: + types: [created] + +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: '3.8' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install setuptools wheel twine + - name: Build and publish + env: + TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }} + TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }} + run: | + python setup.py sdist bdist_wheel + twine upload dist/* diff --git a/README.md b/README.md index 75969ad..5a55d00 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,14 @@ django-db-queue ========== -[![Build Status](https://travis-ci.org/dabapps/django-db-queue.svg)](https://travis-ci.org/dabapps/django-db-queue) [![pypi release](https://img.shields.io/pypi/v/django-db-queue.svg)](https://pypi.python.org/pypi/django-db-queue) -Simple databased-backed job queue. Jobs are defined in your settings, and are processed by management commands. +Simple database-backed job queue. Jobs are defined in your settings, and are processed by management commands. Asynchronous tasks are run via a *job queue*. This system is designed to support multi-step job workflows. Supported and tested against: -- Django 1.11 and 2.2 -- Python 3.5, 3.6, 3.7 and 3.8 - -This package may still work with older versions of Django and Python but they aren't explicitly supported. +- Django 3.1 and 3.2 +- Python 3.6, 3.7, 3.8 and 3.9 ## Getting Started @@ -28,6 +25,10 @@ Add `django_dbq` to your installed apps 'django_dbq', ) +### Upgrading from 1.x to 2.x + +Note that version 2.x only supports Django 3.1 or newer. If you need support for Django 2.2, please stick with the latest 1.x release. + ### Describe your job In e.g. project.common.jobs: @@ -209,15 +210,6 @@ jobs from the database which are in state `COMPLETE` or `FAILED` and were created more than 24 hours ago. This could be run, for example, as a cron task, to ensure the jobs table remains at a reasonable size. -##### manage.py create_job -For debugging/development purposes, a simple management command is supplied to create jobs: - - manage.py create_job --queue_name 'my_queue_name' --workspace '{"key": "value"}' - -The `workspace` flag is optional. If supplied, it must be a valid JSON string. - -`queue_name` is optional and defaults to `default` - ##### manage.py worker To start a worker: diff --git a/django_dbq/__init__.py b/django_dbq/__init__.py index 9c73af2..8c0d5d5 100644 --- a/django_dbq/__init__.py +++ b/django_dbq/__init__.py @@ -1 +1 @@ -__version__ = "1.3.1" +__version__ = "2.0.0" diff --git a/django_dbq/management/commands/create_job.py b/django_dbq/management/commands/create_job.py deleted file mode 100644 index c060218..0000000 --- a/django_dbq/management/commands/create_job.py +++ /dev/null @@ -1,59 +0,0 @@ -from django.conf import settings -from django.core.management.base import BaseCommand, CommandError -from django_dbq.models import Job -import json -import logging - - -logger = logging.getLogger(__name__) - - -class Command(BaseCommand): - - help = "Create a job" - args = "" - - def add_arguments(self, parser): - parser.add_argument("args", nargs="+") - parser.add_argument( - "--workspace", - action="store_true", - dest="workspace", - default=None, - help="JSON-formatted initial commandworkspace.", - ) - parser.add_argument( - "--queue_name", - action="store_true", - dest="queue_name", - default=None, - help="A specific queue to add this job to", - ) - - def handle(self, *args, **options): - if len(args) != 1: - raise CommandError("Please supply a single job name") - - name = args[0] - if name not in settings.JOBS: - raise CommandError('"%s" is not a valid job name' % name) - - workspace = options["workspace"] - if workspace: - workspace = json.loads(workspace) - - queue_name = options["queue_name"] - - kwargs = { - "name": name, - "workspace": workspace, - } - - if queue_name: - kwargs["queue_name"] = queue_name - - job = Job.objects.create(**kwargs) - self.stdout.write( - 'Created job: "%s", id=%s for queue "%s"' - % (job.name, job.pk, queue_name if queue_name else "default") - ) diff --git a/django_dbq/management/commands/worker.py b/django_dbq/management/commands/worker.py index a09a0a9..92e72e4 100644 --- a/django_dbq/management/commands/worker.py +++ b/django_dbq/management/commands/worker.py @@ -3,9 +3,9 @@ from django.utils import timezone from django.utils.module_loading import import_string from django_dbq.models import Job -from simplesignals.process import WorkerProcessBase from time import sleep import logging +import signal logger = logging.getLogger(__name__) @@ -74,17 +74,27 @@ def process_job(queue_name): raise -class Worker(WorkerProcessBase): - - process_title = "jobworker" - +class Worker: def __init__(self, name, rate_limit_in_seconds): self.queue_name = name self.rate_limit_in_seconds = rate_limit_in_seconds + self.alive = True self.last_job_finished = None - super(Worker, self).__init__() + self.init_signals() + + def init_signals(self): + signal.signal(signal.SIGINT, self.shutdown) + signal.signal(signal.SIGQUIT, self.shutdown) + signal.signal(signal.SIGTERM, self.shutdown) + + def shutdown(self, signum, frame): + self.alive = False + + def run(self): + while self.alive: + self.process_job() - def do_work(self): + def process_job(self): sleep(1) if ( self.last_job_finished diff --git a/django_dbq/migrations/0001_initial.py b/django_dbq/migrations/0001_initial.py index 0ec8c73..d5114d3 100644 --- a/django_dbq/migrations/0001_initial.py +++ b/django_dbq/migrations/0001_initial.py @@ -2,7 +2,6 @@ from __future__ import unicode_literals from django.db import models, migrations -import jsonfield.fields import uuid from django.db.models import UUIDField @@ -44,7 +43,7 @@ class Migration(migrations.Migration): ), ), ("next_task", models.CharField(max_length=100, blank=True)), - ("workspace", jsonfield.fields.JSONField(null=True)), + ("workspace", models.TextField(null=True)), ( "queue_name", models.CharField(db_index=True, max_length=20, default="default"), diff --git a/django_dbq/migrations/0004_auto_20210818_0247.py b/django_dbq/migrations/0004_auto_20210818_0247.py new file mode 100644 index 0000000..a1ff5ff --- /dev/null +++ b/django_dbq/migrations/0004_auto_20210818_0247.py @@ -0,0 +1,32 @@ +# Generated by Django 3.2rc1 on 2021-08-18 02:47 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("django_dbq", "0003_auto_20180713_1000"), + ] + + operations = [ + migrations.AlterField( + model_name="job", + name="state", + field=models.CharField( + choices=[ + ("NEW", "New"), + ("READY", "Ready"), + ("PROCESSING", "Processing"), + ("FAILED", "Failed"), + ("COMPLETE", "Complete"), + ], + db_index=True, + default="NEW", + max_length=20, + ), + ), + migrations.AlterField( + model_name="job", name="workspace", field=models.JSONField(null=True), + ), + ] diff --git a/django_dbq/models.py b/django_dbq/models.py index 24ee6aa..2f51a78 100644 --- a/django_dbq/models.py +++ b/django_dbq/models.py @@ -6,8 +6,7 @@ get_failure_hook_name, get_creation_hook_name, ) -from jsonfield import JSONField -from django.db.models import UUIDField, Count +from django.db.models import JSONField, UUIDField, Count, TextChoices import datetime import logging import uuid @@ -74,27 +73,19 @@ def to_process(self, queue_name): class Job(models.Model): - class STATES: + class STATES(TextChoices): NEW = "NEW" READY = "READY" PROCESSING = "PROCESSING" FAILED = "FAILED" COMPLETE = "COMPLETE" - STATE_CHOICES = [ - (STATES.NEW, "NEW"), - (STATES.READY, "READY"), - (STATES.PROCESSING, "PROCESSING"), - (STATES.FAILED, "FAILED"), - (STATES.COMPLETE, "COMPLETE"), - ] - id = UUIDField(primary_key=True, default=uuid.uuid4, editable=False) created = models.DateTimeField(auto_now_add=True, db_index=True) modified = models.DateTimeField(auto_now=True) name = models.CharField(max_length=100) state = models.CharField( - max_length=20, choices=STATE_CHOICES, default=STATES.NEW, db_index=True + max_length=20, choices=STATES.choices, default=STATES.NEW, db_index=True ) next_task = models.CharField(max_length=100, blank=True) workspace = JSONField(null=True) @@ -107,9 +98,7 @@ class Meta: objects = JobManager() def save(self, *args, **kwargs): - is_new = not Job.objects.filter(pk=self.pk).exists() - - if is_new: + if self._state.adding: self.next_task = get_next_task_name(self.name) self.workspace = self.workspace or {} @@ -121,7 +110,7 @@ def save(self, *args, **kwargs): ) return # cancel the save - return super(Job, self).save(*args, **kwargs) + return super().save(*args, **kwargs) def update_next_task(self): self.next_task = get_next_task_name(self.name, self.next_task) or "" diff --git a/django_dbq/serializers.py b/django_dbq/serializers.py deleted file mode 100644 index 12b40d5..0000000 --- a/django_dbq/serializers.py +++ /dev/null @@ -1,29 +0,0 @@ -from django.conf import settings -from django_dbq.models import Job -from rest_framework import serializers -import json - - -class JobSerializer(serializers.Serializer): - name = serializers.ChoiceField() - created = serializers.DateTimeField(read_only=True) - modified = serializers.DateTimeField(read_only=True) - state = serializers.CharField(read_only=True) - workspace = serializers.WritableField(required=False) - url = serializers.HyperlinkedIdentityField(view_name="job_detail") - - def __init__(self, *args, **kwargs): - super(JobSerializer, self).__init__(*args, **kwargs) - self.fields["name"].choices = ((key, key) for key in settings.JOBS) - - def validate_workspace(self, attrs, source): - workspace = attrs.get("workspace") - if workspace and isinstance(workspace, basestring): - try: - attrs["workspace"] = json.loads(workspace) - except ValueError: - raise serializers.ValidationError("Invalid JSON") - return attrs - - def restore_object(self, attrs, instance=None): - return Job(**attrs) diff --git a/django_dbq/tests.py b/django_dbq/tests.py index f354551..33df2f5 100644 --- a/django_dbq/tests.py +++ b/django_dbq/tests.py @@ -10,10 +10,7 @@ from django_dbq.management.commands.worker import process_job, Worker from django_dbq.models import Job -try: - from StringIO import StringIO -except ImportError: - from io import StringIO +from io import StringIO def test_task(job=None): @@ -37,34 +34,6 @@ def creation_hook(job): job.workspace["output"] = "creation hook ran" -@override_settings(JOBS={"testjob": {"tasks": ["a"]}}) -class JobManagementCommandTestCase(TestCase): - def test_create_job(self): - call_command("create_job", "testjob", stdout=StringIO()) - job = Job.objects.get() - self.assertEqual(job.name, "testjob") - self.assertEqual(job.queue_name, "default") - - def test_create_job_with_workspace(self): - workspace = '{"test": "test"}' - call_command("create_job", "testjob", workspace=workspace, stdout=StringIO()) - job = Job.objects.get() - self.assertEqual(job.workspace, {"test": "test"}) - - def test_create_job_with_queue_name(self): - call_command("create_job", "testjob", queue_name="lol", stdout=StringIO()) - job = Job.objects.get() - self.assertEqual(job.name, "testjob") - self.assertEqual(job.queue_name, "lol") - - def test_errors_raised_correctly(self): - with self.assertRaises(CommandError): - call_command("create_job", stdout=StringIO()) - - with self.assertRaises(CommandError): - call_command("create_job", "some_other_job", stdout=StringIO()) - - @override_settings(JOBS={"testjob": {"tasks": ["a"]}}) class WorkerManagementCommandTestCase(TestCase): def test_worker_no_args(self): @@ -148,7 +117,7 @@ def test_queue_depth_for_queue_with_zero_jobs(self): @freezegun.freeze_time() @mock.patch("django_dbq.management.commands.worker.sleep") @mock.patch("django_dbq.management.commands.worker.process_job") -class WorkerProcessDoWorkTestCase(TestCase): +class WorkerProcessProcessJobTestCase(TestCase): def setUp(self): super().setUp() self.MockWorker = mock.MagicMock() @@ -156,17 +125,17 @@ def setUp(self): self.MockWorker.rate_limit_in_seconds = 5 self.MockWorker.last_job_finished = None - def test_do_work_no_previous_job_run(self, mock_process_job, mock_sleep): - Worker.do_work(self.MockWorker) + def test_process_job_no_previous_job_run(self, mock_process_job, mock_sleep): + Worker.process_job(self.MockWorker) self.assertEqual(mock_sleep.call_count, 1) self.assertEqual(mock_process_job.call_count, 1) self.assertEqual(self.MockWorker.last_job_finished, timezone.now()) - def test_do_work_previous_job_too_soon(self, mock_process_job, mock_sleep): + def test_process_job_previous_job_too_soon(self, mock_process_job, mock_sleep): self.MockWorker.last_job_finished = timezone.now() - timezone.timedelta( seconds=2 ) - Worker.do_work(self.MockWorker) + Worker.process_job(self.MockWorker) self.assertEqual(mock_sleep.call_count, 1) self.assertEqual(mock_process_job.call_count, 0) self.assertEqual( @@ -174,11 +143,11 @@ def test_do_work_previous_job_too_soon(self, mock_process_job, mock_sleep): timezone.now() - timezone.timedelta(seconds=2), ) - def test_do_work_previous_job_long_time_ago(self, mock_process_job, mock_sleep): + def test_process_job_previous_job_long_time_ago(self, mock_process_job, mock_sleep): self.MockWorker.last_job_finished = timezone.now() - timezone.timedelta( seconds=7 ) - Worker.do_work(self.MockWorker) + Worker.process_job(self.MockWorker) self.assertEqual(mock_sleep.call_count, 1) self.assertEqual(mock_process_job.call_count, 1) self.assertEqual(self.MockWorker.last_job_finished, timezone.now()) diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 96ce7c6..0000000 --- a/requirements.txt +++ /dev/null @@ -1,5 +0,0 @@ -django-model-utils==2.3.1 -django-uuidfield==0.5.0 -jsonfield==1.0.3 -Django>=2.2<3.0 -simplesignals==0.3.0 diff --git a/setup.py b/setup.py index 25cc24a..5bb0f76 100644 --- a/setup.py +++ b/setup.py @@ -16,9 +16,7 @@ author_email = "contact@dabapps.com" license = "BSD" install_requires = [ - "jsonfield==2.0.2", - "Django>=2.2", - "simplesignals==0.3.0", + "Django>=3.1", ] long_description = """Simple database-backed job queue system""" diff --git a/test-requirements.txt b/test-requirements.txt index dd2ea67..249c8b3 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,4 +1,3 @@ --r requirements.txt mysqlclient==1.4.6 freezegun==0.3.12 mock==3.0.5 diff --git a/testsettings.py b/testsettings.py index 6cb7863..040eade 100644 --- a/testsettings.py +++ b/testsettings.py @@ -1,21 +1,15 @@ import os import dj_database_url + +DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///:memory:") + DATABASES = { - "default": dj_database_url.parse(os.environ["DATABASE_URL"]), + "default": dj_database_url.parse(DATABASE_URL), } INSTALLED_APPS = ("django_dbq",) -MIDDLEWARE_CLASSES = ( - "django.contrib.sessions.middleware.SessionMiddleware", - "django.middleware.common.CommonMiddleware", - "django.middleware.csrf.CsrfViewMiddleware", - "django.contrib.auth.middleware.AuthenticationMiddleware", - "django.contrib.messages.middleware.MessageMiddleware", - "django.middleware.clickjacking.XFrameOptionsMiddleware", -) - SECRET_KEY = "abcde12345" LOGGING = {