Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion hwci/boards/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@
# Copyright Tock Contributors 2024.

from .tockloader_board import TockloaderBoard
from .nrf52dk import Nrf52dk
from .mock_board import MockBoard
118 changes: 118 additions & 0 deletions hwci/boards/imix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Licensed under the Apache License, Version 2.0 or the MIT License.
# SPDX-License-Identifier: Apache-2.0 OR MIT
# Copyright Tock Contributors 2024.

import time
import os
import subprocess
import logging
from contextlib import contextmanager
import serial.tools.list_ports
from boards.tockloader_board import TockloaderBoard
from utils.serial_port import SerialPort
from gpio.gpio import GPIO
import yaml
import os
import traceback

class Imix(TockloaderBoard):
def __init__(self):
super().__init__()
self.arch = "cortex-m4"
self.kernel_path = os.path.join(
self.base_dir, "repos/tock")
self.kernel_board_path = os.path.join(
self.kernel_path, "boards/imix")
self.uart_port = self.get_uart_port()
self.uart_baudrate = self.get_uart_baudrate()
self.board = "imix"
self.program_method = "serial_bootloader"
self.serial = self.get_serial_port()
self.gpio = self.get_gpio_interface()
self.open_serial_during_flash = False
self.app_sha256_credential = True

def get_uart_port(self):
logging.info("Getting list of serial ports")
ports = list(serial.tools.list_ports.comports())
for port in ports:
if "imix IoT Module" in port.description:
logging.info(f"Found imix IoT programming port: {port.device}")
return port.device
if ports:
logging.info(f"Automatically selected port: {ports[0].device}")
return ports[0].device
else:
logging.error("No serial ports found")
raise Exception("No serial ports found")

def get_uart_baudrate(self):
return 115200 # Default baudrate for the board

def get_serial_port(self):
logging.info(
f"Using serial port: {self.uart_port} at baudrate {self.uart_baudrate}"
)
return SerialPort(self.uart_port, self.uart_baudrate, open_rts=False, open_dtr=True)

def get_gpio_interface(self):
return None

def cleanup(self):
if self.gpio:
for interface in self.gpio.gpio_interfaces.values():
interface.cleanup()
if self.serial:
self.serial.close()

def flash_kernel(self):
logging.info("Flashing the Tock OS kernel")
if not os.path.exists(self.kernel_path):
logging.error(f"Tock directory {self.kernel_path} not found")
raise FileNotFoundError(f"Tock directory {self.kernel_path} not found")

# Run make program from the board directory (this uses the Tock bootloader)
subprocess.run(
["make", "program"], cwd=self.kernel_board_path, check=True
)

def erase_board(self):
logging.info("Erasing the board")
# We erase all apps, but don't erase the kernel. Is there a simple way
# that we can prevent the installed kernel from starting (by
# overwriting its reset vector?)
subprocess.run(["tockloader", "erase-apps"], check=True)

def reset(self):
if self.serial.is_open():
logging.info("Performing a target reset by toggling RTS")
self.serial.set_rts(True)
time.sleep(0.1)
self.serial.set_rts(False)
else:
logging.info("Performing a target reset by reading address 0")
subprocess.run(["tockloader", "read", "0x0", "1"], check=True)

# The flash_app method is inherited from TockloaderBoard

@contextmanager
def change_directory(self, new_dir):
previous_dir = os.getcwd()
os.chdir(new_dir)
logging.info(f"Changed directory to: {os.getcwd()}")
try:
yield
finally:
os.chdir(previous_dir)
logging.info(f"Reverted to directory: {os.getcwd()}")


def load_target_spec():
# Assume the target spec file is in a fixed location
target_spec_path = os.path.join(os.getcwd(), "target_spec_imix.yaml")
with open(target_spec_path, "r") as f:
target_spec = yaml.safe_load(f)
return target_spec


board = Imix()
3 changes: 2 additions & 1 deletion hwci/boards/nrf52dk.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ def __init__(self):
self.kernel_path, "boards/nordic/nrf52840dk")
self.uart_port = self.get_uart_port()
self.uart_baudrate = self.get_uart_baudrate()
self.program_method = "openocd"
self.openocd_board = "nrf52dk"
self.board = "nrf52dk"
self.serial = self.get_serial_port()
self.gpio = self.get_gpio_interface()

