Skip to content

Commit 70b10c4

Browse files
committed
[IMP] queue_job_cron_jobrunner: channel
1 parent 813ad66 commit 70b10c4

File tree

1 file changed

+62
-18
lines changed

1 file changed

+62
-18
lines changed

queue_job_cron_jobrunner/models/queue_job.py

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
RetryableJobError,
2020
)
2121
from odoo.addons.queue_job.job import Job
22+
from odoo.addons.queue_job.jobrunner import QueueJobRunner
2223

2324
_logger = logging.getLogger(__name__)
2425

@@ -32,42 +33,77 @@ def _acquire_one_job(self):
3233
3334
:returns: queue.job record (locked for update)
3435
"""
35-
# TODO: This method should respect channel priority and capacity,
36-
# rather than just fetching them by creation date.
37-
self.flush()
36+
runner = QueueJobRunner.from_environ_or_config()
3837
self.env.cr.execute(
3938
"""
4039
SELECT id
4140
FROM queue_job
4241
WHERE state = 'pending'
4342
AND (eta IS NULL OR eta <= (now() AT TIME ZONE 'UTC'))
4443
ORDER BY priority, date_created
45-
LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED
44+
FOR NO KEY UPDATE
4645
"""
4746
)
48-
row = self.env.cr.fetchone()
49-
return self.browse(row and row[0])
47+
rows = self.env.cr.fetchall()
48+
49+
channels = {}
50+
for queue_job in self.search([("state", "=", "started")]):
51+
if not queue_job.channel:
52+
continue
53+
channels[queue_job.channel] = channels.get(queue_job.channel, 0) + 1
54+
channels_without_capacity = set()
55+
for channel_str, running in channels.items():
56+
channel = runner.channel_manager.get_channel_by_name(channel_str)
57+
if channel.capacity <= running:
58+
channels_without_capacity.add(channel_str)
59+
_logger.info(
60+
"_acquire_one_job channels_without_capacity %s",
61+
channels_without_capacity,
62+
)
63+
64+
result = self.browse()
65+
for row in rows:
66+
if not (row and row[0]):
67+
continue
68+
queue_job = self.browse(row[0])
69+
if queue_job.channel and queue_job.channel in channels_without_capacity:
70+
continue
71+
job = Job._load_from_db_record(queue_job)
72+
job.set_started()
73+
job.store()
74+
_logger.info(
75+
"_acquire_one_job queue.job %s[channel=%s,uuid=%s] started",
76+
row[0],
77+
job.channel,
78+
job.uuid,
79+
)
80+
result = queue_job
81+
break
82+
self.flush()
83+
self.env.cr.commit() # pylint: disable=E8102
84+
return result
5085

5186
def _process(self, commit=False):
5287
"""Process the job"""
5388
self.ensure_one()
5489
job = Job._load_from_db_record(self)
55-
# Set it as started
56-
job.set_started()
57-
job.store()
58-
_logger.debug("%s started", job.uuid)
59-
# TODO: Commit the state change so that the state can be read from the UI
60-
# while the job is processing. However, doing this will release the
61-
# lock on the db, so we need to find another way.
62-
# if commit:
63-
# self.flush()
64-
# self.env.cr.commit()
65-
6690
# Actual processing
6791
try:
6892
try:
6993
with self.env.cr.savepoint():
94+
_logger.info(
95+
"perform %s[channel=%s,uuid=%s]",
96+
self.id,
97+
self.channel,
98+
self.uuid,
99+
)
70100
job.perform()
101+
_logger.info(
102+
"performed %s[channel=%s,uuid=%s]",
103+
self.id,
104+
self.channel,
105+
self.uuid,
106+
)
71107
job.set_done()
72108
job.store()
73109
except OperationalError as err:
@@ -87,20 +123,28 @@ def _process(self, commit=False):
87123
msg = _("Job interrupted and set to Done: nothing to do.")
88124
job.set_done(msg)
89125
job.store()
126+
_logger.info(
127+
"interrupted %s[channel=%s,uuid=%s]", self.id, self.channel, self.uuid
128+
)
90129

91130
except RetryableJobError as err:
92131
# delay the job later, requeue
93132
job.postpone(result=str(err), seconds=5)
94133
job.set_pending(reset_retry=False)
95134
job.store()
96-
_logger.debug("%s postponed", job)
135+
_logger.info(
136+
"postponed %s[channel=%s,uuid=%s]", self.id, self.channel, self.uuid
137+
)
97138

98139
except (FailedJobError, Exception):
99140
with StringIO() as buff:
100141
traceback.print_exc(file=buff)
101142
_logger.error(buff.getvalue())
102143
job.set_failed(exc_info=buff.getvalue())
103144
job.store()
145+
_logger.info(
146+
"failed %s[channel=%s,uuid=%s]", self.id, self.channel, self.uuid
147+
)
104148

105149
if commit: # pragma: no cover
106150
self.env["base"].flush()

0 commit comments

Comments
 (0)