Skip to content

Commit 22a8e69

Browse files
committed
[IMP] queue_job_cron_jobrunner: requeue lost started jobs
1 parent 3211ed3 commit 22a8e69

File tree

2 files changed

+37
-0
lines changed

2 files changed

+37
-0
lines changed

queue_job_cron_jobrunner/models/queue_job.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from datetime import datetime
88
from io import StringIO
99

10+
import psutil
1011
from psycopg2 import OperationalError
1112

1213
from odoo import _, api, fields, models, tools
@@ -161,6 +162,7 @@ def _process(self, commit=False):
161162
@api.model
162163
def _job_runner(self, commit=True):
163164
"""Short-lived job runner, triggered by async crons"""
165+
self._release_started_jobs(commit=commit)
164166
job = self._acquire_one_job(commit=commit)
165167
while job:
166168
job._process(commit=commit)
@@ -214,6 +216,24 @@ def _ensure_cron_trigger(self):
214216
if delayed_etas:
215217
self._cron_trigger(at=list(delayed_etas))
216218

219+
@api.model
220+
def _release_started_jobs(self, commit=False):
221+
pids = [x.pid for x in psutil.process_iter()]
222+
for record in self.search(
223+
[("state", "=", "started"), ("worker_pid", "not in", pids)]
224+
):
225+
job = Job._load_from_db_record(record)
226+
job.set_pending()
227+
job.store()
228+
_logger.info(
229+
"release started job %s[channel=%s,uuid=%s]",
230+
record.id,
231+
record.channel,
232+
record.uuid,
233+
)
234+
if commit: # pragma: no cover
235+
self.env.cr.commit() # pylint: disable=invalid-commit
236+
217237
@api.model_create_multi
218238
def create(self, vals_list):
219239
# When jobs are created, also create the cron trigger

queue_job_cron_jobrunner/tests/test_queue_job.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,3 +174,20 @@ def test_queue_job_creation_create_change_next_call(self):
174174
self.cron.nextcall = datetime(2021, 1, 21, 21, 21, 21)
175175
self.env["res.partner"].with_delay().create({"name": "test"})
176176
self.assertNotEqual(self.cron.nextcall, datetime(2022, 2, 22, 22, 22, 22))
177+
178+
def test_release_started_jobs(self):
179+
job_known_pid = self.env["res.partner"].with_delay().create({"name": "test"})
180+
job_known_pid.set_started()
181+
job_known_pid.store()
182+
known_pid = job_known_pid.db_record().worker_pid
183+
job_unknown_pid = self.env["res.partner"].with_delay().create({"name": "test"})
184+
job_unknown_pid.set_started()
185+
job_unknown_pid.store()
186+
job_unknown_pid.db_record().worker_pid = -1
187+
188+
self.env["queue.job"]._release_started_jobs(commit=False)
189+
190+
self.assertEqual(job_unknown_pid.db_record().state, "pending")
191+
self.assertEqual(job_unknown_pid.db_record().worker_pid, 0)
192+
self.assertEqual(job_known_pid.db_record().state, "started")
193+
self.assertEqual(job_known_pid.db_record().worker_pid, known_pid)

0 commit comments

Comments
 (0)