def get_uart_port(self):
logging.info("Getting list of serial ports")
logging.info("Getting list of serial ports!")
ports = list(serial.tools.list_ports.comports())
for port in ports:
if "J-Link" in port.description:
Expand Down
30 changes: 23 additions & 7 deletions hwci/boards/tockloader_board.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def __init__(self):
self.board = None # Should be set in subclass
self.arch = None # Should be set in subclass
self.base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
self.program_method = "serial_bootloader"
self.app_sha256_credential = False

def flash_app(self, app):
if type(app) == str:
Expand All @@ -38,32 +40,46 @@ def flash_app(self, app):
logging.error(f"App directory {app_dir} not found")
raise FileNotFoundError(f"App directory {app_dir} not found")


make_args = [
"make",
f"TOCK_TARGETS={self.arch}"
]
# if self.app_sha256_credential:
# make_args.append("ELF2TAB_ARGS=\"--sha256\"")

# Build the app using absolute paths
logging.info(f"Building app: {app_name}")
if app_name != "lua-hello":
subprocess.run(
["make", f"TOCK_TARGETS={self.arch}"], cwd=app_dir, check=True
)
subprocess.run(make_args, cwd=app_dir, check=True)
else:
# if the app is lua-hello, we need to build the libtock-c submodule first so we need to change directory
# into the libtock-c directory so it knows we are in a git repostiory
self.change_directory(libtock_c_dir)
subprocess.run(
["make", f"TOCK_TARGETS={self.arch}"], cwd=app_dir, check=True
)
subprocess.run(make_args, cwd=app_dir, check=True)

tab_path = os.path.join(app_dir, tab_file)
if not os.path.exists(tab_path):
logging.error(f"Tab file {tab_path} not found")
raise FileNotFoundError(f"Tab file {tab_path} not found")

if self.program_method == "serial_bootloader":
program_method_arg = "--serial"
elif self.program_method == "jlink":
program_method_arg = "--jlink"
elif self.program_method == "openocd":
program_method_arg = "--openocd"
else:
raise NotImplemented(f"Unknown program method: {self.program_method}")

