diff --git a/tools/can_tester/README.md b/tools/can_tester/README.md new file mode 100644 index 0000000..56bdb0b --- /dev/null +++ b/tools/can_tester/README.md @@ -0,0 +1,211 @@ +# CAN Bus Testing Tool + +PC-side CAN bus testing and debugging tool for the STM32LowLevel firmware. +Provides both an **interactive CLI** and a **web dashboard** for sending +commands and monitoring CAN bus traffic in real time. + +## Hardware + +- **USB2CAN adapter**: Innomaker USB2CAN MS124 ([Part-DB #233](https://part-db.teamisaac.it/en/part/233/info)) +- **Driver**: `gs_usb` — uses the WinUSB class driver on Windows, native kernel driver on Linux +- **Bitrate**: 1 Mbit/s arbitration / 2 Mbit/s data (CAN FD with BRS, Extended 29-bit identifiers) +- **Connector**: D-SUB 9-pin CAN. CAN-H on pin 7, CAN-L on pin 2, GND on pin 3 + +## Prerequisites + +- Python ≥ 3.10 +- The USB2CAN adapter plugged in via USB +- CAN bus wiring between the adapter and at least one STM32 module + +## Installation + +```bash +cd STM32LowLevel +pip install -r tools/can_tester/requirements.txt +``` + +> All commands below assume the working directory is `STM32LowLevel/` and +> `PYTHONPATH` includes `.` (or you use `python -m` from that directory). + +--- + +## Platform Setup + +### Linux (SocketCAN) + +The `gs_usb` kernel module is loaded automatically when you plug in the adapter. + +```bash +# Verify the device is detected +dmesg | grep gs_usb + +# Bring up the CAN FD interface at 1 Mbit/s arbitration + 2 Mbit/s data +sudo ip link set can0 up type can bitrate 1000000 dbitrate 2000000 fd on + +# Verify +ip -details link show can0 +``` + +### Windows (x64) + +If you have issues with the version of the driver that Windows automatically installs, you can manually switch to the WinUSB driver: + +1. Plug in the USB2CAN adapter. +2. Open **Device Manager** → find the device under *Universal Serial Bus devices* or *Other devices* + (it may appear as "USB2CAN" or "CAN"). +3. Right-click → **Update driver** → **Browse my computer for drivers** + → **Let me pick from a list** → select **Universal Serial Bus devices** + → choose **WinUSB Device** +4. No additional driver download is needed — the `gs_usb` Python package + talks to WinUSB directly via `pyusb` / `libusb`. + +> **Reference**: The [Innomaker USB2CAN GitHub repo](https://github.com/INNO-MAKER/usb2can) +> has the official user manual, Windows software, and firmware notes. + +### Windows ARM64 (e.g. Snapdragon laptops) + +The standard `libusb` wheel ships only x64 binaries. +On ARM64 Windows you must build `libusb` from source: + +1. **Install Visual Studio 2022** with the *ARM64* build tools + (C++ Desktop workload + ARM64 component). + +2. **Clone and build libusb**: + ```powershell + git clone https://github.com/libusb/libusb.git + cd libusb + # Open the VS solution and build the ARM64-Release target, or: + msbuild msvc\libusb_static_2022.vcxproj /p:Configuration=Release /p:Platform=ARM64 + ``` + +3. **Copy the DLL** next to your Python interpreter: + ```powershell + copy ARM64\Release\dll\libusb-1.0.dll "$env:LOCALAPPDATA\Programs\Python\Python313-arm64\" + ``` + +4. The CAN tester's `__init__.py` automatically detects ARM64 Windows and + patches `pyusb` to load `libusb-1.0.dll` from that location — no + manual environment variables needed. + +5. Then follow the same WinUSB driver steps described above. + +--- + +## Usage + +### Web Dashboard + +```bash +# Start with USB2CAN adapter (gs_usb) +python -m tools.can_tester.web_dashboard -i gs_usb -c 0 --port 8080 + +# Start with SocketCAN (Linux) +python -m tools.can_tester.web_dashboard -i socketcan -c can0 + +# Demo mode — no hardware required (virtual bus) +python -m tools.can_tester.web_dashboard --demo + +# Custom port +python -m tools.can_tester.web_dashboard -i gs_usb -c 0 -p 9000 +``` + +Open `http://localhost:8080` in a browser. Features: + +- Real-time message feed via SSE (auto-reconnecting) +- Filter by subsystem (arm, traction, joint, feedback) +- Module-aware command panel — shows arm commands for MOD1, joint commands for MOD2/MOD3 +- Target module selector with CAN address display +- Command descriptions with value ranges +- Emergency stop button +- Collapsible message statistics + +### Interactive CLI + +```bash +# Linux (SocketCAN) +python -m tools.can_tester -i socketcan -c can0 + +# Windows / Linux (gs_usb) +python -m tools.can_tester -i gs_usb -c 0 + +# Target a specific module +python -m tools.can_tester -i gs_usb -c 0 -m MK2_MOD2 +``` + +#### CLI Commands + +| Command | Description | +|---------|-------------| +| `send traction ` | Set traction motor speeds (RPM) | +| `send arm_j2 ` | Set arm elbow pitch (radians) | +| `send arm_j3 ` | Set arm roll J3 (radians) | +| `send arm_j4 ` | Set arm wrist pitch (radians) | +| `send arm_j5 ` | Set arm wrist roll (radians) | +| `send arm_1a1b <θ> <φ>` | Set arm J1 differential (radians) | +| `send beak open\|close` | Control beak gripper | +| `send reset_arm` | Move arm to home position | +| `send reboot_arm` | Reboot arm Dynamixel motors | +| `send reboot_traction` | Reboot traction motors | +| `stop` | Emergency stop all motors | +| `monitor [filter]` | Start live monitoring (filters: `all`, `arm`, `traction`, `joint`, `feedback`) | +| `test ` | Run a test sequence (see `procedure.md`) | +| `status` | Show message statistics | + +#### Test Sequences + +| Name | Description | +|------|-------------| +| `traction` | Forward → stop → reverse → stop | +| `traction_diff` | Turn left → turn right → spin | +| `arm_init` | Reset arm to home position | +| `arm_joints` | Move each arm joint individually | +| `arm_beak` | Beak open/close cycle | +| `arm_reboot` | Reboot and re-initialize arm motors | +| `full` | Complete diagnostic sequence | + +--- + +## Architecture + +``` +can_tester/ +├── __init__.py # ARM64 Windows libusb workaround +├── __main__.py # CLI entry point +├── protocol.py # CAN ID / MsgType definitions (synced with communication.h) +├── header_parser.py # C preprocessor for communication.h / mod_config.h validation +├── codec.py # Payload encode/decode (struct-based) +├── sender.py # High-level message sender +├── monitor.py # Real-time bus monitor with named filters +├── cli.py # Interactive REPL +├── web_dashboard.py # Flask web UI with SSE +├── test_sequences.py # Pre-built test routines +├── test_dry.py # Offline validation tests +├── requirements.txt # Python dependencies +├── procedure.md # Testing procedures and checklists +└── README.md # This file +``` + +## CAN Protocol + +See the [CAN bus protocol documentation](https://docs.teamisaac.it/doc/can-bus-protocol-t40e2NOEqp) +for the full message ID table. + +Extended CAN identifiers (29-bit) are encoded as: +- **Byte 0** `[0:7]`: Source address (sender module ID) +- **Byte 1** `[8:15]`: Destination address +- **Byte 2** `[16:23]`: Message type (PDU Format) + +| Address | Module | Subsystems | +|---------|--------|------------| +| `0x00` | Central (Jetson / PC) | — | +| `0x21` | MK2_MOD1 — Head | Traction + Robotic Arm (6-DOF + beak) | +| `0x22` | MK2_MOD2 — Body | Traction + Inter-module Joint | +| `0x23` | MK2_MOD3 — Tail | Traction + Inter-module Joint | + +## References + +- [CAN bus protocol](https://docs.teamisaac.it/doc/can-bus-protocol-t40e2NOEqp) +- [STM32LowLevel firmware](https://github.com/Team-Isaac-Polito/STM32LowLevel) +- [Innomaker USB2CAN repo](https://github.com/INNO-MAKER/usb2can) — user manual, Windows software, firmware +- [python-can documentation](https://python-can.readthedocs.io/) +- [Part-DB entry for USB2CAN MS124](https://part-db.teamisaac.it/en/part/233/info) diff --git a/tools/can_tester/__init__.py b/tools/can_tester/__init__.py new file mode 100644 index 0000000..92b9ead --- /dev/null +++ b/tools/can_tester/__init__.py @@ -0,0 +1,45 @@ +# STM32LowLevel CAN bus testing toolkit + +# ── Ensure libusb backend is available on ARM64 Windows ────────────────────── +# pyusb's auto-discovery doesn't find libusb-1.0.dll on ARM64 Windows. +# If the DLL exists next to the Python interpreter (or the base interpreter +# when running inside a venv), monkey-patch gs_usb to use it. +# This is a no-op on Linux or if the backend is already found. +import sys as _sys +import os as _os +import platform as _platform + +if _platform.system() == "Windows" and _platform.machine() == "ARM64": + # Check both venv prefix and base prefix (global install) + _candidate_dirs = list(dict.fromkeys([_sys.prefix, _sys.base_prefix])) + _dll_path = None + for _d in _candidate_dirs: + _p = _os.path.join(_d, "libusb-1.0.dll") + if _os.path.isfile(_p): + _dll_path = _p + break + + if _dll_path: + try: + import usb.backend.libusb1 as _libusb1 + _backend = _libusb1.get_backend(find_library=lambda _x: _dll_path) + if _backend is not None: + # Patch gs_usb.gs_usb to use our backend + import gs_usb.gs_usb as _gs + _orig_scan = _gs.GsUsb.scan + + @classmethod + def _patched_scan(cls): + import usb.core + devs = [] + for dev in usb.core.find( + find_all=True, + custom_match=cls.is_gs_usb_device, + backend=_backend, + ): + devs.append(cls(dev)) + return devs + + _gs.GsUsb.scan = _patched_scan + except ImportError: + pass diff --git a/tools/can_tester/__main__.py b/tools/can_tester/__main__.py new file mode 100644 index 0000000..96be704 --- /dev/null +++ b/tools/can_tester/__main__.py @@ -0,0 +1,4 @@ +"""Allow running as `python -m tools.can_tester`.""" +from .cli import main + +main() diff --git a/tools/can_tester/cli.py b/tools/can_tester/cli.py new file mode 100644 index 0000000..97313c0 --- /dev/null +++ b/tools/can_tester/cli.py @@ -0,0 +1,328 @@ +#!/usr/bin/env python3 +""" +Interactive CLI for STM32LowLevel CAN bus testing. + +Usage: + python -m tools.can_tester --interface socketcan --channel can0 + python -m tools.can_tester --interface gs_usb --channel 0 # Innomaker USB2CAN on Windows/Linux + python -m tools.can_tester --interface slcan --channel COM3 # Serial CAN adapter + +Commands: + send traction Set traction motor speeds + send arm_j2 Set arm elbow pitch + send arm_j3 Set arm roll J3 + send arm_j4 Set arm wrist pitch + send arm_j5 Set arm wrist roll + send arm_1a1b Set arm differential J1 + send beak open|close Control beak gripper + send reset_arm Move arm to home position + send set_home [permanent] Set current position as home (default: interim) + send reboot_arm Reboot arm motors + send reboot_traction Reboot traction motors + stop Emergency stop all motors + monitor [filter] Start monitoring (filter: all|arm|traction|joint|feedback) + test Run test sequence (traction|arm_init|arm_joints|arm_beak|full) + status Show monitor statistics + help Show this help + quit Exit +""" + +from __future__ import annotations + +import argparse +import sys +import threading +import time +from typing import Optional + +import can + +from .protocol import ModuleAddress +from .sender import CanSender +from .monitor import CanMonitor, NAMED_FILTERS +from .test_sequences import TESTS + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="STM32LowLevel CAN bus testing CLI", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--interface", "-i", + default="gs_usb", + help="python-can interface type (default: gs_usb)", + ) + parser.add_argument( + "--channel", "-c", + default="0", + help="CAN channel (default: 0)", + ) + parser.add_argument( + "--bitrate", "-b", + type=int, + default=1000000, + help="CAN arbitration bitrate in bps (default: 1000000 = 1 Mbit/s)", + ) + parser.add_argument( + "--data-bitrate", "-D", + type=int, + default=2000000, + help="CAN FD data-phase bitrate in bps (default: 2000000 = 2 Mbit/s)", + ) + parser.add_argument( + "--module", "-m", + default="MK2_MOD1", + choices=["MK2_MOD1", "MK2_MOD2", "MK2_MOD3"], + help="Target module (default: MK2_MOD1)", + ) + return parser.parse_args() + + +class CLI: + """Interactive command-line interface for CAN testing.""" + + def __init__(self, bus: can.BusABC, target: int): + self.bus = bus + self.sender = CanSender(bus) + self.monitor = CanMonitor(bus) + self.target = target + self._monitor_thread: Optional[threading.Thread] = None + + def run(self) -> None: + """Main REPL loop.""" + print("STM32LowLevel CAN Tester — type 'help' for commands") + print(f"Target module: {ModuleAddress(self.target).name}") + print() + + while True: + try: + line = input("can> ").strip() + except (EOFError, KeyboardInterrupt): + print("\nExiting...") + break + + if not line: + continue + + parts = line.split() + cmd = parts[0].lower() + + try: + if cmd == "quit" or cmd == "exit" or cmd == "q": + break + elif cmd == "help" or cmd == "?": + self._help() + elif cmd == "send": + self._handle_send(parts[1:]) + elif cmd == "stop": + self.sender.stop_all() + print("Emergency stop sent to all modules.") + elif cmd == "monitor": + self._handle_monitor(parts[1:]) + elif cmd == "test": + self._handle_test(parts[1:]) + elif cmd == "status": + self._show_status() + else: + print(f"Unknown command: '{cmd}'. Type 'help' for commands.") + except Exception as e: + print(f"Error: {e}") + + self.monitor.stop() + + def _help(self) -> None: + print(__doc__) + + def _handle_send(self, args: list[str]) -> None: + if not args: + print("Usage: send [args...]") + return + + subcmd = args[0].lower() + + if subcmd == "traction" and len(args) == 3: + left, right = float(args[1]), float(args[2]) + self.sender.traction(left, right, self.target) + print(f"Traction: left={left} RPM, right={right} RPM") + + elif subcmd == "arm_1a1b" and len(args) == 3: + theta, phi = float(args[1]), float(args[2]) + self.sender.arm_pitch_1a1b(theta, phi) + print(f"Arm J1 differential: θ={theta}, φ={phi} rad") + + elif subcmd == "arm_j2" and len(args) == 2: + angle = float(args[1]) + self.sender.arm_pitch_j2(angle) + print(f"Arm J2 pitch: {angle} rad") + + elif subcmd == "arm_j3" and len(args) == 2: + angle = float(args[1]) + self.sender.arm_roll_j3(angle) + print(f"Arm J3 roll: {angle} rad") + + elif subcmd == "arm_j4" and len(args) == 2: + angle = float(args[1]) + self.sender.arm_pitch_j4(angle) + print(f"Arm J4 pitch: {angle} rad") + + elif subcmd == "arm_j5" and len(args) == 2: + angle = float(args[1]) + self.sender.arm_roll_j5(angle) + print(f"Arm J5 roll: {angle} rad") + + elif subcmd == "beak": + if len(args) < 2: + print("Usage: send beak open|close") + return + close = args[1].lower() in ("close", "1", "true") + self.sender.arm_beak(close=close) + print(f"Beak: {'close' if close else 'open'}") + + elif subcmd == "reset_arm": + self.sender.reset_arm() + print("Reset arm to home position.") + + elif subcmd == "set_home": + persist = len(args) > 1 and args[1] == "permanent" + self.sender.set_home(persist=persist) + mode = "permanent (flash)" if persist else "interim (session)" + print(f"Set home: {mode}") + + elif subcmd == "reboot_arm": + self.sender.reboot_arm() + print("Rebooting arm motors...") + + elif subcmd == "reboot_traction": + self.sender.reboot_traction(self.target) + print("Rebooting traction motors...") + + elif subcmd == "joint_1a1b" and len(args) == 3: + theta, phi = float(args[1]), float(args[2]) + self.sender.joint_pitch_1a1b(theta, phi, self.target) + print(f"Joint differential: θ={theta}, φ={phi} rad") + + elif subcmd == "joint_roll" and len(args) == 2: + angle = float(args[1]) + self.sender.joint_roll(angle, self.target) + print(f"Joint roll: {angle} rad") + + else: + print(f"Unknown send command: '{subcmd}' with {len(args) - 1} args") + print("Available: traction, arm_1a1b, arm_j2..j5, beak, reset_arm, " + "set_home, reboot_arm, reboot_traction, joint_1a1b, joint_roll") + + def _handle_monitor(self, args: list[str]) -> None: + filter_name = args[0].lower() if args else "all" + filter_set = NAMED_FILTERS.get(filter_name) + + if filter_name not in NAMED_FILTERS: + print(f"Unknown filter: '{filter_name}'") + print(f"Available: {', '.join(NAMED_FILTERS.keys())}") + return + + if self._monitor_thread and self._monitor_thread.is_alive(): + self.monitor.stop() + self._monitor_thread.join(timeout=2.0) + + print(f"Monitoring CAN bus (filter: {filter_name}). Press Ctrl+C to stop.") + + try: + self.monitor.run(filter_types=filter_set) + except KeyboardInterrupt: + self.monitor.stop() + print("\nMonitor stopped.") + + def _handle_test(self, args: list[str]) -> None: + if not args: + print(f"Available tests: {', '.join(TESTS.keys())}") + return + + test_name = args[0].lower() + test_fn = TESTS.get(test_name) + + if test_fn is None: + print(f"Unknown test: '{test_name}'") + print(f"Available: {', '.join(TESTS.keys())}") + return + + if self._monitor_thread and self._monitor_thread.is_alive(): + self.monitor.stop() + self._monitor_thread.join(timeout=2.0) + + self._monitor_thread = threading.Thread( + target=self.monitor.run, + kwargs={"quiet": True}, + daemon=True, + ) + self._monitor_thread.start() + time.sleep(0.2) # Let monitor start + + if test_name == "full": + test_fn(self.sender, self.monitor) + elif test_name in ("traction", "traction_diff"): + test_fn(self.sender, self.target) + else: + test_fn(self.sender) + + self.monitor.stop() + self._monitor_thread.join(timeout=2.0) + + def _show_status(self) -> None: + stats = self.monitor.stats + if not stats: + print("No messages received yet. Run 'monitor' first.") + return + + print("Message counts:") + for name, count in stats.items(): + print(f" {name}: {count}") + + print("\nLast values:") + for name, values in self.monitor.last_values.items(): + vals = {k: v for k, v in values.items() if not k.startswith("_")} + print(f" {name}: {vals}") + + +def main() -> None: + args = parse_args() + + target_map = { + "MK2_MOD1": ModuleAddress.MK2_MOD1, + "MK2_MOD2": ModuleAddress.MK2_MOD2, + "MK2_MOD3": ModuleAddress.MK2_MOD3, + } + target = target_map[args.module] + + print(f"Connecting to CAN bus: interface={args.interface}, " + f"channel={args.channel}, bitrate={args.bitrate}...") + + try: + bus = can.Bus( + interface=args.interface, + channel=args.channel, + bitrate=args.bitrate, + data_bitrate=args.data_bitrate, + fd=True, + ) + except Exception as e: + print(f"Failed to connect: {e}") + print("\nMake sure:") + print(" - USB2CAN adapter is connected") + print(" - On Linux: sudo ip link set can0 up type can bitrate 1000000 dbitrate 2000000 fd on") + print(" - On Windows: install gs_usb driver for Innomaker USB2CAN") + sys.exit(1) + + print("Connected.\n") + + try: + cli = CLI(bus, target) + cli.run() + finally: + bus.shutdown() + print("CAN bus closed.") + + +if __name__ == "__main__": + main() diff --git a/tools/can_tester/codec.py b/tools/can_tester/codec.py new file mode 100644 index 0000000..56ec339 --- /dev/null +++ b/tools/can_tester/codec.py @@ -0,0 +1,118 @@ +""" +Encode and decode CAN message payloads for STM32LowLevel protocol. + +Provides high-level encode/decode functions that use the PayloadFormat +descriptors from protocol.py. +""" + +from __future__ import annotations + +import struct +from typing import Any + +from .protocol import ( + MsgType, + PayloadFormat, + PAYLOAD_FORMATS, + encode_can_id, + decode_can_id, + DecodedID, + ModuleAddress, + MSG_NAMES, +) + + +def encode_payload(msg_type: int, **kwargs: Any) -> bytes: + """Encode keyword arguments into a CAN payload. + + Args: + msg_type: MsgType value. + **kwargs: Field values matching the PayloadFormat field names. + + Returns: + Packed bytes ready for CAN transmission. + + Raises: + ValueError: If msg_type has no known format or fields are missing. + + Example: + >>> encode_payload(MsgType.MOTOR_SETPOINT, right_rpm=10.0, left_rpm=10.0) + b'\\x00\\x00 A\\x00\\x00 A' + """ + fmt = PAYLOAD_FORMATS.get(msg_type) + if fmt is None: + raise ValueError(f"No payload format for msg_type 0x{msg_type:02X}") + + if not fmt.fmt: + return b"" # No-payload messages (RESET_ARM, REBOOT_ARM, etc.) + + values = [] + for field_name in fmt.fields: + if field_name not in kwargs: + raise ValueError( + f"Missing field '{field_name}' for {MSG_NAMES.get(msg_type, '?')}. " + f"Required: {fmt.fields}" + ) + values.append(kwargs[field_name]) + + return struct.pack(fmt.fmt, *values) + + +def decode_payload(msg_type: int, data: bytes) -> dict[str, Any]: + """Decode a CAN payload into a dict of named fields. + + Args: + msg_type: MsgType value. + data: Raw CAN payload bytes. + + Returns: + Dict mapping field names to decoded values. + Returns {"raw": data.hex()} for unknown message types. + """ + fmt = PAYLOAD_FORMATS.get(msg_type) + if fmt is None: + return {"raw": data.hex()} + + if not fmt.fmt: + return {} # No-payload messages + + try: + values = struct.unpack(fmt.fmt, data[: fmt.size]) + except struct.error: + return {"raw": data.hex(), "error": "payload size mismatch"} + + result = dict(zip(fmt.fields, values)) + result["_unit"] = fmt.unit + return result + + +def format_decoded(decoded_id: DecodedID, payload: dict[str, Any]) -> str: + """Format a decoded CAN message into a human-readable string. + + Args: + decoded_id: Parsed CAN identifier. + payload: Decoded payload dict from decode_payload(). + + Returns: + Single-line formatted string. + """ + parts = [ + f"[{decoded_id.source_name} → {decoded_id.destination_name}]", + decoded_id.msg_name, + ] + + unit = payload.pop("_unit", "") + + for key, value in payload.items(): + if key.startswith("_"): + continue + if isinstance(value, float): + parts.append(f"{key}={value:.4f}{' ' + unit if unit else ''}") + else: + parts.append(f"{key}={value}") + + # Restore unit for potential re-use + if unit: + payload["_unit"] = unit + + return " ".join(parts) diff --git a/tools/can_tester/header_parser.py b/tools/can_tester/header_parser.py new file mode 100644 index 0000000..fc65938 --- /dev/null +++ b/tools/can_tester/header_parser.py @@ -0,0 +1,280 @@ +""" +Minimal C preprocessor for STM32LowLevel header files. + +Parses ``#define`` macros from communication.h and mod_config.h at runtime, +correctly handling ``#if defined()``, ``#ifdef``, ``#ifndef``, +``#if A == B``, ``#elif defined()``, ``#elif A == B``, ``#else``, and +``#endif`` blocks so the Python CAN tester always reflects the firmware's +actual definitions — no manual sync needed. + +Usage:: + + from header_parser import parse_communication_header, parse_mod_config + + msg_defines = parse_communication_header() # all CAN message IDs + mod_defines = parse_mod_config("MK2_MOD1") # module-specific config +""" + +from __future__ import annotations + +import re +from pathlib import Path +from typing import Dict, Optional, Set + +# ─── paths ─────────────────────────────────────────────────────────────────── + +_DEFAULT_INCLUDE_DIR = ( + Path(__file__).resolve().parent.parent.parent + / "STM32LowLevel" / "Inc" +) + +# ─── module address map (for parse_mod_config value injection) ──────────────── + +_MODULE_ADDRESSES: dict[str, int] = { + "MK2_MOD1": 0x21, + "MK2_MOD2": 0x22, + "MK2_MOD3": 0x23, +} + +# ─── regex ─────────────────────────────────────────────────────────────────── + +# Matches: #define NAME 0xFF | #define NAME 123 +_DEFINE_VALUE_RE = re.compile( + r"^\s*#define\s+(\w+)\s+(0[xX][0-9a-fA-F]+|\d+)" +) +# Matches: #define NAME (flag-style, no value) +_DEFINE_FLAG_RE = re.compile( + r"^\s*#define\s+(\w+)\s*$" +) +# Matches: #define NAME IDENTIFIER (value is a non-numeric token) +_DEFINE_IDENT_RE = re.compile( + r"^\s*#define\s+(\w+)\s+([a-zA-Z_]\w*)\s*(?:\/\/.*)?$" +) +# Matches: #if defined(NAME) | #ifdef NAME +_IF_DEFINED_RE = re.compile( + r"^\s*#if\s+defined\s*\(\s*(\w+)\s*\)|^\s*#ifdef\s+(\w+)" +) +# Matches: #elif defined(NAME) +_ELIF_DEFINED_RE = re.compile( + r"^\s*#elif\s+defined\s*\(\s*(\w+)\s*\)" +) +# Matches: #ifndef NAME +_IFNDEF_RE = re.compile( + r"^\s*#ifndef\s+(\w+)" +) +# Matches: #if A == B (where A/B are hex literals, decimal, or identifiers) +_IF_EQ_RE = re.compile( + r"^\s*#if\s+(\w+)\s*==\s*(0[xX][0-9a-fA-F]+|\d+|\w+)" +) +# Matches: #elif A == B +_ELIF_EQ_RE = re.compile( + r"^\s*#elif\s+(\w+)\s*==\s*(0[xX][0-9a-fA-F]+|\d+|\w+)" +) + + +# ─── preprocessor engine ──────────────────────────────────────────────────── + +def preprocess_header( + path: Path, + predefined: Optional[Set[str]] = None, + predefined_values: Optional[Dict[str, int]] = None, +) -> Dict[str, int]: + """Parse a C header file through a minimal preprocessor. + + Handles ``#define``, ``#if defined()``, ``#ifdef``, ``#ifndef``, + ``#if A == B``, ``#elif defined()``, ``#elif A == B``, ``#else``, + and ``#endif``. + + Args: + path: Path to the ``.h`` file. + predefined: Set of macro names to treat as already defined + (flag-style, e.g. ``{"MK2_MOD1"}``). + predefined_values: Dict of macro name → integer value for value-style + predefined macros (e.g. ``{"MODULE_DEFINE": 0x21}``). + + Returns: + ``{MACRO_NAME: int_value}`` for every ``#define NAME `` + that is active given the conditionals. + """ + predefined = predefined or set() + predefined_values = predefined_values or {} + known_flags: Set[str] = set(predefined) # tracks flag defines + defines: Dict[str, int] = {} + + # Stack of (this_branch_active, any_branch_taken) per nesting level. + cond_stack: list[tuple[bool, bool]] = [] + + def _active() -> bool: + return all(active for active, _ in cond_stack) + + def _resolve(token: str) -> Optional[int]: + """Resolve a token to an integer: literal, define, or predefined_value.""" + try: + return int(token, 0) + except ValueError: + pass + if token in defines: + return defines[token] + if token in predefined_values: + return predefined_values[token] + return None + + with open(path, "r") as fh: + for raw_line in fh: + line = raw_line.strip() + + # ── #ifndef ───────────────────────────────────────────── + m = _IFNDEF_RE.match(line) + if m: + name = m.group(1) + active = name not in known_flags + cond_stack.append((active, active)) + continue + + # ── #if defined(NAME) / #ifdef NAME ───────────────────── + m = _IF_DEFINED_RE.match(line) + if m: + name = m.group(1) or m.group(2) + active = name in known_flags + cond_stack.append((active, active)) + continue + + # ── #if A == B ────────────────────────────────────────── + m = _IF_EQ_RE.match(line) + if m: + lhs = _resolve(m.group(1)) + rhs = _resolve(m.group(2)) + active = (lhs is not None and rhs is not None and lhs == rhs) + cond_stack.append((active, active)) + continue + + # ── #elif defined(NAME) ───────────────────────────────── + m = _ELIF_DEFINED_RE.match(line) + if m and cond_stack: + name = m.group(1) + _, any_taken = cond_stack[-1] + if any_taken: + cond_stack[-1] = (False, True) + else: + active = name in known_flags + cond_stack[-1] = (active, active) + continue + + # ── #elif A == B ──────────────────────────────────────── + m = _ELIF_EQ_RE.match(line) + if m and cond_stack: + _, any_taken = cond_stack[-1] + if any_taken: + cond_stack[-1] = (False, True) + else: + lhs = _resolve(m.group(1)) + rhs = _resolve(m.group(2)) + active = (lhs is not None and rhs is not None and lhs == rhs) + cond_stack[-1] = (active, active) + continue + + # ── #else ─────────────────────────────────────────────── + if line.startswith("#else") and cond_stack: + _, any_taken = cond_stack[-1] + cond_stack[-1] = (not any_taken, True) + continue + + # ── #endif ────────────────────────────────────────────── + if line.startswith("#endif") and cond_stack: + cond_stack.pop() + continue + + # Only process defines when all enclosing conditionals active + if not _active(): + continue + + # ── #define NAME VALUE (numeric) ──────────────────────── + m = _DEFINE_VALUE_RE.match(raw_line) + if m: + name, val = m.group(1), m.group(2) + defines[name] = int(val, 0) + known_flags.add(name) + continue + + # ── #define NAME IDENTIFIER (resolve via defines) ──────── + m = _DEFINE_IDENT_RE.match(raw_line) + if m: + name, ident = m.group(1), m.group(2) + val = _resolve(ident) + if val is not None: + defines[name] = val + known_flags.add(name) + continue + + # ── #define NAME (flag) ───────────────────────────────── + m = _DEFINE_FLAG_RE.match(raw_line) + if m: + known_flags.add(m.group(1)) + + return defines + + +# ─── convenience wrappers ──────────────────────────────────────────────────── + +def parse_communication_header( + include_dir: Optional[Path] = None, +) -> Dict[str, int]: + """Parse ``communication.h`` and return all CAN message-type defines. + + Returns: + ``{MACRO_NAME: int_value}`` — every ``#define`` with a numeric value, + excluding include guards. + + Raises: + FileNotFoundError: if communication.h is not at the expected path. + """ + include_dir = include_dir or _DEFAULT_INCLUDE_DIR + path = include_dir / "communication.h" + + if not path.exists(): + raise FileNotFoundError( + f"communication.h not found at {path}. " + f"Ensure you're running from within the STM32LowLevel repo." + ) + + return preprocess_header(path) + + +def parse_mod_config( + module_variant: str = "MK2_MOD1", + include_dir: Optional[Path] = None, +) -> Dict[str, int]: + """Parse ``mod_config.h`` with a specific module variant active. + + STM32LowLevel's mod_config.h uses ``#if MODULE_DEFINE == MK2_MOD1`` + value-comparison guards. The module variant address is injected as + ``MODULE_DEFINE`` so those conditionals resolve correctly. + + Args: + module_variant: One of MK2_MOD1, MK2_MOD2, MK2_MOD3. + include_dir: Override for the include directory path. + + Returns: + ``{MACRO_NAME: int_value}`` for numeric defines active under the + chosen variant. + + Raises: + FileNotFoundError: if mod_config.h is not at the expected path. + ValueError: if module_variant is not a known MK2 variant. + """ + if module_variant not in _MODULE_ADDRESSES: + raise ValueError( + f"Unknown module variant '{module_variant}'. " + f"Valid values: {list(_MODULE_ADDRESSES.keys())}" + ) + + include_dir = include_dir or _DEFAULT_INCLUDE_DIR + path = include_dir / "mod_config.h" + + if not path.exists(): + raise FileNotFoundError(f"mod_config.h not found at {path}") + + return preprocess_header( + path, + predefined_values={"MODULE_DEFINE": _MODULE_ADDRESSES[module_variant]}, + ) diff --git a/tools/can_tester/monitor.py b/tools/can_tester/monitor.py new file mode 100644 index 0000000..d6e3490 --- /dev/null +++ b/tools/can_tester/monitor.py @@ -0,0 +1,161 @@ +""" +Real-time CAN bus monitor with protocol-aware decoding. + +Listens for CAN messages and prints decoded output to the terminal. +""" + +from __future__ import annotations + +import can +import time +import sys +from typing import Optional, Callable + +from .protocol import decode_can_id, MsgType, MSG_NAMES +from .codec import decode_payload, format_decoded + + +class CanMonitor: + """Monitor CAN bus traffic with protocol-aware decoding.""" + + def __init__(self, bus: can.BusABC): + self.bus = bus + self._running = False + self._msg_counts: dict[int, int] = {} + self._last_values: dict[int, dict] = {} + self._callbacks: list[Callable] = [] + + def add_callback(self, callback: Callable) -> None: + """Register a callback for each received message. + + Callback signature: callback(decoded_id, payload, raw_msg) + """ + self._callbacks.append(callback) + + def run( + self, + filter_types: Optional[set[int]] = None, + duration: Optional[float] = None, + quiet: bool = False, + ) -> None: + """Start monitoring CAN bus traffic. + + Args: + filter_types: If set, only show these MsgType values. + duration: Run for this many seconds, then stop. None = forever. + quiet: If True, suppress terminal output (callbacks still fire). + """ + self._running = True + start = time.time() + + try: + while self._running: + if duration and (time.time() - start) >= duration: + break + + msg = self.bus.recv(timeout=0.1) + if msg is None: + continue + + decoded_id = decode_can_id(msg.arbitration_id) + + if filter_types and decoded_id.msg_type not in filter_types: + continue + + payload = decode_payload(decoded_id.msg_type, bytes(msg.data)) + + # Track statistics + self._msg_counts[decoded_id.msg_type] = ( + self._msg_counts.get(decoded_id.msg_type, 0) + 1 + ) + self._last_values[decoded_id.msg_type] = payload + + # Fire callbacks + for cb in self._callbacks: + cb(decoded_id, payload, msg) + + if not quiet: + ts = f"{msg.timestamp:.3f}" if msg.timestamp else f"{time.time():.3f}" + line = format_decoded(decoded_id, payload) + print(f"[{ts}] {line}") + + except KeyboardInterrupt: + pass + finally: + self._running = False + + def stop(self) -> None: + """Stop the monitor loop.""" + self._running = False + + @property + def stats(self) -> dict[str, int]: + """Return message count statistics.""" + return { + MSG_NAMES.get(k, f"0x{k:02X}"): v + for k, v in sorted(self._msg_counts.items()) + } + + @property + def last_values(self) -> dict[str, dict]: + """Return last decoded values for each message type.""" + return { + MSG_NAMES.get(k, f"0x{k:02X}"): v + for k, v in self._last_values.items() + } + + +# ─── Predefined filters ───────────────────────────────────────────────────── + +FILTER_ARM = { + MsgType.ARM_PITCH_1a1b_SETPOINT, MsgType.ARM_PITCH_1a1b_FEEDBACK, + MsgType.ARM_PITCH_2_SETPOINT, MsgType.ARM_PITCH_2_FEEDBACK, + MsgType.ARM_ROLL_3_SETPOINT, MsgType.ARM_ROLL_3_FEEDBACK, + MsgType.ARM_PITCH_4_SETPOINT, MsgType.ARM_PITCH_4_FEEDBACK, + MsgType.ARM_ROLL_5_SETPOINT, MsgType.ARM_ROLL_5_FEEDBACK, + MsgType.ARM_ROLL_6_SETPOINT, MsgType.ARM_ROLL_6_FEEDBACK, + MsgType.RESET_ARM, MsgType.REBOOT_ARM, + MsgType.ARM_PITCH_1a1b_FEEDBACK_VEL, MsgType.ARM_PITCH_2_FEEDBACK_VEL, + MsgType.ARM_ROLL_3_FEEDBACK_VEL, MsgType.ARM_PITCH_4_FEEDBACK_VEL, + MsgType.ARM_ROLL_5_FEEDBACK_VEL, MsgType.ARM_ROLL_6_FEEDBACK_VEL, + MsgType.MOTOR_ARM_ERROR_STATUS, +} + +FILTER_TRACTION = { + MsgType.MOTOR_SETPOINT, MsgType.MOTOR_FEEDBACK, + MsgType.MOTOR_TRACTION_REBOOT, MsgType.MOTOR_TRACTION_ERROR_STATUS, +} + +FILTER_JOINT = { + MsgType.JOINT_PITCH_1a1b_SETPOINT, MsgType.JOINT_PITCH_1a1b_FEEDBACK, + MsgType.JOINT_ROLL_2_SETPOINT, MsgType.JOINT_ROLL_2_FEEDBACK, + MsgType.JOINT_YAW_FEEDBACK, + MsgType.JOINT_PITCH_FEEDBACK, MsgType.JOINT_ROLL_FEEDBACK, +} + +FILTER_IMU = { + MsgType.JOINT_PITCH_FEEDBACK, MsgType.JOINT_ROLL_FEEDBACK, + MsgType.IMU_RAW_ACCEL, MsgType.IMU_RAW_GYRO, +} + +FILTER_FEEDBACK = { + MsgType.MOTOR_FEEDBACK, MsgType.JOINT_YAW_FEEDBACK, + MsgType.JOINT_PITCH_FEEDBACK, MsgType.JOINT_ROLL_FEEDBACK, + MsgType.ARM_PITCH_1a1b_FEEDBACK, MsgType.ARM_PITCH_2_FEEDBACK, + MsgType.ARM_ROLL_3_FEEDBACK, MsgType.ARM_PITCH_4_FEEDBACK, + MsgType.ARM_ROLL_5_FEEDBACK, MsgType.ARM_ROLL_6_FEEDBACK, + MsgType.ARM_PITCH_1a1b_FEEDBACK_VEL, MsgType.ARM_PITCH_2_FEEDBACK_VEL, + MsgType.ARM_ROLL_3_FEEDBACK_VEL, MsgType.ARM_PITCH_4_FEEDBACK_VEL, + MsgType.ARM_ROLL_5_FEEDBACK_VEL, MsgType.ARM_ROLL_6_FEEDBACK_VEL, + MsgType.JOINT_PITCH_1a1b_FEEDBACK, MsgType.JOINT_ROLL_2_FEEDBACK, + MsgType.MOTOR_TRACTION_ERROR_STATUS, MsgType.MOTOR_ARM_ERROR_STATUS, +} + +NAMED_FILTERS = { + "arm": FILTER_ARM, + "traction": FILTER_TRACTION, + "joint": FILTER_JOINT, + "imu": FILTER_IMU, + "feedback": FILTER_FEEDBACK, + "all": None, +} diff --git a/tools/can_tester/procedure.md b/tools/can_tester/procedure.md new file mode 100644 index 0000000..0812ae7 --- /dev/null +++ b/tools/can_tester/procedure.md @@ -0,0 +1,159 @@ +# CAN Tester — Testing Procedures + +This document describes when and how to use the CAN testing tool during +development, integration, and maintenance of the STM32LowLevel firmware. + +--- + +## When to Test + +| Scenario | What to test | Tool | +|----------|-------------|------| +| **New firmware upload** | All motors respond, feedback messages arrive | Web dashboard — full diagnostic | +| **Mechanical change** (shaft/arm repositioned) | Arm home position is safe, no collisions | Dashboard — `reset_arm`, monitor feedback | +| **After fixing a bug** | Specific subsystem affected by the fix | Dashboard or CLI — targeted commands | +| **CAN wiring change** | All modules reachable, no bus errors | Dashboard — listen to all three modules | +| **New motor / Dynamixel replaced** | Motor responds to setpoints, error status clear | Dashboard — individual joint commands | +| **Pre-competition check** | End-to-end system health | CLI — `test full` sequence | +| **Debugging intermittent issues** | Monitor traffic patterns over time | Dashboard — leave running with filters | + +--- + +## Procedure 1: Verify Traction Motors + +**Purpose**: Confirm both traction motors on a module respond correctly. + +1. Power on the module. Connect USB2CAN to the CAN bus. +2. Start dashboard: `python -m tools.can_tester.web_dashboard -i gs_usb -c 0 --port 8080` +3. Select the target module (e.g. MK2_MOD1). +4. Select **Traction Motors** command. +5. Enter small values first: Right RPM = `5`, Left RPM = `5`. Click **Send**. +6. Observe: + - Motors should spin forward at low speed. + - `MOTOR_FEEDBACK` messages should appear in the live feed showing actual RPM. +7. Try differential: Right = `20`, Left = `-20` (spin in place). +8. Click **STOP ALL** to halt. +9. Check for `MOTOR_TRACTION_ERROR_STATUS` messages — both bytes should be `0`. + +**Expected**: Motors respond within ~100 ms, feedback RPM matches setpoint +within ±10%. + +--- + +## Procedure 2: Verify Robotic Arm (MOD1 Only) + +**Purpose**: Confirm all 6 arm joints + beak respond correctly. + +> **Safety**: Ensure the arm is free to move. Keep hands clear. Start with +> small angles. + +1. Start dashboard, select **MK2_MOD1**. +2. Send **Reset Arm** — arm should move to home position. +3. Monitor `ARM_*_FEEDBACK` messages to confirm positions near zero. +4. Test each joint individually: + - **Arm J1 (1a1b)**: Theta = `0.1`, Phi = `0.0` → shoulder should yaw slightly. + - **Arm J2**: Angle = `0.2` → elbow should bend slightly. + - **Arm J3**: Angle = `0.1` → forearm should rotate slightly. + - **Arm J4**: Angle = `0.1` → wrist should pitch slightly. + - **Arm J5**: Angle = `0.1` → wrist should rotate slightly. +5. Send **Beak Close**, then **Beak Open** — gripper should actuate. +6. Send **Reset Arm** to return to home. +7. Check `MOTOR_ARM_ERROR_STATUS` — all 7 bytes should be `0`. + +**Expected**: Each joint moves smoothly, feedback matches setpoint within +~0.05 rad. No error flags. + +--- + +## Procedure 3: Verify Inter-Module Joints (MOD2/MOD3) + +**Purpose**: Confirm the inter-module joint motors on middle/tail modules work. + +1. Start dashboard, select **MK2_MOD2** (or MOD3). +2. Select **Joint Pitch (1a1b)**: Theta = `0.1`, Phi = `0.0`. Send. +3. Observe `JOINT_PITCH_1a1b_FEEDBACK` — should reflect the setpoint. +4. Select **Joint Roll**: Angle = `0.1`. Send. +5. Observe `JOINT_ROLL_2_FEEDBACK`. +6. Return to zero: send both with `0.0`. + +**Expected**: Joint responds, feedback matches within ~0.05 rad. + +--- + +## Procedure 4: CAN Bus Health Check + +**Purpose**: Verify all modules are alive and communicating. + +1. Start dashboard in monitoring mode (no commands needed). +2. Filter: **All**. Watch for periodic messages from each module: + - `BATTERY_VOLTAGE`, `BATTERY_PERCENT` from each module. + - `MOTOR_FEEDBACK` if traction is active. +3. Check the **Statistics** panel — each expected message type should + have a nonzero count. +4. If a module is missing: check CAN wiring, termination resistor (120 Ω), + and power supply. + +--- + +## Procedure 5: Error Recovery + +**Purpose**: Clear motor error states and verify recovery. + +1. If `MOTOR_TRACTION_ERROR_STATUS` or `MOTOR_ARM_ERROR_STATUS` shows + nonzero values: +2. Send **Reboot Traction** (for traction errors) or **Reboot Arm** + (for arm errors). +3. Wait 2–3 seconds for motors to re-initialize. +4. Send a small setpoint to verify the motor responds. +5. Monitor error status — should return to all zeros. + +--- + +## Procedure 6: Full Diagnostic (CLI) + +**Purpose**: Automated end-to-end test of all subsystems. + +```bash +python -m tools.can_tester -i gs_usb -c 0 +> test full +``` + +This runs all test sequences in order: +1. Traction forward/stop/reverse +2. Differential turning +3. Arm reset + individual joint sweep +4. Beak open/close +5. Arm motor reboot + +Monitor the output for any failures or timeouts. + +--- + +## Procedure 7: Offline Validation + +**Purpose**: Verify the tool's protocol layer without hardware. + +```bash +cd STM32LowLevel +python -m tools.can_tester.test_dry +``` + +This validates: +- CAN ID encoding/decoding round-trip +- Payload codec for all message types +- Header parser output matches protocol definitions +- MsgType enum completeness (all 44 message types) + +Run this after any change to `protocol.py`, `codec.py`, or `communication.h`. + +--- + +## Troubleshooting + +| Symptom | Likely cause | Fix | +|---------|-------------|-----| +| "No gs_usb device found" | WinUSB driver not installed | See README.md — Windows setup | +| Dashboard shows no messages | CAN wiring disconnected or wrong bitrate | Check wiring; must be 125 kbps | +| Motors don't respond to setpoints | Module not powered or CAN ID mismatch | Verify module address in module selector | +| `MOTOR_*_ERROR_STATUS` nonzero | Motor in error state (overload, overtemp) | Send reboot command, check mechanical load | +| Messages from unknown address | MK1 module or misconfigured firmware | Check `mod_config.h` build defines | diff --git a/tools/can_tester/protocol.py b/tools/can_tester/protocol.py new file mode 100644 index 0000000..c2197ab --- /dev/null +++ b/tools/can_tester/protocol.py @@ -0,0 +1,277 @@ +""" +CAN bus protocol definitions for STM32LowLevel firmware. + +Defines the authoritative MsgType enum and CAN ID encoding/decoding. +At import time, optionally validates against communication.h via +header_parser — if the header has new or changed defines, a warning +is printed so the developer can update this file. + +Extended CAN (29-bit) identifiers based on J1939: + - SA [0:7] Source Address (module ID) + - PS [8:15] Destination Address + - PF [16:23] PDU Format (message type) + - EFF bit[31] Extended Frame Flag (set by CAN driver) + +Identifier encoding (little-endian bytes): + byte0 = source_address + byte1 = destination_address + byte2 = message_type (PDU Format) + byte3 = 0x00 (set by CAN driver for EFF) +""" + +from __future__ import annotations + +import struct +import warnings +from enum import IntEnum +from dataclasses import dataclass, field +from typing import Optional + + +# ─── Module addresses ──────────────────────────────────────────────────────── + +class ModuleAddress(IntEnum): + """Module CAN addresses. Format: 0xYX where Y=version, X=ordinal.""" + CENTRAL = 0x00 # Jetson / PC + MK2_MOD1 = 0x21 # Head module (arm) + MK2_MOD2 = 0x22 # Middle module (joint) + MK2_MOD3 = 0x23 # Tail module (joint) + + +# ─── Message types (authoritative — manually synced with communication.h) ─── + +class MsgType(IntEnum): + """CAN message types. Keep in sync with STM32LowLevel/Inc/communication.h.""" + + # Battery + BATTERY_VOLTAGE = 0x11 + BATTERY_PERCENT = 0x12 + BATTERY_TEMPERATURE = 0x13 + + # Traction + MOTOR_SETPOINT = 0x21 + MOTOR_FEEDBACK = 0x22 + + # Joint / IMU orientation feedback + JOINT_YAW_FEEDBACK = 0x32 + JOINT_PITCH_FEEDBACK = 0x34 + JOINT_ROLL_FEEDBACK = 0x36 + + # End Effector (MK1 legacy) + DATA_EE_PITCH_SETPOINT = 0x41 + DATA_EE_PITCH_FEEDBACK = 0x42 + DATA_EE_HEAD_PITCH_SETPOINT = 0x43 + DATA_EE_HEAD_PITCH_FEEDBACK = 0x44 + DATA_EE_HEAD_ROLL_SETPOINT = 0x45 + DATA_EE_HEAD_ROLL_FEEDBACK = 0x46 + + # Robotic arm (MOD1 only) + ARM_PITCH_1a1b_SETPOINT = 0x51 + ARM_PITCH_1a1b_FEEDBACK = 0x52 + ARM_PITCH_2_SETPOINT = 0x53 + ARM_PITCH_2_FEEDBACK = 0x54 + ARM_ROLL_3_SETPOINT = 0x55 + ARM_ROLL_3_FEEDBACK = 0x56 + ARM_PITCH_4_SETPOINT = 0x57 + ARM_PITCH_4_FEEDBACK = 0x58 + ARM_ROLL_5_SETPOINT = 0x59 + ARM_ROLL_5_FEEDBACK = 0x5A + ARM_ROLL_6_SETPOINT = 0x5B + ARM_ROLL_6_FEEDBACK = 0x5C + RESET_ARM = 0x5D + REBOOT_ARM = 0x5E + SET_HOME = 0x5F + + # Inter-module joints (MOD2/MOD3) + JOINT_PITCH_1a1b_SETPOINT = 0x61 + JOINT_PITCH_1a1b_FEEDBACK = 0x62 + JOINT_ROLL_2_SETPOINT = 0x63 + JOINT_ROLL_2_FEEDBACK = 0x64 + + # Motor control / status + MOTOR_TRACTION_REBOOT = 0x71 + MOTOR_TRACTION_ERROR_STATUS = 0x72 + MOTOR_ARM_ERROR_STATUS = 0x73 + + # Arm velocity feedback + ARM_PITCH_1a1b_FEEDBACK_VEL = 0x80 + ARM_PITCH_2_FEEDBACK_VEL = 0x81 + ARM_ROLL_3_FEEDBACK_VEL = 0x82 + ARM_PITCH_4_FEEDBACK_VEL = 0x83 + ARM_ROLL_5_FEEDBACK_VEL = 0x84 + ARM_ROLL_6_FEEDBACK_VEL = 0x85 + + # IMU raw data (debug) + IMU_RAW_ACCEL = 0x92 + IMU_RAW_GYRO = 0x93 + + +# ─── Validate against communication.h (optional) ──────────────────────────── + +try: + from .header_parser import parse_communication_header + _header_defines = parse_communication_header() + + # Check for defines in the header that are missing or different here + _py_defines = {m.name: m.value for m in MsgType} + _missing = {k: v for k, v in _header_defines.items() if k not in _py_defines} + _changed = { + k: (v, _py_defines[k]) + for k, v in _header_defines.items() + if k in _py_defines and v != _py_defines[k] + } + _extra = {k: v for k, v in _py_defines.items() if k not in _header_defines} + if _missing or _changed or _extra: + parts = [] + if _missing: + parts.append(f"New defines in communication.h not in protocol.py: {_missing}") + if _changed: + parts.append(f"Changed values: {_changed}") + if _extra: + parts.append(f"Python-only entries not in communication.h: {_extra}") + warnings.warn( + "protocol.py is out of sync with communication.h — " + + "; ".join(parts), + stacklevel=2, + ) +except (FileNotFoundError, ImportError): + pass # Running outside the repo — skip validation + + +# ─── Human-readable names ─────────────────────────────────────────────────── + +MSG_NAMES: dict[int, str] = {m.value: m.name for m in MsgType} +MODULE_NAMES: dict[int, str] = {m.value: m.name for m in ModuleAddress} + + +# ─── Payload format descriptors ───────────────────────────────────────────── + +@dataclass +class PayloadFormat: + """Describes the encoding of a CAN message payload.""" + fmt: str # struct format string (little-endian) + fields: list[str] # field names + unit: str = "" # unit label + size: int = field(init=False) + + def __post_init__(self): + self.size = struct.calcsize(self.fmt) + + +# Maps MsgType → PayloadFormat +PAYLOAD_FORMATS: dict[int, PayloadFormat] = { + # Traction + MsgType.MOTOR_SETPOINT: PayloadFormat(" int: + """Build a 29-bit Extended CAN identifier. + + Layout (little-endian byte view): + byte0 = source address [bits 0:7] + byte1 = destination [bits 8:15] + byte2 = msg_type (PF) [bits 16:23] + byte3 = 0 [bits 24:28] + """ + return (msg_type << 16) | (destination << 8) | source + + +@dataclass +class DecodedID: + """Parsed fields from a 29-bit CAN identifier.""" + raw: int + source: int + destination: int + msg_type: int + source_name: str + destination_name: str + msg_name: str + + +def decode_can_id(can_id: int) -> DecodedID: + """Parse a 29-bit Extended CAN identifier into its component fields.""" + source = can_id & 0xFF + destination = (can_id >> 8) & 0xFF + msg_type = (can_id >> 16) & 0xFF + + return DecodedID( + raw=can_id, + source=source, + destination=destination, + msg_type=msg_type, + source_name=MODULE_NAMES.get(source, f"0x{source:02X}"), + destination_name=MODULE_NAMES.get(destination, f"0x{destination:02X}"), + msg_name=MSG_NAMES.get(msg_type, f"UNKNOWN_0x{msg_type:02X}"), + ) diff --git a/tools/can_tester/requirements.txt b/tools/can_tester/requirements.txt new file mode 100644 index 0000000..8eac1da --- /dev/null +++ b/tools/can_tester/requirements.txt @@ -0,0 +1,3 @@ +python-can>=4.0 +flask>=3.0 +gs_usb>=0.3 diff --git a/tools/can_tester/sender.py b/tools/can_tester/sender.py new file mode 100644 index 0000000..7b4a84a --- /dev/null +++ b/tools/can_tester/sender.py @@ -0,0 +1,146 @@ +""" +CAN message sender with convenience methods for STM32LowLevel commands. + +Wraps python-can's Bus object and provides typed, protocol-aware send methods. +""" + +from __future__ import annotations + +import can +import time +from typing import Optional + +from .protocol import MsgType, ModuleAddress, encode_can_id +from .codec import encode_payload + + +class CanSender: + """Send CAN messages to STM32LowLevel modules.""" + + def __init__( + self, + bus: can.BusABC, + source: int = ModuleAddress.CENTRAL, + ): + self.bus = bus + self.source = source + + def send_raw( + self, + msg_type: int, + data: bytes, + destination: int = ModuleAddress.MK2_MOD1, + ) -> None: + """Send a raw CAN message.""" + can_id = encode_can_id(self.source, destination, msg_type) + msg = can.Message( + arbitration_id=can_id, + data=data, + is_extended_id=True, + is_fd=True, + bitrate_switch=True, + ) + self.bus.send(msg) + + def send( + self, + msg_type: int, + destination: int = ModuleAddress.MK2_MOD1, + **kwargs, + ) -> None: + """Encode and send a CAN message. + + Args: + msg_type: MsgType value. + destination: Target module address. + **kwargs: Payload fields matching the PayloadFormat. + """ + data = encode_payload(msg_type, **kwargs) + self.send_raw(msg_type, data, destination) + + # ─── Convenience methods ───────────────────────────────────────────── + + def traction( + self, + left_rpm: float, + right_rpm: float, + destination: int = ModuleAddress.MK2_MOD1, + ) -> None: + """Set traction motor speeds.""" + self.send( + MsgType.MOTOR_SETPOINT, + destination, + right_rpm=right_rpm, + left_rpm=left_rpm, + ) + + def arm_pitch_1a1b(self, theta: float, phi: float) -> None: + """Set arm J1 differential pitch/yaw.""" + self.send(MsgType.ARM_PITCH_1a1b_SETPOINT, theta=theta, phi=phi) + + def arm_pitch_j2(self, angle: float) -> None: + """Set arm elbow pitch J2.""" + self.send(MsgType.ARM_PITCH_2_SETPOINT, angle=angle) + + def arm_roll_j3(self, angle: float) -> None: + """Set arm roll J3.""" + self.send(MsgType.ARM_ROLL_3_SETPOINT, angle=angle) + + def arm_pitch_j4(self, angle: float) -> None: + """Set arm wrist pitch J4.""" + self.send(MsgType.ARM_PITCH_4_SETPOINT, angle=angle) + + def arm_roll_j5(self, angle: float) -> None: + """Set arm wrist roll J5.""" + self.send(MsgType.ARM_ROLL_5_SETPOINT, angle=angle) + + def arm_beak(self, close: bool) -> None: + """Control beak gripper. close=True to close, False to open.""" + # Firmware convention: 0 = close, 1 = open + self.send(MsgType.ARM_ROLL_6_SETPOINT, command=0 if close else 1) + + def reset_arm(self) -> None: + """Move arm to calibrated initial position.""" + self.send(MsgType.RESET_ARM) + + def reboot_arm(self) -> None: + """Reboot all arm Dynamixel motors.""" + self.send(MsgType.REBOOT_ARM) + + def set_home(self, persist: bool = False) -> None: + """Set current arm position as home. persist=True saves to flash.""" + self.send(MsgType.SET_HOME, persist=1 if persist else 0) + + def reboot_traction( + self, + destination: int = ModuleAddress.MK2_MOD1, + ) -> None: + """Reboot traction Dynamixel motors.""" + self.send(MsgType.MOTOR_TRACTION_REBOOT, destination) + + def joint_pitch_1a1b( + self, + theta: float, + phi: float, + destination: int = ModuleAddress.MK2_MOD2, + ) -> None: + """Set inter-module joint differential pitch/yaw.""" + self.send( + MsgType.JOINT_PITCH_1a1b_SETPOINT, + destination, + theta=theta, + phi=phi, + ) + + def joint_roll( + self, + angle: float, + destination: int = ModuleAddress.MK2_MOD2, + ) -> None: + """Set inter-module joint roll.""" + self.send(MsgType.JOINT_ROLL_2_SETPOINT, destination, angle=angle) + + def stop_all(self) -> None: + """Emergency stop: zero all motor speeds.""" + for dest in [ModuleAddress.MK2_MOD1, ModuleAddress.MK2_MOD2, ModuleAddress.MK2_MOD3]: + self.traction(0.0, 0.0, dest) diff --git a/tools/can_tester/test_dry.py b/tools/can_tester/test_dry.py new file mode 100644 index 0000000..361e7b4 --- /dev/null +++ b/tools/can_tester/test_dry.py @@ -0,0 +1,133 @@ +"""Dry test — validates all CAN tester logic without hardware.""" + +import sys + +def main(): + errors = 0 + + # 1. Import test + print("=== Import test ===") + try: + from tools.can_tester import ( + protocol, codec, sender, monitor, cli, + test_sequences, web_dashboard, header_parser, + ) + print(" All modules imported OK") + except Exception as e: + print(f" FAIL: {e}") + errors += 1 + sys.exit(1) + + # 2. CAN ID encode/decode + print("\n=== CAN ID encode/decode ===") + from tools.can_tester.protocol import ( + encode_can_id, decode_can_id, MsgType, ModuleAddress, + ) + + can_id = encode_can_id( + ModuleAddress.CENTRAL, ModuleAddress.MK2_MOD1, + MsgType.ARM_PITCH_2_SETPOINT, + ) + print(f" encode(CENTRAL -> MK2_MOD1, ARM_PITCH_2_SETPOINT) = 0x{can_id:08X}") + + decoded = decode_can_id(can_id) + assert decoded.source == ModuleAddress.CENTRAL, f"src mismatch: {decoded.source}" + assert decoded.destination == ModuleAddress.MK2_MOD1, f"dst mismatch: {decoded.destination}" + assert decoded.msg_type == MsgType.ARM_PITCH_2_SETPOINT, f"msg mismatch: {decoded.msg_type}" + print(f" decode -> {decoded.source_name} -> {decoded.destination_name}: {decoded.msg_name}") + print(" Round-trip OK") + + # 3. Payload codec + print("\n=== Payload codec ===") + from tools.can_tester.codec import encode_payload, decode_payload, format_decoded + from tools.can_tester.protocol import PAYLOAD_FORMATS, DecodedID + + # Test traction motor setpoint (two floats: right_rpm, left_rpm) + data = encode_payload(MsgType.MOTOR_SETPOINT, right_rpm=100.0, left_rpm=-50.0) + vals = decode_payload(MsgType.MOTOR_SETPOINT, data) + assert abs(vals["right_rpm"] - 100.0) < 0.01, f"right_rpm mismatch: {vals}" + assert abs(vals["left_rpm"] - (-50.0)) < 0.01, f"left_rpm mismatch: {vals}" + print(f" MOTOR_SETPOINT encode -> {data.hex()} -> {vals} OK") + + # Test arm single float + data2 = encode_payload(MsgType.ARM_PITCH_2_SETPOINT, angle=1.5708) + vals2 = decode_payload(MsgType.ARM_PITCH_2_SETPOINT, data2) + assert abs(vals2["angle"] - 1.5708) < 0.001, f"arm mismatch: {vals2}" + print(f" ARM_PITCH_2_SETPOINT encode -> {data2.hex()} -> OK") + + # Test no-payload messages + data3 = encode_payload(MsgType.RESET_ARM) + assert data3 == b"", f"RESET_ARM should be empty: {data3}" + vals3 = decode_payload(MsgType.RESET_ARM, data3) + assert vals3 == {}, f"RESET_ARM decode should be empty: {vals3}" + print(f" RESET_ARM (no payload) OK") + + # Test format_decoded + test_id = decode_can_id(encode_can_id( + ModuleAddress.CENTRAL, ModuleAddress.MK2_MOD1, MsgType.MOTOR_SETPOINT + )) + fmt = format_decoded(test_id, vals) + print(f" format_decoded: {fmt}") + + # 4. Header parser — STM32LowLevel MK2 module variants + print("\n=== Module config variants ===") + for variant in ["MK2_MOD1", "MK2_MOD2", "MK2_MOD3"]: + defs = header_parser.parse_mod_config(variant) + can_id_val = defs.get("CAN_ID", 0) + print(f" {variant}: CAN_ID=0x{can_id_val:02X}, {len(defs)} defines") + assert "CAN_ID" in defs, f"{variant} missing CAN_ID" + + # Verify CAN_ID values match expected module addresses + assert header_parser.parse_mod_config("MK2_MOD1")["CAN_ID"] == 0x21, \ + "MK2_MOD1 CAN_ID should be 0x21" + assert header_parser.parse_mod_config("MK2_MOD2")["CAN_ID"] == 0x22, \ + "MK2_MOD2 CAN_ID should be 0x22" + assert header_parser.parse_mod_config("MK2_MOD3")["CAN_ID"] == 0x23, \ + "MK2_MOD3 CAN_ID should be 0x23" + print(" CAN_ID values OK") + + # Verify conditional exclusion: MK2_MOD1 should have ARM servo IDs, not JOINT + mk2_1 = header_parser.parse_mod_config("MK2_MOD1") + assert "SERVO_ARM_2_PITCH_ID" in mk2_1, "MK2_MOD1 missing SERVO_ARM_2_PITCH_ID" + print(" MK2_MOD1 servo defines OK") + + # Verify MK2_MOD2/3 do not have arm servo IDs (only joint + yaw) + mk2_2 = header_parser.parse_mod_config("MK2_MOD2") + assert "SERVO_ARM_2_PITCH_ID" not in mk2_2, "MK2_MOD2 should not have ARM servo IDs" + print(" MK2_MOD2 conditional isolation OK") + + # 5. Test sequences — verify importable + print("\n=== Test sequences ===") + from tools.can_tester import test_sequences + seq_funcs = [ + name for name in dir(test_sequences) + if name.startswith("test_") and callable(getattr(test_sequences, name)) + ] + for name in seq_funcs: + print(f" {name}") + assert len(seq_funcs) >= 5, f"Expected >= 5 test functions, got {len(seq_funcs)}" + print(f" {len(seq_funcs)} test functions OK") + + # 6. MsgType completeness — STM32LowLevel has 44 message types + print(f"\n=== MsgType enum ===") + msg_count = len(list(MsgType)) + print(f" {msg_count} message types loaded") + assert msg_count == 44, f"Expected 44 message types, got {msg_count}" + + # Spot-check key entries + assert MsgType.REBOOT_ARM == 0x5E + assert MsgType.ARM_ROLL_6_FEEDBACK_VEL == 0x85 + assert MsgType.MOTOR_SETPOINT == 0x21 + assert MsgType.IMU_RAW_GYRO == 0x93 + print(" Spot checks OK") + + print(f"\n{'='*40}") + if errors == 0: + print("All tests PASSED!") + else: + print(f"{errors} test(s) FAILED") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/tools/can_tester/test_sequences.py b/tools/can_tester/test_sequences.py new file mode 100644 index 0000000..d971245 --- /dev/null +++ b/tools/can_tester/test_sequences.py @@ -0,0 +1,211 @@ +""" +Pre-built test sequences for STM32LowLevel CAN bus validation. + +Each test function takes a CanSender and CanMonitor and runs a scripted +sequence of commands, printing results. +""" + +from __future__ import annotations + +import time +import math +from typing import Optional + +from .protocol import MsgType, ModuleAddress +from .sender import CanSender +from .monitor import CanMonitor, FILTER_ARM, FILTER_TRACTION + + +def test_traction_basic( + sender: CanSender, + destination: int = ModuleAddress.MK2_MOD1, + speed: float = 5.0, + duration: float = 3.0, +) -> None: + """Basic traction motor test: forward → stop → reverse → stop. + + Args: + sender: CanSender instance. + destination: Module to test. + speed: RPM value to use. + duration: Seconds per phase. + """ + print(f"=== Traction motor test (speed={speed} RPM, {duration}s per phase) ===") + + print(f" Forward ({speed} RPM)...") + sender.traction(speed, speed, destination) + time.sleep(duration) + + print(" Stop...") + sender.traction(0.0, 0.0, destination) + time.sleep(1.0) + + print(f" Reverse ({-speed} RPM)...") + sender.traction(-speed, -speed, destination) + time.sleep(duration) + + print(" Stop...") + sender.traction(0.0, 0.0, destination) + print("=== Traction test complete ===\n") + + +def test_traction_differential( + sender: CanSender, + destination: int = ModuleAddress.MK2_MOD1, + speed: float = 5.0, + duration: float = 2.0, +) -> None: + """Differential traction test: left → right → spin. + + Args: + sender: CanSender instance. + destination: Module to test. + speed: RPM value. + duration: Seconds per phase. + """ + print(f"=== Differential traction test ===") + + print(f" Turn left (right={speed}, left=0)...") + sender.traction(0.0, speed, destination) + time.sleep(duration) + + print(f" Turn right (right=0, left={speed})...") + sender.traction(speed, 0.0, destination) + time.sleep(duration) + + print(f" Spin (right={speed}, left={-speed})...") + sender.traction(-speed, speed, destination) + time.sleep(duration) + + print(" Stop...") + sender.traction(0.0, 0.0, destination) + print("=== Differential test complete ===\n") + + +def test_arm_init(sender: CanSender) -> None: + """Test arm initialization: reset → verify feedback. + + Sends RESET_ARM and waits for feedback messages. + """ + print("=== Arm initialization test ===") + + print(" Sending RESET_ARM...") + sender.reset_arm() + time.sleep(3.0) + + print(" Arm should now be at home position.") + print(" Check feedback messages for position confirmation.") + print("=== Arm init test complete ===\n") + + +def test_arm_joints( + sender: CanSender, + amplitude: float = 0.2, + duration: float = 2.0, +) -> None: + """Move each arm joint individually by a small amount. + + Args: + sender: CanSender instance. + amplitude: Angle in radians to move each joint. + duration: Seconds to wait between movements. + """ + print(f"=== Arm joint test (amplitude={amplitude} rad) ===") + + joints = [ + ("J1 (pitch 1a1b)", lambda a: sender.arm_pitch_1a1b(a, 0.0)), + ("J2 (elbow pitch)", sender.arm_pitch_j2), + ("J3 (roll)", sender.arm_roll_j3), + ("J4 (wrist pitch)", sender.arm_pitch_j4), + ("J5 (wrist roll)", sender.arm_roll_j5), + ] + + for name, cmd in joints: + print(f" Moving {name} to +{amplitude} rad...") + cmd(amplitude) + time.sleep(duration) + + print(f" Moving {name} to 0 rad...") + cmd(0.0) + time.sleep(duration) + + print("=== Arm joint test complete ===\n") + + +def test_arm_beak(sender: CanSender, wait: float = 3.0) -> None: + """Test beak open/close cycle. + + Args: + sender: CanSender instance. + wait: Seconds to wait between open/close. + """ + print("=== Beak gripper test ===") + + print(" Closing beak...") + sender.arm_beak(close=True) + time.sleep(wait) + + print(" Opening beak...") + sender.arm_beak(close=False) + time.sleep(wait) + + print("=== Beak test complete ===\n") + + +def test_arm_reboot(sender: CanSender) -> None: + """Test arm motor reboot sequence. + + Sends REBOOT_ARM and waits for re-initialization. + """ + print("=== Arm reboot test ===") + + print(" Sending REBOOT_ARM...") + sender.reboot_arm() + print(" Waiting for re-initialization (5s)...") + time.sleep(5.0) + + print(" Arm should have re-initialized smoothly.") + print("=== Arm reboot test complete ===\n") + + +def test_full_diagnostics( + sender: CanSender, + monitor: CanMonitor, + speed: float = 3.0, +) -> None: + """Run full diagnostic sequence: traction → arm init → arm joints → beak. + + Args: + sender: CanSender instance. + monitor: CanMonitor instance (for collecting feedback). + speed: RPM for traction tests. + """ + print("=" * 60) + print(" FULL DIAGNOSTICS") + print("=" * 60) + + test_traction_basic(sender, speed=speed, duration=2.0) + test_arm_init(sender) + test_arm_joints(sender, amplitude=0.15, duration=1.5) + test_arm_beak(sender, wait=2.0) + + print("=" * 60) + print(" DIAGNOSTICS COMPLETE") + print("=" * 60) + + print("\nFeedback summary:") + for name, count in monitor.stats.items(): + print(f" {name}: {count} messages") + + +# ─── Test registry ─────────────────────────────────────────────────────────── + +TESTS = { + "traction": test_traction_basic, + "traction_diff": test_traction_differential, + "arm_init": test_arm_init, + "arm_joints": test_arm_joints, + "arm_beak": test_arm_beak, + "arm_reboot": test_arm_reboot, + "full": test_full_diagnostics, +} diff --git a/tools/can_tester/web_dashboard.py b/tools/can_tester/web_dashboard.py new file mode 100644 index 0000000..743cbcf --- /dev/null +++ b/tools/can_tester/web_dashboard.py @@ -0,0 +1,769 @@ +#!/usr/bin/env python3 +""" +Web dashboard for real-time CAN bus monitoring. + +Provides a browser-based UI at http://localhost:8080 with: +- Live message feed via Server-Sent Events (SSE) +- Message statistics +- Send command panel +- Auto-reconnecting real-time display + +Usage: + python -m tools.can_tester.web_dashboard --interface gs_usb --channel 0 + python -m tools.can_tester.web_dashboard --interface socketcan --channel can0 +""" + +from __future__ import annotations + +import argparse +import json +import sys +import threading +import time +from queue import Queue, Empty, Full + +from flask import Flask, Response, request, jsonify, render_template_string + +import can + +from .protocol import ModuleAddress, MsgType, decode_can_id, MSG_NAMES +from .codec import decode_payload, format_decoded +from .sender import CanSender +from .monitor import CanMonitor, NAMED_FILTERS + +app = Flask(__name__) + +# Global state (initialized in main) +bus: can.BusABC = None # type: ignore +sender: CanSender = None # type: ignore +monitor: CanMonitor = None # type: ignore +event_queues: list[Queue] = [] +event_queues_lock = threading.Lock() + +# Accumulated positions for relative mode (per-command) +_positions: dict[str, list[float]] = {} + + +# ─── HTML template ─────────────────────────────────────────────────────────── + +DASHBOARD_HTML = """ + + + + +STM32LowLevel CAN Dashboard + + + +

