Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 1 addition & 28 deletions queue_job/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,7 @@ Configuration
.. [1] It works with the threaded Odoo server too, although this way
of running Odoo is obviously not for production purposes.

* Be sure to check out *Jobs Garbage Collector* CRON and change *enqueued_delta* and *started_delta* parameters to your needs.

* ``enqueued_delta``: Spent time in minutes after which an enqueued job is considered stuck.
Set it to 0 to disable this check.
* ``started_delta``: Spent time in minutes after which a started job is considered stuck.
This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration.
Set it to 0 to disable this check. Set it to -1 to automate it, based on the server's ``--limit-time-real`` config parameter.

.. code-block:: python

# `model` corresponds to 'queue.job' model
model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1)
* Jobs that remain in ``enqueued`` or ``started`` state (because, for instance, their worker has been killed) will be automatically re-queued.

Usage
=====
Expand Down Expand Up @@ -595,22 +584,6 @@ Known issues / Roadmap
* After creating a new database or installing ``queue_job`` on an
existing database, Odoo must be restarted for the runner to detect it.

* When Odoo shuts down normally, it waits for running jobs to finish.
However, when the Odoo server crashes or is otherwise force-stopped,
running jobs are interrupted while the runner has no chance to know
they have been aborted. In such situations, jobs may remain in
``started`` or ``enqueued`` state after the Odoo server is halted.
Since the runner has no way to know if they are actually running or
not, and does not know for sure if it is safe to restart the jobs,
it does not attempt to restart them automatically. Such stale jobs
therefore fill the running queue and prevent other jobs from starting.
You must therefore requeue them manually, either from the Jobs view,
or by running the following SQL statement *before starting Odoo*:

.. code-block:: sql

update queue_job set state='pending' where state in ('started', 'enqueued')

Changelog
=========

Expand Down
2 changes: 1 addition & 1 deletion queue_job/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{
"name": "Job Queue",
"version": "15.0.2.3.12",
"version": "15.0.2.3.13",
"author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)",
"website": "https://github.com/OCA/queue",
"license": "LGPL-3",
Expand Down
2 changes: 2 additions & 0 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def _try_perform_job(self, env, job):
job.set_started()
job.store()
env.cr.commit()
job.lock()

_logger.debug("%s started", job)

job.perform()
Expand Down
11 changes: 0 additions & 11 deletions queue_job/data/queue_data.xml
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
<?xml version="1.0" encoding="utf-8" ?>
<odoo>
<data noupdate="1">
<record id="ir_cron_queue_job_garbage_collector" model="ir.cron">
<field name="name">Jobs Garbage Collector</field>
<field name="interval_number">5</field>
<field name="interval_type">minutes</field>
<field name="numbercall">-1</field>
<field ref="model_queue_job" name="model_id" />
<field name="state">code</field>
<field
name="code"
>model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1)</field>
</record>
<!-- Queue-job-related subtypes for messaging / Chatter -->
<record id="mt_job_failed" model="mail.message.subtype">
<field name="name">Job failed</field>
Expand Down
56 changes: 56 additions & 0 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,61 @@ def load_many(cls, env, job_uuids):
recordset = cls.db_records_from_uuids(env, job_uuids)
return {cls._load_from_db_record(record) for record in recordset}

def add_lock_record(self):
    """Insert this job's companion row into ``queue_job_lock``.

    The row's ``id`` and ``queue_job_id`` are both set to the id of the
    ``queue_job`` record whose uuid equals ``self.uuid``. This is the row
    that is meant to be locked while the job is being performed.

    If a row with the same id already exists, the statement does nothing
    (``ON CONFLICT(id) DO NOTHING``).
    """
    insert_lock_sql = """
        INSERT INTO
            queue_job_lock (id, queue_job_id)
        SELECT
            id, id
        FROM
            queue_job
        WHERE
            uuid = %s
        ON CONFLICT(id)
        DO NOTHING;
    """
    cursor = self.env.cr
    cursor.execute(insert_lock_sql, [self.uuid])

def lock(self):
    """Take a row-level lock on this job's ``queue_job_lock`` record.

    The lock row is selected ``FOR UPDATE``, and only when the job
    identified by ``self.uuid`` is in the ``started`` state.

    :raises RetryableJobError: when the query does not return exactly one
        row, which means the job could not be locked (it wasn't started).
    """
    select_for_update_sql = """
        SELECT
            *
        FROM
            queue_job_lock
        WHERE
            queue_job_id in (
                SELECT
                    id
                FROM
                    queue_job
                WHERE
                    uuid = %s
                    AND state='started'
            )
        FOR UPDATE;
    """
    cursor = self.env.cr
    cursor.execute(select_for_update_sql, [self.uuid])
    locked_rows = cursor.fetchall()

    # Exactly one lock row must have been acquired for the job.
    if len(locked_rows) == 1:
        return

    raise RetryableJobError(
        f"Trying to lock job that wasn't started, uuid: {self.uuid}"
    )