logging.info(f"Installing app: {app_name}")
subprocess.run(
[
"tockloader",
"install",
"--board",
self.board,
"--openocd",
program_method_arg,
tab_path,
],
check=True,
Expand Down
3 changes: 3 additions & 0 deletions hwci/target_spec_imix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
model: imix
serial_number: 0xfoobar
pin_mappings:
41 changes: 41 additions & 0 deletions hwci/tests/console_recv_short_and_long.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed under the Apache License, Version 2.0 OR the MIT License.
# SPDX-License-Identifier: Apache-2.0 OR MIT

import logging
from utils.test_helpers import OneshotTest
import re


class StackSizeTest01(OneshotTest):
def __init__(self):
super().__init__(apps=[
"tests/console/console_recv_short",
"tests/console/console_recv_long",
])

def oneshot_test(self, board):
serial = board.serial
return

# Wait for "Stack Test App"
output = serial.expect("Stack Test App", timeout=10)
if not output:
raise Exception("Did not receive 'Stack Test App' message")

# Wait for "Current stack pointer: 0x..."
output = serial.expect(r"Current stack pointer: 0x[0-9a-fA-F]+", timeout=5)
if not output:
raise Exception("Did not receive 'Current stack pointer' message")

# Optionally, you can extract and log the stack pointer value
match = re.search(r"Current stack pointer: (0x[0-9a-fA-F]+)", output.decode())
if match:
stack_pointer = match.group(1)
logging.info(f"Stack pointer is at {stack_pointer}")
else:
raise Exception("Failed to parse stack pointer value")

logging.info("Stack size test 01 completed successfully")


test = StackSizeTest01()
12 changes: 8 additions & 4 deletions hwci/tests/console_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@ def oneshot_test(self, board):

# Wait for the application to initialize
logging.info("Waiting for the application to initialize...")
time.sleep(2) # Allow time for the app to start
time.sleep(1) # Allow time for the app to start

# Simulate user input by writing to the serial port
test_input = b"Hello, Tock!"
serial.write(test_input)
serial.flush_buffer()
test_input = b"Hello, Tock!\r\n"
for b in test_input:
time.sleep(0.01) # imix doesn't like this being sent too quickly!
serial.write(bytes([b]))
#serial.write(test_input)
logging.info(f"Sent test input: {test_input.decode('utf-8')}")
time.sleep(7) # Wait for the application to process

# Wait for the expected output from the application
logging.info("Waiting for the application to output the result...")
pattern = r"Userspace call to read console returned: (.*)"
output = serial.expect(pattern, timeout=10)
print(output)

if output:
received_line = output.decode("utf-8", errors="replace").strip()
Expand Down
69 changes: 55 additions & 14 deletions hwci/utils/serial_port.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,56 @@


class SerialPort:
def __init__(self, port, baudrate=115200):
def __init__(self, port, baudrate=115200, open_rts=None, open_dtr=None):
self.port = port
self.baudrate = baudrate
try:
self.ser = serial.Serial(port, baudrate=baudrate, timeout=1)
self.child = fdpexpect.fdspawn(self.ser.fileno())
logging.info(f"Opened serial port {port} at baudrate {baudrate}")
except serial.SerialException as e:
logging.error(f"Failed to open serial port {port}: {e}")
raise
self.open_rts = open_rts
self.open_dtr = open_dtr
self.ser = None
self.child = None

def open(self):
if self.ser is None:
try:
self.ser = serial.Serial(self.port, baudrate=self.baudrate, timeout=1, exclusive=True)
self.child = fdpexpect.fdspawn(self.ser.fileno())
if self.open_rts is not None:
self.ser.rts = self.open_rts
if self.open_dtr is not None:
self.ser.dtr = self.open_dtr
logging.info(f"Opened serial port {self.port} at baudrate {self.baudrate}")
except serial.SerialException as e:
logging.error(f"Failed to open serial port {port}: {e}")
raise

def is_open(self):
return self.ser is not None

def close(self):
if self.ser is not None:
self.ser.close()
logging.info(f"Closed serial port {self.port}")
self.ser = None
self.child = None

def set_rts(self, rts):
assert self.ser is not None, "Serial port is not open!"
self.ser.rts = rts

def set_dtr(self, dtr):
assert self.ser is not None, "Serial port is not open!"
self.ser.dtr = dtr

def flush_buffer(self):
assert self.ser is not None, "Serial port is not open!"

self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
logging.info("Flushed serial buffers")

def expect(self, pattern, timeout=10, timeout_error=True):
assert self.ser is not None, "Serial port is not open!"

try:
index = self.child.expect(pattern, timeout=timeout)
return self.child.after
Expand All @@ -45,26 +78,31 @@ def expect(self, pattern, timeout=10, timeout_error=True):
return None

def write(self, data):
assert self.ser is not None, "Serial port is not open!"

logging.debug(f"Writing data: {data}")
for byte in data:
self.ser.write(bytes([byte]))
time.sleep(0.1)

def close(self):
self.ser.close()
logging.info(f"Closed serial port {self.port}")


class MockSerialPort:
def __init__(self):
self.buffer = queue.Queue()
self.accumulated_data = b""
self.open = False

def open(self):
self.open = True

def write(self, data):
assert self.open, "Serial port is not open!"

logging.debug(f"Writing data: {data}")
self.buffer.put(data)

def expect(self, pattern, timeout=10, timeout_error=True):
assert self.open, "Serial port is not open!"

end_time = time.time() + timeout
compiled_pattern = re.compile(pattern.encode())
while time.time() < end_time:
Expand All @@ -81,12 +119,15 @@ def expect(self, pattern, timeout=10, timeout_error=True):
return None

def flush_buffer(self):
assert self.open, "Serial port is not open!"

self.accumulated_data = b""
while not self.buffer.empty():
self.buffer.get()

def close(self):
pass
assert self.open, "Serial port is not open!"
self.open = False

def reset_input_buffer(self):
self.flush_buffer()
Expand Down
Loading