Skip to content

Commit

Permalink
Add Broker.multi_manager
Browse files Browse the repository at this point in the history
This change introduces a way to carry out multiple kinds of checkouts at
the same time when using Broker as a library.
  • Loading branch information
JacobCallahan committed May 25, 2023
1 parent 220609f commit be3e948
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 23 deletions.
71 changes: 67 additions & 4 deletions broker/broker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from contextlib import contextmanager
from logzero import logger
from broker.providers import PROVIDERS, PROVIDER_ACTIONS, _provider_imports
from broker.hosts import Host
Expand All @@ -22,6 +23,7 @@ class mp_decorator:
The decorated method is expected to return an itearable.
"""

MAX_WORKERS = None
""" If set to integer, the count of workers will be limited to that amount.
If set to None, the max workers count of the EXECUTOR will match the count of items."""
Expand Down Expand Up @@ -211,7 +213,11 @@ def checkin(self, sequential=False, host=None, in_context=False):
if not isinstance(hosts, list):
hosts = [hosts]
if in_context:
hosts = [host for host in hosts if not getattr(host, '_skip_context_checkin', False)]
hosts = [
host
for host in hosts
if not getattr(host, "_skip_context_checkin", False)
]
if not hosts:
logger.debug("Checkin called with no hosts, taking no action")
return
Expand Down Expand Up @@ -268,9 +274,7 @@ def extend(self, sequential=False, host=None):
logger.debug("Extend called with no hosts, taking no action")
return

with ThreadPoolExecutor(
max_workers=1 if sequential else len(hosts)
) as workers:
with ThreadPoolExecutor(max_workers=1 if sequential else len(hosts)) as workers:
completed_extends = as_completed(
workers.submit(self._extend, _host) for _host in hosts
)
Expand Down Expand Up @@ -323,6 +327,65 @@ def from_inventory(self, filter=None):
inv_hosts = helpers.load_inventory(filter=filter)
return [self.reconstruct_host(inv_host) for inv_host in inv_hosts]

@classmethod
@contextmanager
def multi_manager(cls, **multi_dict):
"""Given a mapping of names to Broker argument dictionaries:
create multiple Broker instances, check them out in parallel, yield, then checkin.
Example:
with Broker.multi_mode(
rhel7={
"host_class": ContentHost,
"workflow": "deploy_base_rhel",
"deploy_rhel_version": "7",
},
rhel8={
"host_class": ContentHost,
"workflow": "deploy_base_rhel",
"deploy_rhel_version": "8",
}
) as host_dict:
pass
All are checked out at the same time. The user is presented with the hosts in
a dictionary by argument name e.g. host_dict["rhel7"] is a ContentHost object
"""
# create all the broker instances and perform checkouts in parallel
broker_instances = {name: cls(**kwargs) for name, kwargs in multi_dict.items()}
with ThreadPoolExecutor(max_workers=len(broker_instances)) as workers:
completed_checkouts = as_completed(
workers.submit(broker.checkout) for broker in broker_instances.values()
)
for completed in completed_checkouts:
completed.result()
all_hosts = []
for broker_inst in broker_instances.values():
all_hosts.extend(broker_inst._hosts)
# run setup on all hosts in parallel
with ThreadPoolExecutor(max_workers=len(all_hosts)) as workers:
completed_setups = as_completed(
workers.submit(host.setup) for host in all_hosts
)
for completed in completed_setups:
completed.result()
# yield control to the user
yield {name: broker._hosts for name, broker in broker_instances.items()}
# teardown all hosts in parallel
with ThreadPoolExecutor(max_workers=len(all_hosts)) as workers:
completed_teardowns = as_completed(
workers.submit(host.teardown) for host in all_hosts
)
for completed in completed_teardowns:
completed.result()
# checkin all hosts in parallel
with ThreadPoolExecutor(max_workers=len(broker_instances)) as workers:
completed_checkins = as_completed(
workers.submit(broker.checkin) for broker in broker_instances.values()
)
for completed in completed_checkins:
completed.result()

def __repr__(self):
inner = ", ".join(
f"{k}={v}"
Expand Down
41 changes: 33 additions & 8 deletions tests/functional/test_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def temp_inventory():

# ----- CLI Scenario Tests -----


@pytest.mark.parametrize(
"args_file", [f for f in SCENARIO_DIR.iterdir() if f.name.startswith("checkout_")]
)
Expand Down Expand Up @@ -67,9 +68,10 @@ def test_containerhost_query():

# ----- Broker API Tests -----


def test_container_e2e():
with Broker(container_host="ubi8:latest") as c_host:
assert c_host._cont_inst.top()['Processes']
assert c_host._cont_inst.top()["Processes"]
res = c_host.execute("hostname")
assert res.stdout.strip() == c_host.hostname
loc_settings_path = Path("broker_settings.yaml")
Expand All @@ -78,7 +80,9 @@ def test_container_e2e():
res = c_host.execute(f"ls {remote_dir}")
assert str(loc_settings_path) in res.stdout
with NamedTemporaryFile() as tmp:
c_host.session.sftp_read(f"{remote_dir}/{loc_settings_path.name}", tmp.file.name)
c_host.session.sftp_read(
f"{remote_dir}/{loc_settings_path.name}", tmp.file.name
)
data = c_host.session.sftp_read(
f"{remote_dir}/{loc_settings_path.name}", return_data=True
)
Expand All @@ -88,20 +92,22 @@ def test_container_e2e():
assert (
loc_settings_path.read_bytes() == data
), "Local file is different from the received one (return_data=True)"
assert data == Path(tmp.file.name).read_bytes(), "Received files do not match"
assert (
data == Path(tmp.file.name).read_bytes()
), "Received files do not match"
# test the tail_file context manager
tailed_file = f"{remote_dir}/tail_me.txt"
c_host.execute(f"echo 'hello world' > {tailed_file}")
with c_host.session.tail_file(tailed_file) as tf:
c_host.execute(f"echo 'this is a new line' >> {tailed_file}")
assert 'this is a new line' in tf.stdout
assert 'hello world' not in tf.stdout
assert "this is a new line" in tf.stdout
assert "hello world" not in tf.stdout


def test_container_e2e_mp():
with Broker(container_host="ubi8:latest", _count=7) as c_hosts:
for c_host in c_hosts:
assert c_host._cont_inst.top()['Processes']
assert c_host._cont_inst.top()["Processes"]
res = c_host.execute("hostname")
assert res.stdout.strip() == c_host.hostname
loc_settings_path = Path("broker_settings.yaml")
Expand All @@ -110,7 +116,9 @@ def test_container_e2e_mp():
res = c_host.execute(f"ls {remote_dir}")
assert str(loc_settings_path) in res.stdout
with NamedTemporaryFile() as tmp:
c_host.session.sftp_read(f"{remote_dir}/{loc_settings_path.name}", tmp.file.name)
c_host.session.sftp_read(
f"{remote_dir}/{loc_settings_path.name}", tmp.file.name
)
data = c_host.session.sftp_read(
f"{remote_dir}/{loc_settings_path.name}", return_data=True
)
Expand All @@ -120,4 +128,21 @@ def test_container_e2e_mp():
assert (
loc_settings_path.read_bytes() == data
), "Local file is different from the received one (return_data=True)"
assert data == Path(tmp.file.name).read_bytes(), "Received files do not match"
assert (
data == Path(tmp.file.name).read_bytes()
), "Received files do not match"


def test_broker_multi_manager():
with Broker.multi_manager(
ubi7={"container_host": "ubi7:latest"},
ubi8={"container_host": "ubi8:latest", "_count": 2},
ubi9={"container_host": "ubi9:latest"},
) as multi_hosts:
assert "ubi7" in multi_hosts and "ubi8" in multi_hosts and "ubi9" in multi_hosts
assert len(multi_hosts["ubi8"]) == 2
assert multi_hosts["ubi7"][0]._cont_inst.top()["Processes"]
assert (
multi_hosts["ubi8"][1].execute("hostname").stdout.strip()
== multi_hosts["ubi8"][1].hostname
)
34 changes: 23 additions & 11 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
from broker import broker, helpers
from broker import broker, Broker, helpers
from broker.providers import test_provider
import pytest


def test_empty_init():
"""Broker should be able to init without any arguments"""
broker_inst = broker.Broker()
assert isinstance(broker_inst, broker.Broker)
broker_inst = Broker()
assert isinstance(broker_inst, Broker)


def test_kwarg_assignment():
"""Broker should copy all kwargs into its _kwargs attribute"""
broker_kwargs = {"test": "value", "another": 17}
broker_inst = broker.Broker(**broker_kwargs)
broker_inst = Broker(**broker_kwargs)
assert broker_inst._kwargs == broker_kwargs


def test_full_init():
"""Make sure all init checks and assignments work"""
broker_hosts = ["test1.example.com", "test2.example.com", "test3.example.com"]
broker_inst = broker.Broker(
hosts=broker_hosts, test_action="blank", nick="test_nick"
)
broker_inst = Broker(hosts=broker_hosts, test_action="blank", nick="test_nick")
assert broker_inst._hosts == broker_hosts
assert not broker_inst._kwargs.get("hosts")
assert broker_inst._provider_actions == {
Expand All @@ -33,7 +31,7 @@ def test_full_init():

def test_broker_e2e():
"""Run through the base functionality of broker"""
broker_inst = broker.Broker(nick="test_nick")
broker_inst = Broker(nick="test_nick")
host_checkout = broker_inst.checkout()
assert len(broker_inst._hosts) == 1
broker_host = broker_inst._hosts[0]
Expand All @@ -47,7 +45,7 @@ def test_broker_e2e():

def test_broker_empty_checkin():
"""Try to checkin with no hosts on the instance"""
broker_inst = broker.Broker(nick="test_nick")
broker_inst = Broker(nick="test_nick")
assert not broker_inst._hosts
broker_inst.checkin()

Expand Down Expand Up @@ -80,15 +78,15 @@ def test_mp_checkout():
# https://github.com/SatelliteQE/broker/pull/53
# With count like this, I've got reproducibility probability
# arround 0.5
broker_inst = broker.Broker(nick="test_nick", _count=VM_COUNT)
broker_inst = Broker(nick="test_nick", _count=VM_COUNT)
broker_inst.checkout()
assert len(broker_inst._hosts) == VM_COUNT
broker_inst.checkin()
assert len(broker_inst._hosts) == 0


def test_mp_checkout_twice():
broker_inst = broker.Broker(nick="test_nick", _count=2)
broker_inst = Broker(nick="test_nick", _count=2)

def cycle():
assert len(broker_inst.checkout()) == 2
Expand All @@ -101,6 +99,20 @@ def cycle():
cycle()


def test_multi_manager():
"""Test that we get the proper data structure and names as expected
when using Broker.multi_manager.
"""
with Broker.multi_manager(
test_1={"nick": "test_nick"}, test_2={"nick": "test_nick", "_count": 2}
) as host_dict:
assert "test_1" in host_dict and "test_2" in host_dict
assert len(host_dict["test_1"]) == 1
assert len(host_dict["test_2"]) == 2
assert host_dict["test_1"][0].hostname == "test.host.example.com"
assert host_dict["test_2"][1].hostname == "test.host.example.com"


class SomeException(Exception):
pass

Expand Down

0 comments on commit be3e948

Please sign in to comment.