From 759eb00751471bb1679e3fcc82d0894d43e70bff Mon Sep 17 00:00:00 2001 From: Pete Wildsmith Date: Tue, 26 Apr 2016 16:33:27 +0100 Subject: [PATCH] Revert "Use multiprocessing to run tasks in child processes" This reverts commit e941ddb559e32f6732f9034687a9703aca3f62d6. --- django_dbq/management/commands/worker.py | 32 +++++++++--------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/django_dbq/management/commands/worker.py b/django_dbq/management/commands/worker.py index 138da73..326a030 100644 --- a/django_dbq/management/commands/worker.py +++ b/django_dbq/management/commands/worker.py @@ -6,7 +6,6 @@ from simplesignals.process import WorkerProcessBase from time import sleep import logging -import multiprocessing logger = logging.getLogger(__name__) @@ -15,8 +14,18 @@ DEFAULT_QUEUE_NAME = 'default' -def run_next_task(job): - """Updates a job by running its next task""" +def process_job(queue_name): + """This function grabs the next available job for a given queue, and runs its next task.""" + + with transaction.atomic(): + job = Job.objects.get_ready_or_none(queue_name) + if not job: + return + + logger.info('Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s', job.name, queue_name, job.pk, job.state, job.next_task) + job.state = Job.STATES.PROCESSING + job.save() + try: task_function = import_by_path(job.next_task) task_function(job) @@ -46,23 +55,6 @@ def run_next_task(job): raise -def process_job(queue_name): - """This function grabs the next available job for a given queue, and runs its next task.""" - - with transaction.atomic(): - job = Job.objects.get_ready_or_none(queue_name) - if not job: - return - - logger.info('Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s', job.name, queue_name, job.pk, job.state, job.next_task) - job.state = Job.STATES.PROCESSING - job.save() - - child = multiprocessing.Process(target=run_next_task, args=(job,)) - child.start() - child.join() - - class Worker(WorkerProcessBase): process_title = "jobworker"