Skip to content

Commit bb5c144

Browse files
committed
Add requeue automatically of previous enqueued jobs
1 parent 82fff58 commit bb5c144

File tree

2 files changed

+68
-1
lines changed

2 files changed

+68
-1
lines changed

queue_job/jobrunner/runner.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@
139139
from odoo.tools import config
140140

141141
from . import queue_job_config
142-
from .channels import ENQUEUED, NOT_DONE, ChannelManager
142+
from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager
143143

144144
SELECT_TIMEOUT = 60
145145
ERROR_RECOVERY_DELAY = 5
@@ -296,6 +296,13 @@ def keep_alive(self):
296296
with closing(self.conn.cursor()) as cr:
297297
cr.execute(query)
298298

299+
def set_job_pending(self, uuid):
300+
with closing(self.conn.cursor()) as cr:
301+
cr.execute(
302+
"UPDATE queue_job SET state=%s, " "date_enqueued=NULL " "WHERE uuid=%s",
303+
(PENDING, uuid),
304+
)
305+
299306
def set_job_enqueued(self, uuid):
300307
with closing(self.conn.cursor()) as cr:
301308
cr.execute(
@@ -358,6 +365,19 @@ def _query_requeue_dead_jobs(self):
358365
)
359366
FOR UPDATE SKIP LOCKED
360367
)
368+
OR
369+
id in (
370+
SELECT
371+
id
372+
FROM
373+
queue_job
374+
WHERE
375+
state = 'started' AND NOT EXISTS (
376+
SELECT 1 FROM queue_job_lock
377+
WHERE queue_job_id = queue_job.id
378+
)
379+
FOR UPDATE SKIP LOCKED
380+
)
361381
RETURNING uuid
362382
"""
363383

@@ -467,6 +487,18 @@ def initialize_databases(self):
467487
self.db_by_name[db_name] = db
468488
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
469489
for job_data in cr:
490+
# In case we have enqueued jobs we move them to pending,
491+
# otherwise they remain enqueued and occupy channels slots.
492+
if job_data[6] == "enqueued":
493+
try:
494+
self.db_by_name[db_name].set_job_pending(job_data[1])
495+
job_data = (*job_data[:6], "pending")
496+
except Exception:
497+
_logger.warning(
498+
"error setting job %s to pending",
499+
job_data[1],
500+
exc_info=True,
501+
)
470502
self.channel_manager.notify(db_name, *job_data)
471503
_logger.info("queue job runner ready for db %s", db_name)
472504

queue_job/tests/test_requeue_dead_job.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,38 @@ def test_requeue_dead_jobs(self):
131131
# because we committed the cursor, the savepoint of the test method is
132132
# gone, and this would break TransactionCase cleanups
133133
self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id)
134+
135+
def test_requeue_dead_jobs_started_before_patch(self):
136+
uuid = "test_requeue_dead_jobs_before_locking"
137+
138+
queue_job = self.create_dummy_job(uuid)
139+
job_obj = Job.load(self.env, queue_job.uuid)
140+
141+
job_obj.set_enqueued()
142+
# simulate enqueuing was in the past
143+
job_obj.date_enqueued = datetime.now() - timedelta(minutes=1)
144+
job_obj.set_started()
145+
# Delete the job lock to simulate job started before new implementation
146+
self.env.cr.execute(
147+
"DELETE FROM queue_job_lock WHERE queue_job_id=%s", (queue_job.id,)
148+
)
149+
150+
job_obj.store()
151+
self.env.cr.commit() # pylint: disable=E8102
152+
153+
# requeue dead jobs using current cursor
154+
query = Database(self.env.cr.dbname)._query_requeue_dead_jobs()
155+
self.env.cr.execute(query)
156+
157+
uuids_requeued = self.env.cr.fetchall()
158+
159+
self.assertEqual(len(uuids_requeued), 1)
160+
self.assertEqual(uuids_requeued[0][0], uuid)
161+
162+
# clean up
163+
queue_job.unlink()
164+
self.env.cr.commit() # pylint: disable=E8102
165+
166+
# because we committed the cursor, the savepoint of the test method is
167+
# gone, and this would break TransactionCase cleanups
168+
self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id)

0 commit comments

Comments
 (0)