Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(push): add an option to specify targets #775

Merged
merged 8 commits into from
May 13, 2024
128 changes: 111 additions & 17 deletions push
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,32 @@ from itertools import chain
from zipfile import ZipFile
import tempfile
from contextlib import contextmanager
import json
import re

_DEFAULT_EXTRAS = {'stdout': sys.stdout, 'stderr': sys.stderr}
_SSH_EXTRA_OPTS = ['-o', 'StrictHostKeyChecking=no',
'-o', 'UserKnownHostsFile=/dev/null']
_ROBOT_MANIFEST_FILE_PATH = "/usr/lib/firmware/opentrons-firmware.json"
TARGETS = [
"pipettes",
"pipettes-rev1",
"pipettes-single",
"pipettes-multi",
"pipettes-96",
"gripper",
"hepa-uv",
"gantry",
"gantry-x",
"gantry-y",
"head",
"rear-panel",
"bootloader",
]
_MULTI_SUBSYSTEM_TARGETS = {
"pipettes": ["pipettes-single", "pipettes-multi", "pipettes-96"],
"gantry": ["gantry-x", "gantry-y"]
}

class CantFindUtilityException(RuntimeError):
def __init__(self, which_util):
Expand All @@ -36,10 +58,17 @@ def _scp_to_robot(scp_util, host, local, remote, **extras):
_cmd(
[scp_util]
+ _SSH_EXTRA_OPTS
+ [local, 'root@{host}:{remote}'.format(host=host, remote=remote)],
+ [local, f'root@{host}:{remote}'],
**extras
)

def _scp_from_robot(scp_util, host, local, remote, **extras):
_cmd(
[scp_util]
+ _SSH_EXTRA_OPTS
+ [f'root@{host}:{remote}', local],
**extras
)

def _cmd(cmdlist, **extras):
_extras = {k: v for k, v in chain(_DEFAULT_EXTRAS.items(), extras.items())}
Expand All @@ -54,31 +83,78 @@ def _controlled_tempdir():
finally:
shutil.rmtree(td)

def _build_fw(zip_path, apps_path):
with ZipFile(zip_path, 'w') as zf:
for fname in os.listdir(apps_path):
zf.write(os.path.join(apps_path, fname), fname)
def _build_fw(zip_path, apps_path, targets):
if targets:
regex_list = [re.compile(f"{target}" + r"(.*)(.hex|.bin)") for target in targets]
with ZipFile(zip_path, 'w') as zf:
for fname in os.listdir(apps_path):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the above comment, the scripts/subsystem_versions.py script validates the filenames, it could be useful to check that.

# only write to zip file to be copied if filename matches target
if any([reg.search(fname) for reg in regex_list]):
zf.write(os.path.join(apps_path, fname), fname)
else:
with ZipFile(zip_path, 'w') as zf:
for fname in os.listdir(apps_path):
# write all image files to zip file
zf.write(os.path.join(apps_path, fname), fname)


def _subsystems_from_targets(targets):
# assuming all targets are valid at this point, convert
# presets that encompass multiple subsystems to their
# respective subsystems
for t in targets:
if t in _MULTI_SUBSYSTEM_TARGETS:
t_index = targets.index(t)
# replace the target with multiple subsystems
targets[t_index:t_index+1] = tuple(_MULTI_SUBSYSTEM_TARGETS[t])
return targets


def _update_shortsha(scp, host, json_data_path, targets):
shortsha = subprocess.check_output(["git", "rev-parse", "--short", "HEAD"]).decode().strip()
# copy data to local file
_scp_from_robot(scp, host, json_data_path, _ROBOT_MANIFEST_FILE_PATH)
with open(json_data_path, 'r+') as output_file:
manifest = json.load(output_file)
for target in _subsystems_from_targets(targets):
manifest['subsystems'][target]['shortsha'] = shortsha
output_file.seek(0)
json.dump(manifest, output_file)
# copy updated subsystem data to the robot
_scp_to_robot(scp, host, json_data_path, _ROBOT_MANIFEST_FILE_PATH)

