Skip to content

Commit

Permalink
Add initial support for a Beaker provider.
Browse files Browse the repository at this point in the history
This change adds a very basic Beaker provider and its bind.
For now, this relies on the beaker-client cli and config through that.
Currently the only supported checkout/execute method is via job xml.
  • Loading branch information
JacobCallahan committed May 16, 2023
1 parent e34979b commit f8aaf04
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Copy the example settings file to `broker_settings.yaml` and edit it.

(optional) If you are using the Container provider, install the extra dependency based on your container runtime of choice with either `pip install broker[podman]` or `pip install broker[docker]`.

(optional) If you are using the Beaker provider, install the extra dependency with `dnf install krb5-devel` and then `pip install broker[beaker]`.

To run Broker outside of its base directory, specify the directory with the `BROKER_DIRECTORY` environment variable.

Configure the `broker_settings.yaml` file to set configuration values for broker's interaction with its providers.
Expand Down
180 changes: 180 additions & 0 deletions broker/binds/beaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import json
import subprocess
import time
from logzero import logger
from pathlib import Path
from xml.etree import ElementTree as ET
from broker import helpers
from broker.exceptions import BeakerBindError


def _elementree_to_dict(etree):
"""Converts an ElementTree object to a dictionary"""
data = {}
if etree.attrib:
data.update(etree.attrib)
if etree.text:
data["text"] = etree.text
for child in etree:
child_data = _elementree_to_dict(child)
if (tag := child.tag) in data:
if not isinstance(data[tag], list):
data[tag] = [data[tag]]
data[tag].append(child_data)
else:
data[tag] = child_data
return data


class BeakerBind:

def __init__(self, hub_url, auth="krbv", **kwargs):
self.hub_url = hub_url
self._base_args = ["--insecure", f"--hub={self.hub_url}"]
if auth == "basic":
# If we're not using system kerberos auth, add in explicit basic auth
self.username = kwargs.pop("username", None)
self.password = kwargs.pop("password", None)
self._base_args.extend([
f"--username {self.username}",
f"--password {self.password}",
])
self.__dict__.update(kwargs)

