Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(push): add an option to specify targets #775

Merged
merged 8 commits into from
May 13, 2024
87 changes: 69 additions & 18 deletions push
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ from itertools import chain
from zipfile import ZipFile
import tempfile
from contextlib import contextmanager
import json
import re

_DEFAULT_EXTRAS = {'stdout': sys.stdout, 'stderr': sys.stderr}
_SSH_EXTRA_OPTS = ['-o', 'StrictHostKeyChecking=no',
'-o', 'UserKnownHostsFile=/dev/null']
_ROBOT_MANIFEST_FILE_PATH = "/usr/lib/firmware/opentrons-firmware.json"

class CantFindUtilityException(RuntimeError):
def __init__(self, which_util):
Expand All @@ -36,10 +39,17 @@ def _scp_to_robot(scp_util, host, local, remote, **extras):
_cmd(
[scp_util]
+ _SSH_EXTRA_OPTS
+ [local, 'root@{host}:{remote}'.format(host=host, remote=remote)],
+ [local, f'root@{host}:{remote}'],
**extras
)

def _scp_from_robot(scp_util, host, local, remote, **extras):
_cmd(
[scp_util]
+ _SSH_EXTRA_OPTS
+ [f'root@{host}:{remote}', local],
**extras
)

def _cmd(cmdlist, **extras):
_extras = {k: v for k, v in chain(_DEFAULT_EXTRAS.items(), extras.items())}
Expand All @@ -54,31 +64,63 @@ def _controlled_tempdir():
finally:
shutil.rmtree(td)

def _build_fw(zip_path, apps_path):
with ZipFile(zip_path, 'w') as zf:
for fname in os.listdir(apps_path):
zf.write(os.path.join(apps_path, fname), fname)

def _transfer_firmware(host, repo_path, scp, ssh, sensors):
def _build_fw(zip_path, apps_path, targets):
if targets:
regex_list = [re.compile(f"{target}" + r"(.*)(.hex|.bin)") for target in targets]
with ZipFile(zip_path, 'w') as zf:
for fname in os.listdir(apps_path):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the above comment, the scripts/subsystem_versions.py script validates the filenames, it could be useful to check that.

# only write to zip file to be copied if filename matches target
if any([reg.search(fname) for reg in regex_list]):
zf.write(os.path.join(apps_path, fname), fname)
else:
with ZipFile(zip_path, 'w') as zf:
for fname in os.listdir(apps_path):
# write all image files to zip file
zf.write(os.path.join(apps_path, fname), fname)

def _update_shortsha(scp, host, json_data_path, targets):
shortsha = subprocess.check_output(["git", "rev-parse", "--short", "HEAD"]).decode().strip()
# copy data to local file
_scp_from_robot(scp, host, json_data_path, _ROBOT_MANIFEST_FILE_PATH)
with open(json_data_path, 'r+') as output_file:
manifest = json.load(output_file)
for target in targets:
manifest['subsystems'][target]['shortsha'] = shortsha
output_file.seek(0)
json.dump(manifest, output_file)
# copy updated subsystem data to the robot
_scp_to_robot(scp, host, json_data_path, _ROBOT_MANIFEST_FILE_PATH)

def _transfer_firmware(host, repo_path, scp, ssh, sensors, targets):
dist_dir = "dist"
if sensors:
dist_dir = dist_dir+"-sensor"
apps_path = os.path.join(repo_path, dist_dir, 'applications')
with _controlled_tempdir() as td:
local_zip_path = os.path.join(td, 'fw.zip')
robot_zip_path = '/tmp/fw.zip'
_build_fw(local_zip_path, apps_path)
_build_fw(local_zip_path, apps_path, targets)
if targets:
local_temp_manifest_path = os.path.join(td, 'temp_manifest.json')
_update_shortsha(scp, host, local_temp_manifest_path, targets)
_scp_to_robot(scp, host, local_zip_path, robot_zip_path)
_ssh(ssh, host, 'unzip -o {zip_path} -d /usr/lib/firmware/'.format(zip_path=robot_zip_path))
_ssh(ssh, host, 'rm {zip_path}'.format(zip_path=robot_zip_path))

def _prep_firmware(repo_path, cmake, sensors):
preset = "firmware-g4"
def _prep_firmware(repo_path, cmake, sensors, targets):
working_dir = "./build-cross"
full_build_preset = "firmware-g4"

if sensors:
preset = preset+"-sensors"
working_dir = working_dir+"-sensor"
_cmd([cmake, '--build', f'--preset={preset}', '--target', 'firmware-applications', 'firmware-images'], cwd=repo_path)
full_build_preset = full_build_preset+"-sensors"
if targets:
for target in targets:
_cmd([cmake, '--build', 'build-cross', '--target', f'{target}-images'], cwd=repo_path)
else:
_cmd([cmake, '--build', f'--preset={full_build_preset}', '--target', 'firmware-applications', 'firmware-images'], cwd=repo_path)


_cmd([cmake, '--install', f'{working_dir}', '--component', 'Applications'], cwd=repo_path)

@contextmanager
Expand All @@ -104,19 +146,24 @@ def _find_utils():
def _restart_robot(host, ssh):
_ssh(ssh, host, 'nohup systemctl restart opentrons-robot-server &')

def _do_push(host, repo_path, build, restart, sensors):
def _do_push(host, repo_path, build, restart, sensors, targets):

ssh, scp, cmake = _find_utils()
if build:
_prep_firmware(repo_path, cmake, sensors)
_prep_firmware(repo_path, cmake, sensors, targets)
with _prep_robot(host, ssh):
_transfer_firmware(host, repo_path, scp, ssh, sensors)
_transfer_firmware(host, repo_path, scp, ssh, sensors, targets)
if restart:
_restart_robot(host, ssh)

def push(host, repo_path=None, build=True, restart=True, sensors=False):
def push(host, repo_path=None, build=True, restart=True, sensors=False, targets=[]):
# sensors is logically independent from targets here- if you specify both:
# - all hex files under firmware-g4-sensors will be built and installed, but
# - only the targets selected will actually be copied over to the robot

repo = repo_path or os.dirname(__file__)
try:
_do_push(host, repo, build, restart, sensors)
_do_push(host, repo, build, restart, sensors, targets)
return 0
except subprocess.CalledProcessError as e:
print(
Expand All @@ -133,7 +180,7 @@ def _push_from_argparse(args):
if args.key:
_SSH_EXTRA_OPTS.append('-i')
_SSH_EXTRA_OPTS.append(args.key)
return push(args.host, os.path.abspath(args.repo_path), not args.no_build, not args.no_restart, args.sensors)
return push(args.host, os.path.abspath(args.repo_path), not args.no_build, not args.no_restart, args.sensors, args.targets)

def _arg_parser(parent=None):
parents = []
Expand Down Expand Up @@ -169,6 +216,10 @@ def _arg_parser(parent=None):
action='store_true',
help='Private SSH key to use'
)
parser.add_argument(
'--targets',
nargs='*'
)
return parser

def _main():
Expand Down
Loading