def _transfer_firmware(host, repo_path, scp, ssh, sensors):
def _transfer_firmware(host, repo_path, scp, ssh, sensors, targets):
dist_dir = "dist"
if sensors:
dist_dir = dist_dir+"-sensor"
apps_path = os.path.join(repo_path, dist_dir, 'applications')
with _controlled_tempdir() as td:
local_zip_path = os.path.join(td, 'fw.zip')
robot_zip_path = '/tmp/fw.zip'
_build_fw(local_zip_path, apps_path)
_build_fw(local_zip_path, apps_path, targets)
if targets:
local_temp_manifest_path = os.path.join(td, 'temp_manifest.json')
_update_shortsha(scp, host, local_temp_manifest_path, targets)
_scp_to_robot(scp, host, local_zip_path, robot_zip_path)
_ssh(ssh, host, 'unzip -o {zip_path} -d /usr/lib/firmware/'.format(zip_path=robot_zip_path))
_ssh(ssh, host, 'rm {zip_path}'.format(zip_path=robot_zip_path))

def _prep_firmware(repo_path, cmake, sensors):
preset = "firmware-g4"
def _prep_firmware(repo_path, cmake, sensors, targets):
working_dir = "./build-cross"
full_build_preset = "firmware-g4"

if sensors:
preset = preset+"-sensors"
working_dir = working_dir+"-sensor"
_cmd([cmake, '--build', f'--preset={preset}', '--target', 'firmware-applications', 'firmware-images'], cwd=repo_path)
full_build_preset = full_build_preset+"-sensors"
# if sensors is true, disregard targets within the scope of this function
targets = None
if targets:
for target in targets:
_cmd([cmake, '--build', 'build-cross', '--target', f'{target}-images'], cwd=repo_path)
else:
_cmd([cmake, '--build', f'--preset={full_build_preset}', '--target', 'firmware-applications', 'firmware-images'], cwd=repo_path)


_cmd([cmake, '--install', f'{working_dir}', '--component', 'Applications'], cwd=repo_path)

@contextmanager
Expand All @@ -101,22 +177,36 @@ def _find_utils():
raise CantFindUtilityException('cmake')
return ssh, scp, cmake

def _check_targets(targets):
for t in targets:
if t not in TARGETS:
print(f"preset {t} is not in target options, ignoring")
targets.remove(t)
return targets

def _restart_robot(host, ssh):
_ssh(ssh, host, 'nohup systemctl restart opentrons-robot-server &')

def _do_push(host, repo_path, build, restart, sensors):
def _do_push(host, repo_path, build, restart, sensors, targets):

ssh, scp, cmake = _find_utils()
if targets:
targets = _check_targets(targets)
if build:
_prep_firmware(repo_path, cmake, sensors)
_prep_firmware(repo_path, cmake, sensors, targets)
with _prep_robot(host, ssh):
_transfer_firmware(host, repo_path, scp, ssh, sensors)
_transfer_firmware(host, repo_path, scp, ssh, sensors, targets)
if restart:
_restart_robot(host, ssh)

def push(host, repo_path=None, build=True, restart=True, sensors=False):
def push(host, repo_path=None, build=True, restart=True, sensors=False, targets=[]):
# sensors is logically independent from targets here- if you specify both:
# - all hex files under firmware-g4-sensors will be built and installed, but
# - only the targets selected will actually be copied over to the robot

repo = repo_path or os.dirname(__file__)
try:
_do_push(host, repo, build, restart, sensors)
_do_push(host, repo, build, restart, sensors, targets)
return 0
except subprocess.CalledProcessError as e:
print(
Expand All @@ -133,7 +223,7 @@ def _push_from_argparse(args):
if args.key:
_SSH_EXTRA_OPTS.append('-i')
_SSH_EXTRA_OPTS.append(args.key)
return push(args.host, os.path.abspath(args.repo_path), not args.no_build, not args.no_restart, args.sensors)
return push(args.host, os.path.abspath(args.repo_path), not args.no_build, not args.no_restart, args.sensors, args.targets)

def _arg_parser(parent=None):
parents = []
Expand Down Expand Up @@ -169,6 +259,10 @@ def _arg_parser(parent=None):
action='store_true',
help='Private SSH key to use'
)
parser.add_argument(
'--targets',
nargs='*'
)
return parser

def _main():
Expand Down
Loading