@classmethod
def _load_from_db_record(cls, job_db_record):
stored = job_db_record
Expand Down Expand Up @@ -819,6 +874,7 @@ def set_started(self):
self.state = STARTED
self.date_started = datetime.now()
self.worker_pid = os.getpid()
self.add_lock_record()

def set_done(self, result=None):
self.state = DONE
Expand Down
166 changes: 125 additions & 41 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,22 +114,6 @@
* After creating a new database or installing queue_job on an
existing database, Odoo must be restarted for the runner to detect it.

* When Odoo shuts down normally, it waits for running jobs to finish.
However, when the Odoo server crashes or is otherwise force-stopped,
running jobs are interrupted while the runner has no chance to know
they have been aborted. In such situations, jobs may remain in
``started`` or ``enqueued`` state after the Odoo server is halted.
Since the runner has no way to know if they are actually running or
not, and does not know for sure if it is safe to restart the jobs,
it does not attempt to restart them automatically. Such stale jobs
therefore fill the running queue and prevent other jobs from starting.
You must therefore requeue them manually, either from the Jobs view,
or by running the following SQL statement *before starting Odoo*:

.. code-block:: sql

update queue_job set state='pending' where state in ('started', 'enqueued')

.. rubric:: Footnotes

.. [1] From a security standpoint, it is safe to have an anonymous HTTP
Expand Down Expand Up @@ -207,35 +191,14 @@ def _connection_info_for(db_name):


def _async_http_get(scheme, host, port, user, password, db_name, job_uuid):
# Method to set failed job (due to timeout, etc) as pending,
# to avoid keeping it as enqueued.
def set_job_pending():
connection_info = _connection_info_for(db_name)
conn = psycopg2.connect(**connection_info)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
with closing(conn.cursor()) as cr:
cr.execute(
"UPDATE queue_job SET state=%s, "
"date_enqueued=NULL, date_started=NULL "
"WHERE uuid=%s and state=%s "
"RETURNING uuid",
(PENDING, job_uuid, ENQUEUED),
)
if cr.fetchone():
_logger.warning(
"state of job %s was reset from %s to %s",
job_uuid,
ENQUEUED,
PENDING,
)

# TODO: better way to HTTP GET asynchronously (grequest, ...)?
# if this was python3 I would be doing this with
# asyncio, aiohttp and aiopg
def urlopen():
url = "{}://{}:{}/queue_job/runjob?db={}&job_uuid={}".format(
scheme, host, port, db_name, job_uuid
)
# pylint: disable=except-pass
try:
auth = None
if user:
Expand All @@ -249,10 +212,10 @@ def urlopen():
# for codes between 500 and 600
response.raise_for_status()
except requests.Timeout:
set_job_pending()
# A timeout is a normal behaviour, it shouldn't be logged as an exception
pass
except Exception:
_logger.exception("exception in GET %s", url)
set_job_pending()

thread = threading.Thread(target=urlopen)
thread.daemon = True
Expand Down Expand Up @@ -333,6 +296,13 @@ def keep_alive(self):
with closing(self.conn.cursor()) as cr:
cr.execute(query)

def set_job_pending(self, uuid):
    """Reset the job with the given uuid to the pending state.

    Runs a single UPDATE on ``queue_job`` that sets ``state`` to
    ``PENDING`` and clears ``date_enqueued``. The current state of the
    job is not checked.

    :param uuid: uuid of the job to reset
    """
    reset_sql = "UPDATE queue_job SET state=%s, date_enqueued=NULL WHERE uuid=%s"
    with closing(self.conn.cursor()) as cr:
        cr.execute(reset_sql, (PENDING, uuid))

def set_job_enqueued(self, uuid):
with closing(self.conn.cursor()) as cr:
cr.execute(
Expand All @@ -343,8 +313,104 @@ def set_job_enqueued(self, uuid):
(ENQUEUED, uuid),
)