def _exec_command(self, *cmd_args, **cmd_kwargs):
exec_cmd, cmd_args = ["bkr"], list(cmd_args)
# check through kwargs and if any are True add to cmd_args
del_keys = []
for k, v in cmd_kwargs.items():
if isinstance(v, bool):
del_keys.append(k)
if v:
cmd_args.append(f"--{k}")
for k in del_keys:
del cmd_kwargs[k]
exec_cmd.extend(cmd_args)
exec_cmd.extend(self._base_args)
exec_cmd.extend([
f"--{k.replace('_', '-')}={v}"
for k, v in cmd_kwargs.items()
])
logger.debug(f"Executing beaker command: {exec_cmd}")
proc = subprocess.Popen(
exec_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = proc.communicate()
result = helpers.Result(
stdout=stdout.decode(),
stderr=stderr.decode(),
status=proc.returncode,
)
if result.status != 0:
raise BeakerBindError(
f"Beaker command failed:\n"
f"Command={' '.join(exec_cmd)}\n"
f"Result={result}",
)
return result

def job_submit(self, job_xml, wait=False):
# wait behavior seems buggy to me, so best to avoid it
if not Path(job_xml).exists():
raise FileNotFoundError(f"Job XML file {job_xml} not found")
result = self._exec_command("job-submit", job_xml, wait=wait)
if not wait:
# get the job id from the output
# format is "Submitted: ['J:7849837'] where the number is the job id
for line in result.stdout.splitlines():
if line.startswith("Submitted:"):
return line.split("'")[1].replace("J:", "")

def job_watch(self, job_id):
job_id = f"J:{job_id}" if not job_id.startswith("J:") else job_id
return self._exec_command("job-watch", job_id)

def job_results(self, job_id, format="beaker-results-xml", pretty=False):
job_id = f"J:{job_id}" if not job_id.startswith("J:") else job_id
return self._exec_command("job-results", job_id, format=format, prettyxml=pretty)

def job_clone(self, job_id, wait=True):
job_id = f"J:{job_id}" if not job_id.startswith("J:") else job_id
return self._exec_command("job-clone", job_id, wait=wait)

def job_cancel(self, job_id):
job_id = f"J:{job_id}" if not job_id.startswith("J:") else job_id
return self._exec_command("job-cancel", job_id)

def job_delete(self, job_id):
job_id = f"J:{job_id}" if not job_id.startswith("J:") else job_id
return self._exec_command("job-delete", job_id)

def system_release(self, system_id):
return self._exec_command("system-release", system_id)

def system_list(self, **kwargs):
"""Due to the number of arguments, we will not validate before submitting
Accepted arguments are:
available available to be used by this user
free available to this user and not currently being used
removed which have been removed
mine owned by this user
type=TYPE of TYPE
status=STATUS with STATUS
pool=POOL in POOL
arch=ARCH with ARCH
dev-vendor-id=VENDOR-ID with a device that has VENDOR-ID
dev-device-id=DEVICE-ID with a device that has DEVICE-ID
dev-sub-vendor-id=SUBVENDOR-ID with a device that has SUBVENDOR-ID
dev-sub-device-id=SUBDEVICE-ID with a device that has SUBDEVICE-ID
dev-driver=DRIVER with a device that has DRIVER
dev-description=DESCRIPTION with a device that has DESCRIPTION
xml-filter=XML matching the given XML filter
host-filter=NAME matching pre-defined host filter
"""
# convert the flags passed in kwargs to arguments
args = []
for key in {"available", "free", "removed", "mine"}:
if kwargs.pop(key, False):
args.append(f"--{key}")
return self._exec_command("system-list", *args, **kwargs)

def user_systems(self): # to be used for inventory sync
return self.system_list(mine=True).stdout.splitlines()

def system_details(self, system_id, format="json"):
return self._exec_command("system-details", system_id, format=format)

def execute_job(self, job_xml, max_wait="24hr"):
"""Submit a job, periodically checking the status until it completes
then return a dictionary of the results.
"""
job_id = self.job_submit(job_xml, wait=False)
max_wait = time.time() + helpers.translate_timeout(max_wait)
while time.time() < max_wait:
result = self.job_results(job_id, pretty=True)
if 'result="Pass"' in result.stdout:
return _elementree_to_dict(ET.fromstring(result))
elif 'result="Fail"' in result.stdout or "Exception: " in result.stdout:
raise BeakerBindError(f"Job {job_id} failed:\n{result}")
time.sleep(60)

def system_details_curated(self, system_id):
full_details = json.loads(self.system_details(system_id).stdout)
return {
"hostname": full_details["fqdn"],
"mac_address": full_details["mac_address"],
"reserved_on": full_details["current_reservation"]["start_time"],
"expires_on": full_details["current_reservation"]["finish_time"],
"reserved_for": "{display_name} <{email_address}>".format(
display_name=full_details["current_reservation"]["user"]["display_name"],
email_address=full_details["current_reservation"]["user"]["email_address"],
),
"owner": "{display_name} <{email_address}>".format(
display_name=full_details["owner"]["display_name"],
email_address=full_details["owner"]["email_address"],
),
"id": full_details["id"]
}
8 changes: 8 additions & 0 deletions broker/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,11 @@ def __init__(self, host=None, message="Unspecified exception"):
if host:
self.message = f"{host.hostname or host.name}: {message}"
super().__init__(message=self.message)


class ContainerBindError(BrokerError):
error_code = 11


class BeakerBindError(BrokerError):
error_code = 12
103 changes: 103 additions & 0 deletions broker/providers/beaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import click
import inspect
from dynaconf import Validator
from logzero import logger
from broker.providers import Provider
from broker.binds.beaker import BeakerBind

class Beaker(Provider):

_validators = [
Validator("beaker.hub_url", must_exist=True),
]
_checkout_options = [
click.option(
"--job-xml",
type=str,
help="Path to the job XML file to submit",
),
]
_execute_options = [
click.option(
"--job-xml",
type=str,
help="Path to the job XML file to submit",
),
]
_extend_options = [] # TODO: find out the appropriate option


def __init__(self, **kwargs):
super().__init__(**kwargs)
self.runtime = BeakerBind(self.hub_url, **kwargs)

def _host_release(self):
caller_host = inspect.stack()[1][0].f_locals["host"]
broker_args = getattr(caller_host, "_broker_args", {}).get("_broker_args", {})
return self.runtime.system_release(broker_args["id"])

def _set_attributes(self, host_inst, broker_args=None, misc_attrs=None):
host_inst.__dict__.update(
{
"_prov_inst": self,
"_broker_provider": "Beaker",
"_broker_provider_instance": self.instance,
"_broker_args": broker_args,
"release": self._host_release,
}
)
if isinstance(misc_attrs, dict):
host_inst.__dict__.update(misc_attrs)

def _compile_host_info(self, host):
curated_host_info = self.runtime.system_details_curated(host)
curated_host_info.update(
{
"_prov_inst": self,
"_broker_provider": "Beaker",
"_broker_provider_instance": self.instance,
"_broker_args": getattr(host, "_broker_args", {}),
}
)
return curated_host_info

def construct_host(self, provider_params, host_classes, **kwargs):
"""Constructs broker host from a beaker system information
:param provider_params: a beaker system information dictionary
:param host_classes: host object
:return: constructed broker host object
"""
logger.debug(
f"constructing with {provider_params=}\n{host_classes=}\n{kwargs=}"
)
if not provider_params:
host_inst = host_classes[kwargs.get("type", "host")](**kwargs)
# cont_inst = self._cont_inst_by_name(host_inst.name)
self._set_attributes(host_inst, broker_args=kwargs)
return host_inst

@Provider.register_action("job_xml")
def submit_job(self, job_xml):
job_id = self.runtime.job_submit(job_xml)
logger.info(f"Submitted job {job_id}")
# wait for the job to finish
result = self.runtime.job_watch(job_id)

@Provider.register_action("jobs")
def nick_help(self, **kwargs):
pass

def release(self, host_obj):
pass

def extend(self):
pass

def get_inventory(self, **kwargs):
hosts = self.runtime.user_systems()
with click.progressbar(hosts, label='Compiling host information') as hosts_bar:
compiled_host_info = [self._compile_host_info(host) for host in hosts_bar]
return compiled_host_info
2 changes: 2 additions & 0 deletions broker_settings.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ Container:
# name_prefix: test
results_limit: 50
auto_map_ports: False
Beaker:
hub_url:
TestProvider:
instances:
- test1:
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ docker =
docker
paramiko
podman = podman-py
beaker = beaker-client

[options.entry_points]
console_scripts =
Expand Down

0 comments on commit f8aaf04

Please sign in to comment.