diff --git a/broker/broker.py b/broker/broker.py index 569410a2..2171f00a 100644 --- a/broker/broker.py +++ b/broker/broker.py @@ -1,3 +1,4 @@ +from contextlib import contextmanager from logzero import logger from broker.providers import PROVIDERS, PROVIDER_ACTIONS, _provider_imports from broker.hosts import Host @@ -22,6 +23,7 @@ class mp_decorator: The decorated method is expected to return an itearable. """ + MAX_WORKERS = None """ If set to integer, the count of workers will be limited to that amount. If set to None, the max workers count of the EXECUTOR will match the count of items.""" @@ -211,7 +213,11 @@ def checkin(self, sequential=False, host=None, in_context=False): if not isinstance(hosts, list): hosts = [hosts] if in_context: - hosts = [host for host in hosts if not getattr(host, '_skip_context_checkin', False)] + hosts = [ + host + for host in hosts + if not getattr(host, "_skip_context_checkin", False) + ] if not hosts: logger.debug("Checkin called with no hosts, taking no action") return @@ -268,9 +274,7 @@ def extend(self, sequential=False, host=None): logger.debug("Extend called with no hosts, taking no action") return - with ThreadPoolExecutor( - max_workers=1 if sequential else len(hosts) - ) as workers: + with ThreadPoolExecutor(max_workers=1 if sequential else len(hosts)) as workers: completed_extends = as_completed( workers.submit(self._extend, _host) for _host in hosts ) @@ -323,6 +327,65 @@ def from_inventory(self, filter=None): inv_hosts = helpers.load_inventory(filter=filter) return [self.reconstruct_host(inv_host) for inv_host in inv_hosts] + @classmethod + @contextmanager + def multi_manager(cls, **multi_dict): + """Given a mapping of names to Broker argument dictionaries: + create multiple Broker instances, check them out in parallel, yield, then checkin. + + Example: + with Broker.multi_mode( + rhel7={ + "host_class": ContentHost, + "workflow": "deploy_base_rhel", + "deploy_rhel_version": "7", + }, + rhel8={ + "host_class": ContentHost, + "workflow": "deploy_base_rhel", + "deploy_rhel_version": "8", + } + ) as host_dict: + pass + + All are checked out at the same time. The user is presented with the hosts in + a dictionary by argument name e.g. host_dict["rhel7"] is a ContentHost object + """ + # create all the broker instances and perform checkouts in parallel + broker_instances = {name: cls(**kwargs) for name, kwargs in multi_dict.items()} + with ThreadPoolExecutor(max_workers=len(broker_instances)) as workers: + completed_checkouts = as_completed( + workers.submit(broker.checkout) for broker in broker_instances.values() + ) + for completed in completed_checkouts: + completed.result() + all_hosts = [] + for broker_inst in broker_instances.values(): + all_hosts.extend(broker_inst._hosts) + # run setup on all hosts in parallel + with ThreadPoolExecutor(max_workers=len(all_hosts)) as workers: + completed_setups = as_completed( + workers.submit(host.setup) for host in all_hosts + ) + for completed in completed_setups: + completed.result() + # yield control to the user + yield {name: broker._hosts for name, broker in broker_instances.items()} + # teardown all hosts in parallel + with ThreadPoolExecutor(max_workers=len(all_hosts)) as workers: + completed_teardowns = as_completed( + workers.submit(host.teardown) for host in all_hosts + ) + for completed in completed_teardowns: + completed.result() + # checkin all hosts in parallel + with ThreadPoolExecutor(max_workers=len(broker_instances)) as workers: + completed_checkins = as_completed( + workers.submit(broker.checkin) for broker in broker_instances.values() + ) + for completed in completed_checkins: + completed.result() + def __repr__(self): inner = ", ".join( f"{k}={v}" diff --git a/tests/functional/test_containers.py b/tests/functional/test_containers.py index f39f673a..4d6e49c9 100644 --- a/tests/functional/test_containers.py +++ b/tests/functional/test_containers.py @@ -32,6 +32,7 @@ def temp_inventory(): # ----- CLI Scenario Tests ----- + @pytest.mark.parametrize( "args_file", [f for f in SCENARIO_DIR.iterdir() if f.name.startswith("checkout_")] ) @@ -67,9 +68,10 @@ def test_containerhost_query(): # ----- Broker API Tests ----- + def test_container_e2e(): with Broker(container_host="ubi8:latest") as c_host: - assert c_host._cont_inst.top()['Processes'] + assert c_host._cont_inst.top()["Processes"] res = c_host.execute("hostname") assert res.stdout.strip() == c_host.hostname loc_settings_path = Path("broker_settings.yaml") @@ -78,7 +80,9 @@ def test_container_e2e(): res = c_host.execute(f"ls {remote_dir}") assert str(loc_settings_path) in res.stdout with NamedTemporaryFile() as tmp: - c_host.session.sftp_read(f"{remote_dir}/{loc_settings_path.name}", tmp.file.name) + c_host.session.sftp_read( + f"{remote_dir}/{loc_settings_path.name}", tmp.file.name + ) data = c_host.session.sftp_read( f"{remote_dir}/{loc_settings_path.name}", return_data=True ) @@ -88,20 +92,22 @@ def test_container_e2e(): assert ( loc_settings_path.read_bytes() == data ), "Local file is different from the received one (return_data=True)" - assert data == Path(tmp.file.name).read_bytes(), "Received files do not match" + assert ( + data == Path(tmp.file.name).read_bytes() + ), "Received files do not match" # test the tail_file context manager tailed_file = f"{remote_dir}/tail_me.txt" c_host.execute(f"echo 'hello world' > {tailed_file}") with c_host.session.tail_file(tailed_file) as tf: c_host.execute(f"echo 'this is a new line' >> {tailed_file}") - assert 'this is a new line' in tf.stdout - assert 'hello world' not in tf.stdout + assert "this is a new line" in tf.stdout + assert "hello world" not in tf.stdout def test_container_e2e_mp(): with Broker(container_host="ubi8:latest", _count=7) as c_hosts: for c_host in c_hosts: - assert c_host._cont_inst.top()['Processes'] + assert c_host._cont_inst.top()["Processes"] res = c_host.execute("hostname") assert res.stdout.strip() == c_host.hostname loc_settings_path = Path("broker_settings.yaml") @@ -110,7 +116,9 @@ def test_container_e2e_mp(): res = c_host.execute(f"ls {remote_dir}") assert str(loc_settings_path) in res.stdout with NamedTemporaryFile() as tmp: - c_host.session.sftp_read(f"{remote_dir}/{loc_settings_path.name}", tmp.file.name) + c_host.session.sftp_read( + f"{remote_dir}/{loc_settings_path.name}", tmp.file.name + ) data = c_host.session.sftp_read( f"{remote_dir}/{loc_settings_path.name}", return_data=True ) @@ -120,4 +128,21 @@ def test_container_e2e_mp(): assert ( loc_settings_path.read_bytes() == data ), "Local file is different from the received one (return_data=True)" - assert data == Path(tmp.file.name).read_bytes(), "Received files do not match" + assert ( + data == Path(tmp.file.name).read_bytes() + ), "Received files do not match" + + +def test_broker_multi_manager(): + with Broker.multi_manager( + ubi7={"container_host": "ubi7:latest"}, + ubi8={"container_host": "ubi8:latest", "_count": 2}, + ubi9={"container_host": "ubi9:latest"}, + ) as multi_hosts: + assert "ubi7" in multi_hosts and "ubi8" in multi_hosts and "ubi9" in multi_hosts + assert len(multi_hosts["ubi8"]) == 2 + assert multi_hosts["ubi7"][0]._cont_inst.top()["Processes"] + assert ( + multi_hosts["ubi8"][1].execute("hostname").stdout.strip() + == multi_hosts["ubi8"][1].hostname + ) diff --git a/tests/test_broker.py b/tests/test_broker.py index 8a91ba85..899f814e 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -1,27 +1,25 @@ -from broker import broker, helpers +from broker import broker, Broker, helpers from broker.providers import test_provider import pytest def test_empty_init(): """Broker should be able to init without any arguments""" - broker_inst = broker.Broker() - assert isinstance(broker_inst, broker.Broker) + broker_inst = Broker() + assert isinstance(broker_inst, Broker) def test_kwarg_assignment(): """Broker should copy all kwargs into its _kwargs attribute""" broker_kwargs = {"test": "value", "another": 17} - broker_inst = broker.Broker(**broker_kwargs) + broker_inst = Broker(**broker_kwargs) assert broker_inst._kwargs == broker_kwargs def test_full_init(): """Make sure all init checks and assignments work""" broker_hosts = ["test1.example.com", "test2.example.com", "test3.example.com"] - broker_inst = broker.Broker( - hosts=broker_hosts, test_action="blank", nick="test_nick" - ) + broker_inst = Broker(hosts=broker_hosts, test_action="blank", nick="test_nick") assert broker_inst._hosts == broker_hosts assert not broker_inst._kwargs.get("hosts") assert broker_inst._provider_actions == { @@ -33,7 +31,7 @@ def test_full_init(): def test_broker_e2e(): """Run through the base functionality of broker""" - broker_inst = broker.Broker(nick="test_nick") + broker_inst = Broker(nick="test_nick") host_checkout = broker_inst.checkout() assert len(broker_inst._hosts) == 1 broker_host = broker_inst._hosts[0] @@ -47,7 +45,7 @@ def test_broker_e2e(): def test_broker_empty_checkin(): """Try to checkin with no hosts on the instance""" - broker_inst = broker.Broker(nick="test_nick") + broker_inst = Broker(nick="test_nick") assert not broker_inst._hosts broker_inst.checkin() @@ -80,7 +78,7 @@ def test_mp_checkout(): # https://github.com/SatelliteQE/broker/pull/53 # With count like this, I've got reproducibility probability # arround 0.5 - broker_inst = broker.Broker(nick="test_nick", _count=VM_COUNT) + broker_inst = Broker(nick="test_nick", _count=VM_COUNT) broker_inst.checkout() assert len(broker_inst._hosts) == VM_COUNT broker_inst.checkin() @@ -88,7 +86,7 @@ def test_mp_checkout(): def test_mp_checkout_twice(): - broker_inst = broker.Broker(nick="test_nick", _count=2) + broker_inst = Broker(nick="test_nick", _count=2) def cycle(): assert len(broker_inst.checkout()) == 2 @@ -101,6 +99,20 @@ def cycle(): cycle() +def test_multi_manager(): + """Test that we get the proper data structure and names as expected + when using Broker.multi_manager. + """ + with Broker.multi_manager( + test_1={"nick": "test_nick"}, test_2={"nick": "test_nick", "_count": 2} + ) as host_dict: + assert "test_1" in host_dict and "test_2" in host_dict + assert len(host_dict["test_1"]) == 1 + assert len(host_dict["test_2"]) == 2 + assert host_dict["test_1"][0].hostname == "test.host.example.com" + assert host_dict["test_2"][1].hostname == "test.host.example.com" + + class SomeException(Exception): pass