STM32LowLevel CAN Dashboard

+
+ Waiting for CAN data...
+
+
+

Live Feed

+
+ + + + + + + +
+
+
+
+
+
+

Send Command

+
+ + +
+
+ + +
+

+
+
+
+ + +
+
+ + +
+
+ + +
+ + +
+ +
+
+ + +
+
+ + +
+ +
+
+
+ + +
+
+
+

Statistics

+
+
+
+

Latest Values

+
+
+
+
+ +""" + + +# ─── Determine filter tags for a message type ─────────────────────────────── + +def get_filter_tags(msg_type: int) -> list[str]: + """Return which named filters this message type belongs to.""" + tags = [] + for name, filter_set in NAMED_FILTERS.items(): + if filter_set is None: + continue + if msg_type in filter_set: + tags.append(name) + return tags + + +# ─── Routes ────────────────────────────────────────────────────────────────── + +@app.route("/") +def index(): + return render_template_string(DASHBOARD_HTML) + + +@app.route("/stream") +def stream(): + """Server-Sent Events stream for live CAN messages.""" + q: Queue = Queue(maxsize=100) + with event_queues_lock: + event_queues.append(q) + + def generate(): + try: + while True: + try: + data = q.get(timeout=30) + yield f"data: {json.dumps(data)}\n\n" + except Empty: + yield ": keepalive\n\n" + except GeneratorExit: + pass + finally: + with event_queues_lock: + if q in event_queues: + event_queues.remove(q) + + return Response(generate(), mimetype="text/event-stream") + + +@app.route("/send", methods=["POST"]) +def send_command(): + """Handle send command from web UI.""" + data = request.get_json(silent=True) + if not data: + return jsonify({"error": "Missing or invalid JSON payload"}), 400 + cmd = data.get("command", "") + dest = int(data.get("target", ModuleAddress.MK2_MOD1)) + v1 = float(data.get("val1", 0)) + v2 = float(data.get("val2", 0)) + permanent = data.get("permanent", False) + relative = data.get("relative", False) + + # Relative mode: accumulate deltas into absolute positions + if relative and cmd in _positions: + _positions[cmd][0] += v1 + _positions[cmd][1] += v2 + v1, v2 = _positions[cmd] + elif relative: + _positions[cmd] = [v1, v2] + else: + _positions[cmd] = [v1, v2] + + try: + if cmd == "traction": + sender.traction(v1, v2, destination=dest) + elif cmd == "arm_j2": + sender.arm_pitch_j2(v1) + elif cmd == "arm_j3": + sender.arm_roll_j3(v1) + elif cmd == "arm_j4": + sender.arm_pitch_j4(v1) + elif cmd == "arm_j5": + sender.arm_roll_j5(v1) + elif cmd == "arm_1a1b": + sender.arm_pitch_1a1b(v1, v2) + elif cmd == "beak_close": + sender.arm_beak(close=True) + elif cmd == "beak_open": + sender.arm_beak(close=False) + elif cmd == "reset_arm": + sender.reset_arm() + elif cmd == "reboot_arm": + sender.reboot_arm() + elif cmd == "set_home": + sender.set_home(persist=permanent) + elif cmd == "reboot_traction": + sender.reboot_traction(destination=dest) + elif cmd == "joint_1a1b": + sender.joint_pitch_1a1b(v1, v2, destination=dest) + elif cmd == "joint_roll": + sender.joint_roll(v1, destination=dest) + elif cmd == "stop_all": + sender.stop_all() + else: + return jsonify({"error": f"Unknown command: {cmd}"}), 400 + + return jsonify({"ok": True}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/reset_positions", methods=["POST"]) +def reset_positions(): + """Reset accumulated relative positions to zero.""" + _positions.clear() + return jsonify({"ok": True}) + + +@app.route("/stop", methods=["POST"]) +def stop_all(): + """Emergency stop all motors.""" + sender.stop_all() + return jsonify({"ok": True}) + + +# ─── CAN → SSE bridge ─────────────────────────────────────────────────────── + +def can_to_sse_callback(decoded_id, payload, raw_msg): + """Push decoded CAN messages to all SSE clients.""" + payload_str = " ".join( + f"{k}={v:.4f}" if isinstance(v, float) else f"{k}={v}" + for k, v in payload.items() + if not k.startswith("_") + ) + + event = { + "source": decoded_id.source_name, + "destination": decoded_id.destination_name, + "msg_name": decoded_id.msg_name, + "msg_type": decoded_id.msg_type, + "payload_str": payload_str, + "payload": {k: v for k, v in payload.items() if not k.startswith("_")}, + "filter_tags": get_filter_tags(decoded_id.msg_type), + "timestamp": raw_msg.timestamp or time.time(), + } + + with event_queues_lock: + for q in event_queues: + try: + q.put_nowait(event) + except Full: + pass # Drop if client is slow + + +# ─── Main ──────────────────────────────────────────────────────────────────── + +def main(): + global bus, sender, monitor + + parser = argparse.ArgumentParser(description="STM32LowLevel CAN Web Dashboard") + parser.add_argument("--interface", "-i", default="gs_usb") + parser.add_argument("--channel", "-c", default="0") + parser.add_argument("--bitrate", "-b", type=int, default=1000000, + help="CAN arbitration bitrate in bps (default: 1000000 = 1 Mbit/s)") + parser.add_argument("--data-bitrate", "-D", type=int, default=2000000, + help="CAN FD data-phase bitrate in bps (default: 2000000 = 2 Mbit/s)") + parser.add_argument("--port", "-p", type=int, default=8080) + parser.add_argument("--host", default="127.0.0.1") + parser.add_argument( + "--demo", action="store_true", + help="Use virtual CAN bus for GUI preview (no hardware needed)", + ) + args = parser.parse_args() + + iface = "virtual" if args.demo else args.interface + channel = "demo" if args.demo else args.channel + + print(f"Connecting to CAN bus: interface={iface}, " + f"channel={channel}, bitrate={args.bitrate}, data_bitrate={args.data_bitrate}...") + if args.demo: + print(" (demo mode — no real CAN hardware required)") + + try: + bus = can.Bus( + interface=iface, + channel=channel, + bitrate=args.bitrate, + data_bitrate=args.data_bitrate, + fd=True, + ) + except Exception as e: + print(f"Failed to connect: {e}") + sys.exit(1) + + sender = CanSender(bus) + monitor = CanMonitor(bus) + monitor.add_callback(can_to_sse_callback) + + # Start CAN monitor in background thread + monitor_thread = threading.Thread( + target=monitor.run, + kwargs={"quiet": True}, + daemon=True, + ) + monitor_thread.start() + + print(f"Dashboard: http://localhost:{args.port}") + print("Press Ctrl+C to stop.\n") + + try: + app.run(host=args.host, port=args.port, threaded=True) + except KeyboardInterrupt: + pass + finally: + monitor.stop() + bus.shutdown() + + +if __name__ == "__main__": + main()