diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index b1cee329f8..1f57d08cf0 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -381,7 +381,15 @@ class Channel: without risking to overflow the system. """ - def __init__(self, name, parent, capacity=None, sequential=False, throttle=0): + def __init__( + self, + name, + parent, + capacity=None, + sequential=False, + throttle=0, + subcapacity=None, + ): self.name = name self.parent = parent if self.parent: @@ -391,9 +399,10 @@ def __init__(self, name, parent, capacity=None, sequential=False, throttle=0): self._running = set() self._failed = set() self._pause_until = 0 # utc seconds since the epoch - self.capacity = capacity + self.capacity = capacity or (parent and parent.subcapacity) self.throttle = throttle # seconds self.sequential = sequential + self.subcapacity = subcapacity @property def sequential(self): @@ -410,11 +419,13 @@ def configure(self, config): * capacity * sequential + * subcapacity * throttle """ assert self.fullname.endswith(config["name"]) self.capacity = config.get("capacity", None) self.sequential = bool(config.get("sequential", False)) + self.subcapacity = config.get("subcapacity", None) self.throttle = int(config.get("throttle", 0)) if self.sequential and self.capacity != 1: raise ValueError("A sequential channel must have a capacity of 1") @@ -874,7 +885,16 @@ def parse_simple_config(cls, config_string): f"Invalid channel config {config_string}: " f"duplicate key {k}" ) - config[k] = v + if k == "subcapacity": + try: + config[k] = int(v) + except Exception as ex: + raise ValueError( + f"Invalid channel config {config_string}: " + f"invalid subcapacity {v}" + ) from ex + else: + config[k] = v else: config["capacity"] = 1 res.append(config) diff --git a/queue_job/tests/test_runner_channels.py b/queue_job/tests/test_runner_channels.py index d323d00683..805dcb6920 100644 --- a/queue_job/tests/test_runner_channels.py +++ b/queue_job/tests/test_runner_channels.py @@ -1,6 +1,8 @@ # Copyright 2015-2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +from odoo.tests import BaseCase + # pylint: disable=odoo-addons-relative-import # we are testing, we want to test as we were an external consumer of the API from odoo.addons.queue_job.jobrunner import channels @@ -8,3 +10,34 @@ from .common import load_doctests load_tests = load_doctests(channels) + + +class TestChannelManager(BaseCase): + def test_subcapacity_default(self): + cm = channels.ChannelManager() + cm.simple_configure("root:4") + root = cm.get_channel_by_name("root") + self.assertEqual(root.capacity, 4) + self.assertIsNone(root.subcapacity) + child = cm.get_channel_by_name("child", autocreate=True) + self.assertIs(child.capacity, None) + self.assertIsNone(child.subcapacity) + + def test_subcapacity(self): + cm = channels.ChannelManager() + cm.simple_configure("root:4:subcapacity=1,override:2") + root = cm.get_channel_by_name("root") + self.assertEqual(root.subcapacity, 1) + child = cm.get_channel_by_name("override") + self.assertEqual(child.capacity, 2) + self.assertIsNone(child.subcapacity) + child = cm.get_channel_by_name("child", autocreate=True) + self.assertEqual(child.capacity, 1) + self.assertIsNone(child.subcapacity) + + def test_subcapacity_subchannel(self): + cm = channels.ChannelManager() + cm.simple_configure("root:4,sub:2:subcapacity=1") + child = cm.get_channel_by_name("sub.child", autocreate=True) + self.assertEqual(child.capacity, 1) + self.assertIsNone(child.subcapacity)