From ea9d3b66f9b2353884a23575a8222b42ece21833 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Todorovich?= Date: Fri, 6 Jun 2025 12:50:42 -0300 Subject: [PATCH] [IMP] queue_job: add_depends on RetryableJobError This commit adds the posibility to add new dependencies to a job when raising a RetryableJobError. --- queue_job/controllers/main.py | 13 ++++++++--- queue_job/exception.py | 6 ++++- queue_job/job.py | 5 ++++ test_queue_job/models/test_models.py | 15 ++++++++++++ test_queue_job/tests/test_job.py | 34 ++++++++++++++++++++++++++++ 5 files changed, 69 insertions(+), 4 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index ca3e02acaa..60868492a7 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -76,11 +76,16 @@ def runjob(self, db, job_uuid, **kw): http.request.session.db = db env = http.request.env(user=SUPERUSER_ID) - def retry_postpone(job, message, seconds=None): + def retry_postpone(job, message, seconds=None, add_depends=None): job.env.clear() with registry(job.env.cr.dbname).cursor() as new_cr: - job.env = api.Environment(new_cr, SUPERUSER_ID, {}) + new_env = api.Environment(new_cr, SUPERUSER_ID, {}) + job.env = new_env job.postpone(result=message, seconds=seconds) + if add_depends: + for dependency in add_depends: + dependency.env = new_env + dependency.store() job.set_pending(reset_retry=False) job.store() @@ -126,7 +131,9 @@ def retry_postpone(job, message, seconds=None): except RetryableJobError as err: # delay the job later, requeue - retry_postpone(job, str(err), seconds=err.seconds) + retry_postpone( + job, str(err), seconds=err.seconds, add_depends=err.add_depends + ) _logger.debug("%s postponed", job) # Do not trigger the error up because we don't want an exception # traceback in the logs we should have the traceback when all diff --git a/queue_job/exception.py b/queue_job/exception.py index 093344ed3d..7bc5cd1e1d 100644 --- a/queue_job/exception.py +++ b/queue_job/exception.py @@ -26,12 +26,16 @@ class RetryableJobError(JobError): by :const:`odoo.addons.queue_job.job.RETRY_INTERVAL` if nothing is defined. If ``ignore_retry`` is True, the retry counter will not be increased. + + If ``add_depends`` is provided, the jobs will be added as dependencies to + the current job. """ - def __init__(self, msg, seconds=None, ignore_retry=False): + def __init__(self, msg, seconds=None, ignore_retry=False, add_depends=None): super().__init__(msg) self.seconds = seconds self.ignore_retry = ignore_retry + self.add_depends = add_depends # TODO: remove support of NothingToDo: too dangerous diff --git a/queue_job/job.py b/queue_job/job.py index e03dd2b517..ff52236af5 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -576,6 +576,11 @@ def perform(self): try: self.result = self.func(*tuple(self.args), **self.kwargs) except RetryableJobError as err: + if err.add_depends: + from .delay import DelayableGraph + + DelayableGraph._ensure_same_graph_uuid([self] + err.add_depends) + self.add_depends(err.add_depends) if err.ignore_retry: self.retry -= 1 raise diff --git a/test_queue_job/models/test_models.py b/test_queue_job/models/test_models.py index ff9622106a..3ef4e3732f 100644 --- a/test_queue_job/models/test_models.py +++ b/test_queue_job/models/test_models.py @@ -64,6 +64,21 @@ def create_ir_logging(self, message, level="info"): } ) + def job_with_retry_and_new_dependency(self): + logging_domain = [ + ("name", "=", "test_queue_job"), + ("message", "=", "job_with_retry_and_new_dependency"), + ] + if not self.env["ir.logging"].search_count(logging_domain): + new_job = self.with_delay().create_ir_logging( + message="job_with_retry_and_new_dependency" + ) + raise RetryableJobError( + "Must be retried after creating the logging", + add_depends=[new_job], + ) + return True + def no_description(self): return diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index 35884cd2b3..f5a268a675 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -87,6 +87,40 @@ def test_infinite_retryable_error(self): test_job.perform() self.assertEqual(test_job.retry, 1) + def test_retryable_error_with_new_dependency(self): + job = Job(self.env["test.queue.job"].job_with_retry_and_new_dependency) + job.store() + with self.assertRaises(RetryableJobError): + job.perform() + job.store() + self.assertEqual(job.retry, 1) + self.assertEqual(len(job.depends_on), 1, "There's a new dependency for the job") + new_job = next(iter(job.depends_on)) + self.assertEqual( + len(new_job.reverse_depends_on), + 1, + "There's a reverse dependency for the new job", + ) + self.assertEqual( + next(iter(new_job.reverse_depends_on)), + job, + "They are bi-directionally linked", + ) + self.assertEqual(job.state, "wait_dependencies") + self.assertEqual(new_job.state, "pending") + job.store() + new_job.store() + # Now run the dependency + new_job.perform() + new_job.set_done() + new_job.store() + self.env.flush_all() + new_job.enqueue_waiting() + # Force a reload of the job from the db state + job = Job.load(self.env, job.uuid) + self.assertEqual(job.state, "pending") + job.perform() + def test_on_instance_method(self): class A: def method(self):