19
19
RetryableJobError ,
20
20
)
21
21
from odoo .addons .queue_job .job import Job
22
+ from odoo .addons .queue_job .jobrunner import QueueJobRunner
22
23
23
24
_logger = logging .getLogger (__name__ )
24
25
@@ -32,42 +33,68 @@ def _acquire_one_job(self):
32
33
33
34
:returns: queue.job record (locked for update)
34
35
"""
35
- # TODO: This method should respect channel priority and capacity,
36
- # rather than just fetching them by creation date.
37
- self . flush ()
36
+ runner = QueueJobRunner . from_environ_or_config ()
37
+ job_id = False
38
+ # Lock all pending jobs
38
39
self .env .cr .execute (
39
40
"""
40
41
SELECT id
41
42
FROM queue_job
42
43
WHERE state = 'pending'
43
44
AND (eta IS NULL OR eta <= (now() AT TIME ZONE 'UTC'))
44
45
ORDER BY priority, date_created
45
- LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED
46
+ FOR NO KEY UPDATE
46
47
"""
47
48
)
48
- row = self .env .cr .fetchone ()
49
- return self .browse (row and row [0 ])
49
+ rows = self .env .cr .fetchall ()
50
+
51
+ channels = {}
52
+ for queue_job in self .search ([("state" , "=" , "started" )]):
53
+ if not queue_job .channel :
54
+ continue
55
+ channels [queue_job .channel ] = channels .get (queue_job .channel , 0 ) + 1
56
+ channels_without_capacity = set ()
57
+ for channel_str , running in channels .items ():
58
+ channel = runner .channel_manager .get_channel_by_name (channel_str )
59
+ if channel .capacity <= running :
60
+ channels_without_capacity .add (channel_str )
61
+ _logger .info (
62
+ "_acquire_one_job channels_without_capacity %s" ,
63
+ channels_without_capacity ,
64
+ )
65
+
66
+ for row in rows :
67
+ if not (row and row [0 ]):
68
+ continue
69
+ queue_job = self .browse (row [0 ])
70
+ if queue_job .channel and queue_job .channel in channels_without_capacity :
71
+ continue
72
+ job = Job ._load_from_db_record (queue_job )
73
+ job .set_started ()
74
+ job .store ()
75
+ _logger .info (
76
+ "_acquire_one_job queue.job %s[channel=%s,uuid=%s] started" ,
77
+ row [0 ],
78
+ job .channel ,
79
+ job .uuid ,
80
+ )
81
+ job_id = row [0 ]
82
+ break
83
+ self .flush ()
84
+ self .env .cr .commit () # pylint: disable=E8102
85
+ return self .browse (job_id )
50
86
51
87
def _process (self , commit = False ):
52
88
"""Process the job"""
53
89
self .ensure_one ()
54
90
job = Job ._load_from_db_record (self )
55
- # Set it as started
56
- job .set_started ()
57
- job .store ()
58
- _logger .debug ("%s started" , job .uuid )
59
- # TODO: Commit the state change so that the state can be read from the UI
60
- # while the job is processing. However, doing this will release the
61
- # lock on the db, so we need to find another way.
62
- # if commit:
63
- # self.flush()
64
- # self.env.cr.commit()
65
-
66
91
# Actual processing
67
92
try :
68
93
try :
69
94
with self .env .cr .savepoint ():
95
+ _logger .debug ("perform %s" , job .uuid )
70
96
job .perform ()
97
+ _logger .debug ("performed %s" , job .uuid )
71
98
job .set_done ()
72
99
job .store ()
73
100
except OperationalError as err :
0 commit comments