Skip to content

Commit ac9fccb

Browse files
committed
[IMP] queue_job_cron_jobrunner: requeue lost started jobs
1 parent fa36c17 commit ac9fccb

File tree

2 files changed

+37
-0
lines changed

2 files changed

+37
-0
lines changed

queue_job_cron_jobrunner/models/queue_job.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from datetime import datetime
99
from io import StringIO
1010

11+
import psutil
1112
from psycopg2 import OperationalError
1213

1314
from odoo import _, api, fields, models, tools
@@ -162,6 +163,7 @@ def _process(self, commit=False):
162163
@api.model
163164
def _job_runner(self, commit=True):
164165
"""Short-lived job runner, triggered by async crons"""
166+
self._release_started_jobs(commit=commit)
165167
job = self._acquire_one_job(commit=commit)
166168
while job:
167169
job._process(commit=commit)
@@ -215,6 +217,24 @@ def _ensure_cron_trigger(self):
215217
if delayed_etas:
216218
self._cron_trigger(at=list(delayed_etas))
217219

220+
@api.model
221+
def _release_started_jobs(self, commit=False):
222+
pids = [x.pid for x in psutil.process_iter()]
223+
for record in self.search(
224+
[("state", "=", "started"), ("worker_pid", "not in", pids)]
225+
):
226+
job = Job._load_from_db_record(record)
227+
job.set_pending()
228+
job.store()
229+
_logger.info(
230+
"release started job %s[channel=%s,uuid=%s]",
231+
record.id,
232+
record.channel,
233+
record.uuid,
234+
)
235+
if commit: # pragma: no cover
236+
self.env.cr.commit() # pylint: disable=invalid-commit
237+
218238
@api.model_create_multi
219239
def create(self, vals_list):
220240
# When jobs are created, also create the cron trigger

queue_job_cron_jobrunner/tests/test_queue_job.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,3 +174,20 @@ def test_queue_job_creation_create_change_next_call(self):
174174
self.cron.nextcall = datetime(2021, 1, 21, 21, 21, 21)
175175
self.env["res.partner"].with_delay().create({"name": "test"})
176176
self.assertNotEqual(self.cron.nextcall, datetime(2022, 2, 22, 22, 22, 22))
177+
178+
def test_release_started_jobs(self):
179+
job_known_pid = self.env["res.partner"].with_delay().create({"name": "test"})
180+
job_known_pid.set_started()
181+
job_known_pid.store()
182+
known_pid = job_known_pid.db_record().worker_pid
183+
job_unknown_pid = self.env["res.partner"].with_delay().create({"name": "test"})
184+
job_unknown_pid.set_started()
185+
job_unknown_pid.store()
186+
job_unknown_pid.db_record().worker_pid = -1
187+
188+
self.env["queue.job"]._release_started_jobs(commit=False)
189+
190+
self.assertEqual(job_unknown_pid.db_record().state, "pending")
191+
self.assertEqual(job_unknown_pid.db_record().worker_pid, 0)
192+
self.assertEqual(job_known_pid.db_record().state, "started")
193+
self.assertEqual(job_known_pid.db_record().worker_pid, known_pid)

0 commit comments

Comments
 (0)