def _query_requeue_dead_jobs(self):
return """
UPDATE
queue_job
SET
state=(
CASE
WHEN
max_retries IS NOT NULL AND
retry IS NOT NULL AND
retry>max_retries
THEN 'failed'
ELSE 'pending'
END),
retry=(CASE WHEN state='started' THEN COALESCE(retry,0)+1 ELSE retry END),
exc_name=(
CASE
WHEN
max_retries IS NOT NULL AND
retry IS NOT NULL AND
retry>max_retries
THEN 'JobFoundDead'
ELSE exc_name
END),
exc_info=(
CASE
WHEN
max_retries IS NOT NULL AND
retry IS NOT NULL AND
retry>max_retries
THEN 'Job found dead after too many retries'
ELSE exc_info
END)
WHERE
id in (
SELECT
queue_job_id
FROM
queue_job_lock
WHERE
queue_job_id in (
SELECT
id
FROM
queue_job
WHERE
state IN ('enqueued','started')
AND date_enqueued <
(now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
)
FOR UPDATE SKIP LOCKED
)
OR
id in (
SELECT
id
FROM
queue_job
WHERE
state = 'started' AND NOT EXISTS (
SELECT 1 FROM queue_job_lock
WHERE queue_job_id = queue_job.id
)
FOR UPDATE SKIP LOCKED
)
RETURNING uuid
"""

class QueueJobRunner(object):
def requeue_dead_jobs(self):
    """Move started and enqueued jobs that are not locked back to pending.

    A job holds its lock while it is being executed; when the job is
    killed, the lock is released. Jobs can end up in that situation
    because a normal Odoo shutdown waits for running jobs to finish,
    whereas a crash or forced stop interrupts them without the runner
    being told.

    Jobs whose number of retries exceeds their max retries are set to
    'failed' with the error 'JobFoundDead' instead of being re-queued.

    The query only considers jobs enqueued for more than 10 seconds, so
    that jobs are not re-queued before they have actually started.

    Every re-queued job is reported with a warning in the log.
    """
    with closing(self.conn.cursor()) as cr:
        cr.execute(self._query_requeue_dead_jobs())
        dead_rows = cr.fetchall()

        for (dead_uuid,) in dead_rows:
            _logger.warning("Re-queued dead job with uuid: %s", dead_uuid)


class QueueJobRunner:
def __init__(
self,
scheme="http",
Expand Down Expand Up @@ -421,9 +487,26 @@ def initialize_databases(self):
self.db_by_name[db_name] = db
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
for job_data in cr:
# In case we have enqueued jobs we move them to pending,
# otherwise they remain enqueued and occupy channels slots.
if job_data[6] == "enqueued":
try:
self.db_by_name[db_name].set_job_pending(job_data[1])
job_data = (*job_data[:6], "pending")
except Exception:
_logger.warning(
"error setting job %s to pending",
job_data[1],
exc_info=True,
)
self.channel_manager.notify(db_name, *job_data)
_logger.info("queue job runner ready for db %s", db_name)

def requeue_dead_jobs(self):
    """Ask every known database that has queue_job to re-queue its dead jobs.

    Databases in ``self.db_by_name`` whose ``has_queue_job`` flag is falsy
    are skipped.
    """
    queue_job_dbs = (
        database for database in self.db_by_name.values() if database.has_queue_job
    )
    for database in queue_job_dbs:
        database.requeue_dead_jobs()

def run_jobs(self):
now = _odoo_now()
for job in self.channel_manager.get_jobs_to_run(now):
Expand Down Expand Up @@ -516,6 +599,7 @@ def run(self):
_logger.info("database connections ready")
# inner loop does the normal processing
while not self._stop:
self.requeue_dead_jobs()
self.process_notifications()
self.run_jobs()
self.wait_notification()
Expand Down
22 changes: 22 additions & 0 deletions queue_job/migrations/15.0.2.3.13/pre-migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)


def migrate(cr, version):
    """Deactivate the jobs garbage collector cron.

    The cron is found through its ``ir_model_data`` entry (module
    ``queue_job``, model ``ir.cron``, name
    ``ir_cron_queue_job_garbage_collector``) and flagged inactive; the
    record itself is not deleted.

    :param cr: database cursor used to run the UPDATE
    :param version: version argument passed to the migration script
        (not used here)
    """
    deactivate_cron_sql = """
        UPDATE
            ir_cron
        SET
            active=False
        WHERE id IN (
            SELECT res_id
            FROM
                ir_model_data
            WHERE
                module='queue_job'
                AND model='ir.cron'
                AND name='ir_cron_queue_job_garbage_collector'
        );
    """
    cr.execute(deactivate_cron_sql)
Loading