From 0cb7fa96dcf2372fe539e002a17d25bf53deb5d5 Mon Sep 17 00:00:00 2001 From: mkozlowski <10508687+m-kozlowski@users.noreply.github.com> Date: Sun, 22 Feb 2026 21:24:44 +0100 Subject: [PATCH 1/8] resmed_flash.py UART-only flash tool for ResMed AirSense 10. Enters bootloader via RES + BID, erases and writes flash blocks (CCX/CDX/CMX/BLX), with auto baud negotiation. Supports full 1MB image or individual block flashing, CRC16 validation with optional fix, and auto-detection by file size. --- python/resmed_flash.py | 551 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 551 insertions(+) create mode 100755 python/resmed_flash.py diff --git a/python/resmed_flash.py b/python/resmed_flash.py new file mode 100755 index 0000000..0ebc665 --- /dev/null +++ b/python/resmed_flash.py @@ -0,0 +1,551 @@ +#!/usr/bin/env python3 +""" +ResMed AirSense UART Flash Tool + +Flashes firmware to ResMed AirSense devices via the UART bootloader protocol. +Supports individual block or full image flashing with CRC validation and +automatic baud rate negotiation up to 460800. + +Flash memory map: + 0x08000000 BLX 16KB Bootloader + 0x08004000 CCX 240KB Configuration + 0x08040000 CDX 768KB Firmware + CMX 1008KB CCX+CDX combined + +""" + +import serial +import argparse +import time +import sys +import struct + + +BLOCKS = { + 'BLX': {'flash_start': 0x08000000, 'file_offset': 0x00000, 'size': 0x04000, + 'name': 'Bootloader', 'erase_cmd': 'P F *BLX 0000'}, + 'CCX': {'flash_start': 0x08004000, 'file_offset': 0x04000, 'size': 0x3C000, + 'name': 'Config', 'erase_cmd': 'P F *CCX 0000'}, + 'CDX': {'flash_start': 0x08040000, 'file_offset': 0x40000, 'size': 0xC0000, + 'name': 'Firmware', 'erase_cmd': 'P F *CDX 0000'}, + 'CMX': {'flash_start': 0x08004000, 'file_offset': 0x04000, 'size': 0xFC000, + 'name': 'Config+FW', 'erase_cmd': 'P F *CMX 0000'}, +} + +FULL_IMAGE_SIZE = 0x100000 # 1MB + +BLOCK_ALIASES = { + 'bootloader': 'BLX', 'boot': 'BLX', 'blx': 'BLX', + 'config': 'CCX', 'conf': 'CCX', 'ccx': 'CCX', + 'firmware': 'CDX', 'fw': 'CDX', 'cdx': 'CDX', + 'all': 'CMX', 'cmx': 'CMX', +} + + +BDD_RATES = {57600: '0000', 115200: '0001', 460800: '0002'} +BDD_RATES_ORDERED = [460800, 115200, 57600] +#PROBE_RATES = [9600, 19200, 57600, 115200, 460800] +PROBE_RATES = [57600, 115200, 460800] + + +def crc16_ccitt(data: bytes) -> int: + crc = 0xFFFF + for b in data: + crc ^= b << 8 + for _ in range(8): + crc = ((crc << 1) ^ 0x1021 if crc & 0x8000 else crc << 1) & 0xFFFF + return crc + +def escape_payload(data: bytes) -> bytes: + return data.replace(b'U', b'UU') + +def build_frame(frame_type: str, payload: bytes) -> bytes: + escaped = escape_payload(payload) + frame_len = 1 + 1 + 3 + len(escaped) + 4 + pre_crc = b'U' + frame_type.encode() + f'{frame_len:03X}'.encode() + escaped + crc = crc16_ccitt(pre_crc) + return pre_crc + f'{crc:04X}'.encode() + +def build_q_frame(cmd: str) -> bytes: + return build_frame('Q', cmd.encode()) + +def build_f_frame(block_name: bytes, seq: int, records: bytes) -> bytes: + return build_frame('F', block_name + b'\x00' + bytes([seq]) + records) + +def build_completion_frame(block_name: bytes, seq: int) -> bytes: + return build_frame('F', block_name + b'F' + bytes([seq])) + +def build_record_03(address: int, data: bytes) -> bytes: + length = 4 + len(data) + 1 + return bytes([0x03, length]) + struct.pack('>I', address) + data + bytes([0]) + +def parse_responses(data: bytes) -> list: + responses = [] + i = 0 + while i < len(data): + if data[i:i+1] == b'U' and i+1 < len(data) and data[i+1:i+2] != b'U': + try: + ft = chr(data[i+1]) + if ft in 'EFKLPQR': + length = int(data[i+2:i+5], 16) + if i + length <= len(data): + raw_payload = data[i+5:i+length-4] + payload = raw_payload.replace(b'UU', b'U') + responses.append({'type': ft, 'payload': payload}) + i += length + continue + except (ValueError, IndexError): + pass + i += 1 + return responses + +def read_responses(ser, timeout=1.0): + old_timeout = ser.timeout + ser.timeout = 0.02 + data = b'' + deadline = time.time() + timeout + while time.time() < deadline: + chunk = ser.read(4096) + if chunk: + data += chunk + # Got data but do one more quick read for any trailing bytes + trail_deadline = time.time() + 0.02 + while time.time() < trail_deadline: + chunk = ser.read(4096) + if chunk: + data += chunk + trail_deadline = time.time() + 0.02 # extend if still coming + break + ser.timeout = old_timeout + return data, parse_responses(data) + +def send_cmd(ser, cmd_str, timeout=2.0, quiet=False): + ser.reset_input_buffer() + ser.write(build_q_frame(cmd_str)) + ser.flush() + raw, responses = read_responses(ser, timeout=timeout) + if not quiet: + for r in responses: + print(f" [{r['type']}] {r['payload'].decode('ascii', errors='replace')}") + return raw, responses + +def sync_uart(ser): + ser.reset_input_buffer() + ser.write(b'\x55' * 128) + ser.flush() + time.sleep(0.05) + ser.reset_input_buffer() + + +def probe_baud(ser): + for rate in PROBE_RATES: + ser.baudrate = rate + ser.reset_input_buffer() + time.sleep(0.05) + _, resp = send_cmd(ser, "G S #BID", timeout=0.3, quiet=True) + if any(b'BID' in r['payload'] for r in resp): + return rate + return None + +def switch_baud(ser, target, quiet=False): + if target not in BDD_RATES or ser.baudrate == target: + return ser.baudrate == target + old = ser.baudrate + if not quiet: + print(f"[*] Switching baud: {old} -> {target} (BDD {BDD_RATES[target]})...") + ser.reset_input_buffer() + ser.write(build_q_frame(f"P S #BDD {BDD_RATES[target]}")) + ser.flush() + time.sleep(0.3) + read_responses(ser, timeout=0.5) # consume ACK at old baud + ser.baudrate = target + sync_uart(ser) + _, resp = send_cmd(ser, "G S #BID", timeout=0.5, quiet=True) + if any(b'BID' in r['payload'] for r in resp): + if not quiet: + print(f"[+] Running at {target} baud") + return True + if not quiet: + print(f"[!] No response at {target}, reverting") + ser.baudrate = old + sync_uart(ser) + return False + +def negotiate_best_baud(ser): + for rate in BDD_RATES_ORDERED: + if rate == ser.baudrate: + return rate + if switch_baud(ser, rate): + return rate + return ser.baudrate + + +def enter_bootloader(ser, max_retries=3): + bid_frame = build_q_frame("G S #BID") + preamble = b'\x55' * 128 + for retry in range(max_retries): + if retry > 0: + print(f"[*] Retry {retry}/{max_retries-1}...") + time.sleep(1.0) + ser.reset_input_buffer() + print("[*] Checking device...") + _, resp = send_cmd(ser, "G S #BID", timeout=1.0) + if not resp: + continue + print("[*] Triggering reboot...") + ser.reset_input_buffer() + ser.write(build_q_frame("P S #RES 0001")) + ser.flush() + time.sleep(0.05) + ser.reset_input_buffer() + print("[*] Flooding to catch bootloader...") + t0 = time.time() + for _ in range(300): + ser.write(preamble + bid_frame) + _, responses = read_responses(ser, timeout=0.05) + if any(b'BID' in r['payload'] for r in responses): + print(f"[+] Bootloader caught at t+{time.time()-t0:.2f}s") + time.sleep(0.2) + ser.reset_input_buffer() + return True + print("[!] Failed to catch bootloader") + return False + +def wait_for_erase(ser, block_id, timeout=30.0): + print("[*] Waiting for erase completion...") + t0 = time.time() + p_count = 0 + while time.time() - t0 < timeout: + _, responses = read_responses(ser, timeout=0.5) + for r in responses: + p = r['payload'].decode('ascii', errors='replace') + if r['type'] == 'P': + p_count += 1 + sys.stdout.write(f"\r Erase progress: {p_count} ACKs...") + sys.stdout.flush() + elif r['type'] == 'R': + print(f"\r Erase done [{r['type']}]: {p} ({p_count} ACKs) ") + return True + else: + print(f"\n [{r['type']}] {p}") + print(f"\n[!] Erase timeout ({p_count} ACKs)") + return False + + +def check_block_crc(data: bytes, block_id: str) -> tuple: + """Check CRC of a block. Returns (stored_crc, computed_crc, match).""" + stored = (data[-2] << 8) | data[-1] + computed = crc16_ccitt(data[:-2]) + return stored, computed, stored == computed + +def fix_block_crc(data: bytearray, block_id: str) -> int: + """Recalculate and patch CRC in last 2 bytes. Returns new CRC.""" + crc = crc16_ccitt(data[:-2]) + data[-2] = (crc >> 8) & 0xFF + data[-1] = crc & 0xFF + return crc + + +SIZE_TO_BLOCK = { + 0x04000: 'BLX', + 0x3C000: 'CCX', + 0xC0000: 'CDX', + 0xFC000: 'CMX', +} + +def detect_input(file_data: bytes, block_arg: str, include_bootloader: bool): + """ + Determine what to flash based on file size and --block argument. + Returns list of (block_id, data_bytes, flash_start_addr) tuples. + """ + fsize = len(file_data) + is_full_image = (fsize == FULL_IMAGE_SIZE) + + # If --block specified, resolve it + if block_arg: + block_id = BLOCK_ALIASES.get(block_arg.lower()) + if not block_id: + print(f"[!] Unknown block '{block_arg}'. Use: config, firmware, all, bootloader") + return None + blk = BLOCKS[block_id] + if is_full_image: + data = file_data[blk['file_offset']:blk['file_offset'] + blk['size']] + elif fsize == blk['size']: + data = file_data + else: + print(f"[!] File size {fsize} doesn't match {block_id} ({blk['size']}) or full image ({FULL_IMAGE_SIZE})") + return None + if block_id == 'BLX' and not include_bootloader: + print("[!] Bootloader flash requires --include-bootloader") + return None + return [(block_id, bytearray(data), blk['flash_start'])] + + # Auto-detect from file size + if is_full_image: + # Default: flash config + firmware (CMX), skip bootloader + blk = BLOCKS['CMX'] + data = file_data[blk['file_offset']:blk['file_offset'] + blk['size']] + return [('CMX', bytearray(data), blk['flash_start'])] + elif fsize in SIZE_TO_BLOCK: + block_id = SIZE_TO_BLOCK[fsize] + blk = BLOCKS[block_id] + if block_id == 'BLX' and not include_bootloader: + print("[!] Bootloader flash requires --include-bootloader") + return None + return [(block_id, bytearray(file_data), blk['flash_start'])] + else: + print(f"[!] Unknown file size {fsize}. Use --block to specify target.") + print(f" Known sizes: 1MB (full), 240KB (config), 768KB (firmware), 1008KB (config+fw)") + return None + + +CHUNK_SIZE = 250 + +def flash_block(ser, block_id, data, flash_start, dry_run=False): + """Erase and flash a single block. Returns True on success.""" + blk = BLOCKS[block_id] + block_name = block_id.encode() + + # Trim trailing 0xFF + data_end = len(data) + while data_end > 0 and data[data_end-1] == 0xFF: + data_end -= 1 + if data_end == 0: + print(f"[!] {block_id} data is all 0xFF, nothing to write") + return True + data_end = (data_end + 3) & ~3 + + print(f"\n{'='*60}") + print(f" Block: {block_id} ({blk['name']})") + print(f" Flash: 0x{flash_start:08X} — 0x{flash_start + len(data):08X}") + print(f" Data: {data_end:,} bytes (trimmed from {len(data):,})") + print(f"{'='*60}") + + if dry_run: + print(" [DRY RUN] Would erase and flash this block") + return True + + # ERASE + for attempt in range(3): + print(f"\n[*] Erasing {block_id} (attempt {attempt+1})...") + ser.reset_input_buffer() + ser.write(build_q_frame(blk['erase_cmd'])) + ser.flush() + if wait_for_erase(ser, block_id, timeout=30.0): + break + elif attempt < 2: + print("[!] Erase stalled, retrying...") + time.sleep(1.0) + else: + print("[!] Erase failed") + return False + + # WRITE + seq = 0 + offset = 0 + frame_count = 0 + t0 = time.time() + + print(f"\n[*] Writing {data_end:,} bytes @ {ser.baudrate} baud...") + + while offset < data_end: + chunk = data[offset:offset + CHUNK_SIZE] + if not chunk: + break + address = flash_start + offset + f_frame = build_f_frame(block_name, seq, build_record_03(address, bytes(chunk))) + ser.write(f_frame) + frame_count += 1 + offset += len(chunk) + seq = (seq + 1) & 0xFF + + if frame_count % 20 == 0: + ser.flush() + elapsed = time.time() - t0 + pct = offset / data_end * 100 + rate = offset / elapsed if elapsed > 0 else 0 + eta = (data_end - offset) / rate if rate > 0 else 0 + sys.stdout.write( + f"\r {offset:,}/{data_end:,} ({pct:.0f}%) " + f"[{rate/1024:.1f} KB/s] ETA {eta:.0f}s" + ) + sys.stdout.flush() + time.sleep(0.01) + _, responses = read_responses(ser, timeout=0.01) + for r in responses: + print(f"\n [{r['type']}] {r['payload'].decode('ascii', errors='replace')}") + + ser.flush() + elapsed = time.time() - t0 + print(f"\r {offset:,}/{data_end:,} (100%) in {elapsed:.1f}s, {frame_count} frames ") + + # Completion frame + print("[*] Sending completion frame...") + ser.write(build_completion_frame(block_name, seq)) + ser.flush() + time.sleep(0.5) + _, responses = read_responses(ser, timeout=1.0) + for r in responses: + print(f" [{r['type']}] {r['payload'].decode('ascii', errors='replace')}") + + return True + + +def cmd_info(ser): + print("\n[*] Probing device...") + baud = probe_baud(ser) + if not baud: + print("[!] Device not responding") + return 1 + ser.baudrate = baud + print(f"[+] Device responding at {baud} baud") + print("\n[*] Device info:") + for cmd in ["G S #BID", "G S #SID", "G S #CID", "G S #PCB", + "G S #SRN", "G S #PCD", "G S #PNA"]: + send_cmd(ser, cmd, timeout=0.5) + return 0 + + +def main(): + parser = argparse.ArgumentParser( + description='ResMed AirSense UART Flash Tool', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Blocks: + config (CCX) Configuration data (240KB) + firmware (CDX) Application firmware (768KB) + all (CMX) Config + Firmware (1008KB) + bootloader (BLX) Bootloader (16KB, requires --include-bootloader) + +Examples: + %(prog)s -p /dev/ttyACM0 -f dump.bin Flash config+firmware + %(prog)s -p /dev/ttyACM0 -f dump.bin --block config Flash config only + %(prog)s -p /dev/ttyACM0 -f config.bin Auto-detect 240KB config + %(prog)s -p /dev/ttyACM0 -f dump.bin --fix-crc Fix CRC before flash + %(prog)s -p /dev/ttyACM0 -f dump.bin --dry-run Validate without flashing + %(prog)s -p /dev/ttyACM0 --info Show device info +""") + parser.add_argument('-p', '--port', required=True, help='Serial port') + parser.add_argument('-f', '--file', help='Firmware file to flash') + parser.add_argument('--block', help='Target block: config, firmware, all, bootloader') + parser.add_argument('--baud', default='auto', help='Transfer baud: auto, 57600, 115200, 460800') + parser.add_argument('--fix-crc', action='store_true', help='Recalculate and patch CRC') + parser.add_argument('--force', action='store_true', help='Flash even with bad CRC') + parser.add_argument('--include-bootloader', action='store_true', help='Allow bootloader writes') + parser.add_argument('--dry-run', action='store_true', help='Validate only, do not flash') + parser.add_argument('--no-reset', action='store_true', help='Do not reset device after flash') + parser.add_argument('--no-enter', action='store_true', help='Skip bootloader entry') + parser.add_argument('--info', action='store_true', help='Show device info and exit') + args = parser.parse_args() + + ser = serial.Serial(args.port, 57600, timeout=1.0) + + try: + if args.info: + return cmd_info(ser) + + if not args.file: + parser.error("-f/--file is required (unless using --info)") + + with open(args.file, 'rb') as f: + file_data = f.read() + print(f"[*] Loaded: {args.file} ({len(file_data):,} bytes)") + + # detect blocks + jobs = detect_input(file_data, args.block, args.include_bootloader) + if not jobs: + return 1 + + print(f"\n[*] Flash plan:") + for block_id, data, flash_start in jobs: + blk = BLOCKS[block_id] + print(f" {block_id} ({blk['name']}): {len(data):,} bytes @ 0x{flash_start:08X}") + + + print(f"\n[*] CRC validation:") + crc_ok = True + for block_id, data, _ in jobs: + if block_id == 'CMX': + # CMX spans CCX + CDX, check both sub-block CRCs + sub_blocks = [ + ('CCX', data[:BLOCKS['CCX']['size']]), + ('CDX', data[BLOCKS['CDX']['file_offset'] - BLOCKS['CMX']['file_offset']:]), + ] + else: + sub_blocks = [(block_id, data)] + + for sub_id, sub_data in sub_blocks: + stored, computed, match = check_block_crc(sub_data, sub_id) + status = "OK" if match else "MISMATCH" + icon = "+" if match else "!" + print(f" [{icon}] {sub_id}: stored=0x{stored:04X} computed=0x{computed:04X} {status}") + + if not match: + if args.fix_crc: + new_crc = fix_block_crc(sub_data, sub_id) + print(f" Fixed CRC -> 0x{new_crc:04X}") + elif not args.force: + crc_ok = False + + if not crc_ok: + print("\n[!] CRC mismatch. Use --fix-crc to repair or --force to ignore.") + return 1 + + if args.dry_run: + print("\n[DRY RUN] Validation complete. No changes made.") + for block_id, data, flash_start in jobs: + flash_block(ser, block_id, data, flash_start, dry_run=True) + return 0 + + # baud auto or constant + if args.baud == 'auto': + print(f"\n[*] Probing device...") + baud = probe_baud(ser) + if not baud: + print("[!] Device not responding at any known baud rate") + return 1 + print(f"[+] Device responding at {baud} baud") + ser.baudrate = baud + else: + init_baud = int(args.baud) + ser.baudrate = init_baud + print(f"\n[*] Connecting at {init_baud} baud...") + _, resp = send_cmd(ser, "G S #BID", timeout=1.0, quiet=True) + if not any(b'BID' in r['payload'] for r in resp): + print(f"[!] No response at {init_baud} baud") + return 1 + print(f"[+] Device responding at {init_baud} baud") + + if not args.no_enter: + if ser.baudrate != 57600: + switch_baud(ser, 57600) + if not enter_bootloader(ser): + return 1 + + if args.baud == 'auto': + print("\n[*] Negotiating baud rate...") + negotiate_best_baud(ser) + else: + target = int(args.baud) + if target != ser.baudrate: + switch_baud(ser, target) + + for block_id, data, flash_start in jobs: + if not flash_block(ser, block_id, data, flash_start): + print(f"\n[!] Failed to flash {block_id}") + return 1 + + if not args.no_reset: + if ser.baudrate != 57600: + switch_baud(ser, 57600) + print("\n[*] Resetting device...") + send_cmd(ser, "P S #RES 0001", timeout=2.0) + + print("\n[+] Flash complete!") + return 0 + + finally: + ser.close() + +if __name__ == '__main__': + sys.exit(main()) From 7bd0ee5e38f2969b217fcb0c81f646b396626626 Mon Sep 17 00:00:00 2001 From: mkozlowski <10508687+m-kozlowski@users.noreply.github.com> Date: Sun, 22 Feb 2026 21:48:33 +0100 Subject: [PATCH 2/8] FIX: fix crc in original data array, not a sliced copy --- python/resmed_flash.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/python/resmed_flash.py b/python/resmed_flash.py index 0ebc665..d1c67f0 100755 --- a/python/resmed_flash.py +++ b/python/resmed_flash.py @@ -466,15 +466,17 @@ def main(): crc_ok = True for block_id, data, _ in jobs: if block_id == 'CMX': - # CMX spans CCX + CDX, check both sub-block CRCs + ccx_size = BLOCKS['CCX']['size'] + cdx_offset = BLOCKS['CDX']['file_offset'] - BLOCKS['CMX']['file_offset'] sub_blocks = [ - ('CCX', data[:BLOCKS['CCX']['size']]), - ('CDX', data[BLOCKS['CDX']['file_offset'] - BLOCKS['CMX']['file_offset']:]), + ('CCX', 0, ccx_size), + ('CDX', cdx_offset, len(data)), ] else: - sub_blocks = [(block_id, data)] + sub_blocks = [(block_id, 0, len(data))] - for sub_id, sub_data in sub_blocks: + for sub_id, start, end in sub_blocks: + sub_data = data[start:end] stored, computed, match = check_block_crc(sub_data, sub_id) status = "OK" if match else "MISMATCH" icon = "+" if match else "!" @@ -482,8 +484,10 @@ def main(): if not match: if args.fix_crc: - new_crc = fix_block_crc(sub_data, sub_id) - print(f" Fixed CRC -> 0x{new_crc:04X}") + crc = crc16_ccitt(data[start:end-2]) + data[end-2] = (crc >> 8) & 0xFF + data[end-1] = crc & 0xFF + print(f" Fixed CRC -> 0x{crc:04X}") elif not args.force: crc_ok = False From 3e97095c08cc101cfa2fc98b063455aeeaeebe9c Mon Sep 17 00:00:00 2001 From: mkozlowski <10508687+m-kozlowski@users.noreply.github.com> Date: Tue, 24 Feb 2026 16:27:29 +0100 Subject: [PATCH 3/8] resmed_flash.py: use BLL 0001 instead of RES 0001 for more reliable bootloader switching --- python/resmed_flash.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/resmed_flash.py b/python/resmed_flash.py index d1c67f0..80e300f 100755 --- a/python/resmed_flash.py +++ b/python/resmed_flash.py @@ -194,7 +194,9 @@ def enter_bootloader(ser, max_retries=3): continue print("[*] Triggering reboot...") ser.reset_input_buffer() - ser.write(build_q_frame("P S #RES 0001")) + #ser.write(build_q_frame("P S #RES 0001")) + #ser.write(build_q_frame("P S #RES 0003")) + ser.write(build_q_frame("P S #BLL 0001")) ser.flush() time.sleep(0.05) ser.reset_input_buffer() From 5463554f51df7cb1d8122dc3c0e4043f7e24394a Mon Sep 17 00:00:00 2001 From: mkozlowski <10508687+m-kozlowski@users.noreply.github.com> Date: Wed, 25 Feb 2026 00:14:27 +0100 Subject: [PATCH 4/8] Improved block selection logic - multiple blocks can be selected by passing more --block args - block alias 'all' also includes bootloader - sort and deduplicate specified blocks --- python/resmed_flash.py | 88 +++++++++++++++++++++++++----------------- 1 file changed, 53 insertions(+), 35 deletions(-) diff --git a/python/resmed_flash.py b/python/resmed_flash.py index 80e300f..58ac9aa 100755 --- a/python/resmed_flash.py +++ b/python/resmed_flash.py @@ -38,7 +38,7 @@ 'bootloader': 'BLX', 'boot': 'BLX', 'blx': 'BLX', 'config': 'CCX', 'conf': 'CCX', 'ccx': 'CCX', 'firmware': 'CDX', 'fw': 'CDX', 'cdx': 'CDX', - 'all': 'CMX', 'cmx': 'CMX', + 'cmx': 'CMX', } @@ -255,51 +255,66 @@ def fix_block_crc(data: bytearray, block_id: str) -> int: 0xFC000: 'CMX', } -def detect_input(file_data: bytes, block_arg: str, include_bootloader: bool): +def detect_input(file_data: bytes, block_args: list, include_bootloader: bool): """ - Determine what to flash based on file size and --block argument. + Determine what to flash based on file size and --block argument(s). Returns list of (block_id, data_bytes, flash_start_addr) tuples. """ fsize = len(file_data) is_full_image = (fsize == FULL_IMAGE_SIZE) - # If --block specified, resolve it - if block_arg: - block_id = BLOCK_ALIASES.get(block_arg.lower()) - if not block_id: - print(f"[!] Unknown block '{block_arg}'. Use: config, firmware, all, bootloader") - return None + if block_args: + requested = [] + for arg in block_args: + key = arg.lower() + if key == 'all': + requested.extend(['BLX', 'CMX']) + else: + block_id = BLOCK_ALIASES.get(key) + if not block_id: + print(f"[!] Unknown block '{arg}'. Use: config, firmware, all, bootloader, cmx") + return None + requested.append(block_id) + else: + requested = ['BLX', 'CMX'] + + # Deduplicate preserving order + seen = set() + unique = [] + for bid in requested: + if bid not in seen: + seen.add(bid) + unique.append(bid) + + jobs = [] + for block_id in unique: + if block_id == 'BLX' and not include_bootloader: + print(f"[*] Skipping {block_id} (use --include-bootloader to include)") + continue + blk = BLOCKS[block_id] if is_full_image: data = file_data[blk['file_offset']:blk['file_offset'] + blk['size']] - elif fsize == blk['size']: + elif len(unique) == 1 and fsize == blk['size']: + # Single standalone block file data = file_data - else: - print(f"[!] File size {fsize} doesn't match {block_id} ({blk['size']}) or full image ({FULL_IMAGE_SIZE})") - return None - if block_id == 'BLX' and not include_bootloader: - print("[!] Bootloader flash requires --include-bootloader") + elif fsize in SIZE_TO_BLOCK and len(unique) == 1: + print(f"[!] File size {fsize} doesn't match {block_id} ({blk['size']})") return None - return [(block_id, bytearray(data), blk['flash_start'])] - - # Auto-detect from file size - if is_full_image: - # Default: flash config + firmware (CMX), skip bootloader - blk = BLOCKS['CMX'] - data = file_data[blk['file_offset']:blk['file_offset'] + blk['size']] - return [('CMX', bytearray(data), blk['flash_start'])] - elif fsize in SIZE_TO_BLOCK: - block_id = SIZE_TO_BLOCK[fsize] - blk = BLOCKS[block_id] - if block_id == 'BLX' and not include_bootloader: - print("[!] Bootloader flash requires --include-bootloader") + else: + print(f"[!] Need full image ({FULL_IMAGE_SIZE} bytes) for this operation") return None - return [(block_id, bytearray(file_data), blk['flash_start'])] - else: - print(f"[!] Unknown file size {fsize}. Use --block to specify target.") - print(f" Known sizes: 1MB (full), 240KB (config), 768KB (firmware), 1008KB (config+fw)") + + jobs.append((block_id, bytearray(data), blk['flash_start'])) + + if not jobs: + print("[!] No blocks to flash") return None + # Sort by flash address (BLX before CMX) + jobs.sort(key=lambda j: j[2]) + return jobs + CHUNK_SIZE = 250 @@ -414,14 +429,17 @@ def main(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Blocks: + all BLX + CMX (default) config (CCX) Configuration data (240KB) firmware (CDX) Application firmware (768KB) - all (CMX) Config + Firmware (1008KB) + cmx (CMX) Config + Firmware combined (1008KB) bootloader (BLX) Bootloader (16KB, requires --include-bootloader) Examples: - %(prog)s -p /dev/ttyACM0 -f dump.bin Flash config+firmware + %(prog)s -p /dev/ttyACM0 -f dump.bin Flash CMX (skip BLX) + %(prog)s -p /dev/ttyACM0 -f dump.bin --include-bootloader Flash BLX + CMX %(prog)s -p /dev/ttyACM0 -f dump.bin --block config Flash config only + %(prog)s -p /dev/ttyACM0 -f dump.bin --block blx --block cmx --include-bootloader %(prog)s -p /dev/ttyACM0 -f config.bin Auto-detect 240KB config %(prog)s -p /dev/ttyACM0 -f dump.bin --fix-crc Fix CRC before flash %(prog)s -p /dev/ttyACM0 -f dump.bin --dry-run Validate without flashing @@ -429,7 +447,7 @@ def main(): """) parser.add_argument('-p', '--port', required=True, help='Serial port') parser.add_argument('-f', '--file', help='Firmware file to flash') - parser.add_argument('--block', help='Target block: config, firmware, all, bootloader') + parser.add_argument('--block', action='append', help='Target block (repeatable): config, firmware, all, bootloader') parser.add_argument('--baud', default='auto', help='Transfer baud: auto, 57600, 115200, 460800') parser.add_argument('--fix-crc', action='store_true', help='Recalculate and patch CRC') parser.add_argument('--force', action='store_true', help='Flash even with bad CRC') From e67bfcfebcca61aeecdad026416cbfa4818f7e18 Mon Sep 17 00:00:00 2001 From: mkozlowski <10508687+m-kozlowski@users.noreply.github.com> Date: Wed, 25 Feb 2026 00:50:20 +0100 Subject: [PATCH 5/8] Reenter bootloader when flashing multiple blocks Sending completion frame after single block causes reboot, so we need to reenter bootloader. There /should/ be a way to do this in a single pass, but it requires further reverse engineering of the protocol. --- python/resmed_flash.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/python/resmed_flash.py b/python/resmed_flash.py index 58ac9aa..3847684 100755 --- a/python/resmed_flash.py +++ b/python/resmed_flash.py @@ -334,7 +334,7 @@ def flash_block(ser, block_id, data, flash_start, dry_run=False): print(f"\n{'='*60}") print(f" Block: {block_id} ({blk['name']})") - print(f" Flash: 0x{flash_start:08X} — 0x{flash_start + len(data):08X}") + print(f" Flash: 0x{flash_start:08X} - 0x{flash_start + len(data):08X}") print(f" Data: {data_end:,} bytes (trimmed from {len(data):,})") print(f"{'='*60}") @@ -396,7 +396,7 @@ def flash_block(ser, block_id, data, flash_start, dry_run=False): elapsed = time.time() - t0 print(f"\r {offset:,}/{data_end:,} (100%) in {elapsed:.1f}s, {frame_count} frames ") - # Completion frame + # Completion frame print("[*] Sending completion frame...") ser.write(build_completion_frame(block_name, seq)) ser.flush() @@ -554,7 +554,21 @@ def main(): if target != ser.baudrate: switch_baud(ser, target) - for block_id, data, flash_start in jobs: + for i, (block_id, data, flash_start) in enumerate(jobs): + if i > 0: + # Completion frame reset the bootloader. re-enter and re-negotiate + print("\n[*] Re-entering bootloader for next block...") + time.sleep(0.5) + if ser.baudrate != 57600: + ser.baudrate = 57600 + if not enter_bootloader(ser, 10): + return 1 + if args.baud == 'auto': + negotiate_best_baud(ser) + else: + target = int(args.baud) + if target != ser.baudrate: + switch_baud(ser, target) if not flash_block(ser, block_id, data, flash_start): print(f"\n[!] Failed to flash {block_id}") return 1 From a2ea459fe43dad5083e22c9a1fdfcafe74be54c2 Mon Sep 17 00:00:00 2001 From: mkozlowski <10508687+m-kozlowski@users.noreply.github.com> Date: Thu, 26 Feb 2026 00:55:53 +0100 Subject: [PATCH 6/8] resmed_flash.py: SX585-0200 bootloader support - support alternative block layout - cross flashing safety check - sanity checks when trying to flash blocks not including bootloader that does not match currently running image layout - support only SX577-0200 by default as the SX585 platform is currently a minefield. --- python/resmed_flash.py | 345 ++++++++++++++++++++++++++++++----------- 1 file changed, 251 insertions(+), 94 deletions(-) diff --git a/python/resmed_flash.py b/python/resmed_flash.py index 3847684..633135d 100755 --- a/python/resmed_flash.py +++ b/python/resmed_flash.py @@ -21,18 +21,32 @@ import struct -BLOCKS = { - 'BLX': {'flash_start': 0x08000000, 'file_offset': 0x00000, 'size': 0x04000, - 'name': 'Bootloader', 'erase_cmd': 'P F *BLX 0000'}, - 'CCX': {'flash_start': 0x08004000, 'file_offset': 0x04000, 'size': 0x3C000, - 'name': 'Config', 'erase_cmd': 'P F *CCX 0000'}, - 'CDX': {'flash_start': 0x08040000, 'file_offset': 0x40000, 'size': 0xC0000, - 'name': 'Firmware', 'erase_cmd': 'P F *CDX 0000'}, - 'CMX': {'flash_start': 0x08004000, 'file_offset': 0x04000, 'size': 0xFC000, - 'name': 'Config+FW', 'erase_cmd': 'P F *CMX 0000'}, +BLOCK_MAPS = { + 'SX577-0200': { + 'BLX': {'flash_start': 0x08000000, 'file_offset': 0x00000, 'size': 0x04000, + 'name': 'Bootloader', 'erase_cmd': 'P F *BLX 0000'}, + 'CCX': {'flash_start': 0x08004000, 'file_offset': 0x04000, 'size': 0x3C000, + 'name': 'Config', 'erase_cmd': 'P F *CCX 0000'}, + 'CDX': {'flash_start': 0x08040000, 'file_offset': 0x40000, 'size': 0xC0000, + 'name': 'Firmware', 'erase_cmd': 'P F *CDX 0000'}, + 'CMX': {'flash_start': 0x08004000, 'file_offset': 0x04000, 'size': 0xFC000, + 'name': 'Config+FW', 'erase_cmd': 'P F *CMX 0000'}, + }, + 'SX585-0200': { + 'BLX': {'flash_start': 0x08000000, 'file_offset': 0x00000, 'size': 0x04000, + 'name': 'Bootloader', 'erase_cmd': 'P F *BLX 0000'}, + 'CCX': {'flash_start': 0x08004000, 'file_offset': 0x04000, 'size': 0x1C000, + 'name': 'Config', 'erase_cmd': 'P F *CCX 0000'}, + 'CDX': {'flash_start': 0x08020000, 'file_offset': 0x20000, 'size': 0xE0000, + 'name': 'Firmware', 'erase_cmd': 'P F *CDX 0000'}, + 'CMX': {'flash_start': 0x08004000, 'file_offset': 0x04000, 'size': 0xFC000, + 'name': 'Config+FW', 'erase_cmd': 'P F *CMX 0000'}, + }, } -FULL_IMAGE_SIZE = 0x100000 # 1MB +SUPPORTED_BIDS = {'SX577-0200'} + +FULL_IMAGE_SIZE = 0x100000 BLOCK_ALIASES = { 'bootloader': 'BLX', 'boot': 'BLX', 'blx': 'BLX', @@ -137,15 +151,49 @@ def sync_uart(ser): ser.reset_input_buffer() +def _extract_bid(responses): + for r in responses: + if b'BID' in r['payload']: + text = r['payload'].decode('ascii', errors='replace') + if '= ' in text: + return text.split('= ', 1)[1].strip().rstrip('\x00') + parts = text.split('BID ', 1) + if len(parts) > 1: + return parts[1].strip().rstrip('\x00') + return None + +def query_bid(ser): + _, resp = send_cmd(ser, "G S #BID", timeout=0.5, quiet=True) + return _extract_bid(resp) + +def bid_from_image(image_data): + """Extract BID from BLX region of a full image (version string near end of BLX).""" + BLX_SIZE = 0x04000 + if len(image_data) < BLX_SIZE: + return None + blx = image_data[:BLX_SIZE] + for off in range(BLX_SIZE - 16, BLX_SIZE // 2, -1): + chunk = blx[off:off+10] + if (chunk[:2] == b'SX' and chunk[5:6] == b'-' + and all(0x30 <= b <= 0x39 for b in chunk[2:5]) + and all(0x30 <= b <= 0x39 for b in chunk[6:10])): + return chunk.decode('ascii') + return None + +def get_blocks(bid): + return BLOCK_MAPS.get(bid) + + def probe_baud(ser): for rate in PROBE_RATES: ser.baudrate = rate ser.reset_input_buffer() time.sleep(0.05) _, resp = send_cmd(ser, "G S #BID", timeout=0.3, quiet=True) - if any(b'BID' in r['payload'] for r in resp): - return rate - return None + bid = _extract_bid(resp) + if bid: + return rate, bid + return None, None def switch_baud(ser, target, quiet=False): if target not in BDD_RATES or ser.baudrate == target: @@ -248,14 +296,8 @@ def fix_block_crc(data: bytearray, block_id: str) -> int: return crc -SIZE_TO_BLOCK = { - 0x04000: 'BLX', - 0x3C000: 'CCX', - 0xC0000: 'CDX', - 0xFC000: 'CMX', -} -def detect_input(file_data: bytes, block_args: list, include_bootloader: bool): +def detect_input(file_data: bytes, block_args: list, include_bootloader: bool, blocks: dict): """ Determine what to flash based on file size and --block argument(s). Returns list of (block_id, data_bytes, flash_start_addr) tuples. @@ -292,15 +334,11 @@ def detect_input(file_data: bytes, block_args: list, include_bootloader: bool): print(f"[*] Skipping {block_id} (use --include-bootloader to include)") continue - blk = BLOCKS[block_id] + blk = blocks[block_id] if is_full_image: data = file_data[blk['file_offset']:blk['file_offset'] + blk['size']] elif len(unique) == 1 and fsize == blk['size']: - # Single standalone block file data = file_data - elif fsize in SIZE_TO_BLOCK and len(unique) == 1: - print(f"[!] File size {fsize} doesn't match {block_id} ({blk['size']})") - return None else: print(f"[!] Need full image ({FULL_IMAGE_SIZE} bytes) for this operation") return None @@ -318,9 +356,9 @@ def detect_input(file_data: bytes, block_args: list, include_bootloader: bool): CHUNK_SIZE = 250 -def flash_block(ser, block_id, data, flash_start, dry_run=False): +def flash_block(ser, block_id, data, flash_start, blocks, dry_run=False): """Erase and flash a single block. Returns True on success.""" - blk = BLOCKS[block_id] + blk = blocks[block_id] block_name = block_id.encode() # Trim trailing 0xFF @@ -410,7 +448,7 @@ def flash_block(ser, block_id, data, flash_start, dry_run=False): def cmd_info(ser): print("\n[*] Probing device...") - baud = probe_baud(ser) + baud, bid = probe_baud(ser) if not baud: print("[!] Device not responding") return 1 @@ -430,20 +468,19 @@ def main(): epilog=""" Blocks: all BLX + CMX (default) - config (CCX) Configuration data (240KB) - firmware (CDX) Application firmware (768KB) - cmx (CMX) Config + Firmware combined (1008KB) - bootloader (BLX) Bootloader (16KB, requires --include-bootloader) + config (CCX) Configuration data + firmware (CDX) Application firmware + cmx (CMX) Config + Firmware combined + bootloader (BLX) Bootloader (requires --include-bootloader) Examples: - %(prog)s -p /dev/ttyACM0 -f dump.bin Flash CMX (skip BLX) - %(prog)s -p /dev/ttyACM0 -f dump.bin --include-bootloader Flash BLX + CMX - %(prog)s -p /dev/ttyACM0 -f dump.bin --block config Flash config only + %(prog)s -p /dev/ttyACM0 -f dump.bin Flash CMX (skip BLX) + %(prog)s -p /dev/ttyACM0 -f dump.bin --include-bootloader Flash BLX + CMX + %(prog)s -p /dev/ttyACM0 -f dump.bin --block config Flash config only %(prog)s -p /dev/ttyACM0 -f dump.bin --block blx --block cmx --include-bootloader - %(prog)s -p /dev/ttyACM0 -f config.bin Auto-detect 240KB config - %(prog)s -p /dev/ttyACM0 -f dump.bin --fix-crc Fix CRC before flash - %(prog)s -p /dev/ttyACM0 -f dump.bin --dry-run Validate without flashing - %(prog)s -p /dev/ttyACM0 --info Show device info + %(prog)s -p /dev/ttyACM0 -f dump.bin --fix-crc Fix CRC before flash + %(prog)s -p /dev/ttyACM0 -f dump.bin --dry-run Validate without flashing + %(prog)s -p /dev/ttyACM0 --info Show device info """) parser.add_argument('-p', '--port', required=True, help='Serial port') parser.add_argument('-f', '--file', help='Firmware file to flash') @@ -456,6 +493,7 @@ def main(): parser.add_argument('--no-reset', action='store_true', help='Do not reset device after flash') parser.add_argument('--no-enter', action='store_true', help='Skip bootloader entry') parser.add_argument('--info', action='store_true', help='Show device info and exit') + parser.add_argument('--yolo', action='store_true', help=argparse.SUPPRESS) args = parser.parse_args() ser = serial.Serial(args.port, 57600, timeout=1.0) @@ -471,74 +509,194 @@ def main(): file_data = f.read() print(f"[*] Loaded: {args.file} ({len(file_data):,} bytes)") - # detect blocks - jobs = detect_input(file_data, args.block, args.include_bootloader) - if not jobs: - return 1 - - print(f"\n[*] Flash plan:") - for block_id, data, flash_start in jobs: - blk = BLOCKS[block_id] - print(f" {block_id} ({blk['name']}): {len(data):,} bytes @ 0x{flash_start:08X}") - - - print(f"\n[*] CRC validation:") - crc_ok = True - for block_id, data, _ in jobs: - if block_id == 'CMX': - ccx_size = BLOCKS['CCX']['size'] - cdx_offset = BLOCKS['CDX']['file_offset'] - BLOCKS['CMX']['file_offset'] - sub_blocks = [ - ('CCX', 0, ccx_size), - ('CDX', cdx_offset, len(data)), - ] + is_full_image = (len(file_data) == FULL_IMAGE_SIZE) + flashing_blx = args.include_bootloader + + image_bid = None + if is_full_image: + image_bid = bid_from_image(file_data) + if image_bid: + print(f"[*] Image bootloader: {image_bid}") + + blx_only = (args.block is not None and all( + BLOCK_ALIASES.get(a.lower(), a.upper()) == 'BLX' for a in args.block)) + + # --- offline validation (no device contact) --- + # for full images we can resolve layout from image BID alone. + # device BID is only needed later for cross-flash checks. + + if is_full_image and image_bid: + if flashing_blx and not blx_only: + active_bid = image_bid else: - sub_blocks = [(block_id, 0, len(data))] + active_bid = image_bid + else: + # standalone block file -- need device BID to pick layout. + # defer to online phase; use None as sentinel. + active_bid = None + + if active_bid: + blocks = get_blocks(active_bid) + if not blocks: + if not args.yolo: + print(f"[!] Unknown block layout for BID: {active_bid}") + return 1 + print(f"[!] --yolo: unknown BID {active_bid}, no block map available") + return 1 + + if active_bid not in SUPPORTED_BIDS: + if not args.yolo: + print(f"[!] BID {active_bid} is not supported") + return 1 + print(f"[!] --yolo: proceeding with non-whitelisted BID {active_bid}") + + # verify CDX header at expected offset + if is_full_image and flashing_blx and not blx_only: + cdx = blocks['CDX'] + cdx_hdr = file_data[cdx['file_offset']:cdx['file_offset'] + 10] + if not (len(cdx_hdr) == 10 and cdx_hdr[:2] == b'SX' + and cdx_hdr[5:6] == b'-' + and all(0x30 <= b <= 0x39 for b in cdx_hdr[2:5]) + and all(0x30 <= b <= 0x39 for b in cdx_hdr[6:10])): + print(f"[!] No valid CDX header at image offset 0x{cdx['file_offset']:X}") + print(f"[!] Expected SXnnn-nnnn, got: {cdx_hdr}") + print(f"[!] Image layout does not match {active_bid}") + if not args.yolo: + return 1 + + jobs = detect_input(file_data, args.block, args.include_bootloader, blocks) + if not jobs: + return 1 - for sub_id, start, end in sub_blocks: - sub_data = data[start:end] - stored, computed, match = check_block_crc(sub_data, sub_id) - status = "OK" if match else "MISMATCH" - icon = "+" if match else "!" - print(f" [{icon}] {sub_id}: stored=0x{stored:04X} computed=0x{computed:04X} {status}") - - if not match: - if args.fix_crc: - crc = crc16_ccitt(data[start:end-2]) - data[end-2] = (crc >> 8) & 0xFF - data[end-1] = crc & 0xFF - print(f" Fixed CRC -> 0x{crc:04X}") - elif not args.force: - crc_ok = False - - if not crc_ok: - print("\n[!] CRC mismatch. Use --fix-crc to repair or --force to ignore.") - return 1 - - if args.dry_run: - print("\n[DRY RUN] Validation complete. No changes made.") + print(f"\n[*] Flash plan ({active_bid} layout):") for block_id, data, flash_start in jobs: - flash_block(ser, block_id, data, flash_start, dry_run=True) - return 0 + blk = blocks[block_id] + print(f" {block_id} ({blk['name']}): {len(data):,} bytes @ 0x{flash_start:08X}") + + print(f"\n[*] CRC validation:") + crc_ok = True + for block_id, data, _ in jobs: + if block_id == 'CMX': + ccx_size = blocks['CCX']['size'] + cdx_offset = blocks['CDX']['file_offset'] - blocks['CMX']['file_offset'] + sub_blocks = [ + ('CCX', 0, ccx_size), + ('CDX', cdx_offset, len(data)), + ] + else: + sub_blocks = [(block_id, 0, len(data))] + + for sub_id, start, end in sub_blocks: + sub_data = data[start:end] + stored, computed, match = check_block_crc(sub_data, sub_id) + status = "OK" if match else "MISMATCH" + icon = "+" if match else "!" + print(f" [{icon}] {sub_id}: stored=0x{stored:04X} computed=0x{computed:04X} {status}") + + if not match: + if args.fix_crc: + crc = crc16_ccitt(data[start:end-2]) + data[end-2] = (crc >> 8) & 0xFF + data[end-1] = crc & 0xFF + print(f" Fixed CRC -> 0x{crc:04X}") + elif not args.force: + crc_ok = False + + if not crc_ok: + print("\n[!] CRC mismatch. Use --fix-crc to repair or --force to ignore.") + return 1 + + if args.dry_run: + print("\n[DRY RUN] Validation complete. No changes made.") + for block_id, data, flash_start in jobs: + flash_block(ser, block_id, data, flash_start, blocks, dry_run=True) + return 0 + + # --- online phase (device contact required) --- - # baud auto or constant if args.baud == 'auto': print(f"\n[*] Probing device...") - baud = probe_baud(ser) + baud, device_bid = probe_baud(ser) if not baud: print("[!] Device not responding at any known baud rate") return 1 - print(f"[+] Device responding at {baud} baud") + print(f"[+] Device responding at {baud} baud (BID: {device_bid})") ser.baudrate = baud else: init_baud = int(args.baud) ser.baudrate = init_baud print(f"\n[*] Connecting at {init_baud} baud...") _, resp = send_cmd(ser, "G S #BID", timeout=1.0, quiet=True) - if not any(b'BID' in r['payload'] for r in resp): + device_bid = _extract_bid(resp) + if not device_bid: print(f"[!] No response at {init_baud} baud") return 1 - print(f"[+] Device responding at {init_baud} baud") + print(f"[+] Device responding at {init_baud} baud (BID: {device_bid})") + + # if we couldn't resolve layout offline (standalone block file), + # use device BID now + if active_bid is None: + active_bid = device_bid + blocks = get_blocks(active_bid) + if not blocks: + if not args.yolo: + print(f"[!] Unknown block layout for BID: {active_bid}") + return 1 + print(f"[!] --yolo: unknown BID {active_bid}") + return 1 + + if active_bid not in SUPPORTED_BIDS: + if not args.yolo: + print(f"[!] BID {active_bid} is not supported") + return 1 + print(f"[!] --yolo: proceeding with non-whitelisted BID {active_bid}") + + jobs = detect_input(file_data, args.block, args.include_bootloader, blocks) + if not jobs: + return 1 + + print(f"\n[*] Flash plan ({active_bid} layout):") + for block_id, data, flash_start in jobs: + blk = blocks[block_id] + print(f" {block_id} ({blk['name']}): {len(data):,} bytes @ 0x{flash_start:08X}") + + print(f"\n[*] CRC validation:") + crc_ok = True + for block_id, data, _ in jobs: + sub_blocks = [(block_id, 0, len(data))] + for sub_id, start, end in sub_blocks: + sub_data = data[start:end] + stored, computed, match = check_block_crc(sub_data, sub_id) + status = "OK" if match else "MISMATCH" + icon = "+" if match else "!" + print(f" [{icon}] {sub_id}: stored=0x{stored:04X} computed=0x{computed:04X} {status}") + if not match: + if args.fix_crc: + crc = crc16_ccitt(data[start:end-2]) + data[end-2] = (crc >> 8) & 0xFF + data[end-1] = crc & 0xFF + print(f" Fixed CRC -> 0x{crc:04X}") + elif not args.force: + crc_ok = False + + if not crc_ok: + print("\n[!] CRC mismatch. Use --fix-crc to repair or --force to ignore.") + return 1 + + if args.dry_run: + print("\n[DRY RUN] Validation complete. No changes made.") + for block_id, data, flash_start in jobs: + flash_block(ser, block_id, data, flash_start, blocks, dry_run=True) + return 0 + + # cross-flash check (needs both image and device BID) + if is_full_image and image_bid and image_bid != device_bid and not blx_only: + if not args.yolo: + print(f"[!] Cross-flash: image BID={image_bid}, device BID={device_bid}") + return 1 + print(f"[!] --yolo: cross-flashing {image_bid} image onto {device_bid} device") + if not flashing_blx: + print(f"[!] WARNING: not replacing bootloader -- layout mismatch likely") if not args.no_enter: if ser.baudrate != 57600: @@ -561,7 +719,7 @@ def main(): time.sleep(0.5) if ser.baudrate != 57600: ser.baudrate = 57600 - if not enter_bootloader(ser, 10): + if not enter_bootloader(ser, 30): return 1 if args.baud == 'auto': negotiate_best_baud(ser) @@ -569,7 +727,7 @@ def main(): target = int(args.baud) if target != ser.baudrate: switch_baud(ser, target) - if not flash_block(ser, block_id, data, flash_start): + if not flash_block(ser, block_id, data, flash_start, blocks): print(f"\n[!] Failed to flash {block_id}") return 1 @@ -584,6 +742,5 @@ def main(): finally: ser.close() - -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) From a7dfb04b4bfba98e3fb5c5604297ec0731a75524 Mon Sep 17 00:00:00 2001 From: mkozlowski <10508687+m-kozlowski@users.noreply.github.com> Date: Mon, 2 Mar 2026 02:40:44 +0100 Subject: [PATCH 7/8] resmed_flash.py: allow flashing block image smaller than expected size --- python/resmed_flash.py | 132 +++++++++++++++++++++++------------------ 1 file changed, 75 insertions(+), 57 deletions(-) diff --git a/python/resmed_flash.py b/python/resmed_flash.py index 633135d..7ff0740 100755 --- a/python/resmed_flash.py +++ b/python/resmed_flash.py @@ -297,7 +297,8 @@ def fix_block_crc(data: bytearray, block_id: str) -> int: -def detect_input(file_data: bytes, block_args: list, include_bootloader: bool, blocks: dict): +def detect_input(file_data: bytes, block_args: list, include_bootloader: bool, blocks: dict, + raw: bool = False): """ Determine what to flash based on file size and --block argument(s). Returns list of (block_id, data_bytes, flash_start_addr) tuples. @@ -339,6 +340,8 @@ def detect_input(file_data: bytes, block_args: list, include_bootloader: bool, b data = file_data[blk['file_offset']:blk['file_offset'] + blk['size']] elif len(unique) == 1 and fsize == blk['size']: data = file_data + elif raw and len(unique) == 1 and fsize <= blk['size']: + data = file_data else: print(f"[!] Need full image ({FULL_IMAGE_SIZE} bytes) for this operation") return None @@ -493,6 +496,7 @@ def main(): parser.add_argument('--no-reset', action='store_true', help='Do not reset device after flash') parser.add_argument('--no-enter', action='store_true', help='Skip bootloader entry') parser.add_argument('--info', action='store_true', help='Show device info and exit') + parser.add_argument('--raw', action='store_true', help='Flash raw image smaller than block size') parser.add_argument('--yolo', action='store_true', help=argparse.SUPPRESS) args = parser.parse_args() @@ -512,6 +516,12 @@ def main(): is_full_image = (len(file_data) == FULL_IMAGE_SIZE) flashing_blx = args.include_bootloader + if args.raw: + if not args.block or len(args.block) != 1: + parser.error("--raw requires exactly one --block") + if args.block[0].lower() == 'all': + parser.error("--raw cannot be used with --block all") + image_bid = None if is_full_image: image_bid = bid_from_image(file_data) @@ -564,7 +574,8 @@ def main(): if not args.yolo: return 1 - jobs = detect_input(file_data, args.block, args.include_bootloader, blocks) + jobs = detect_input(file_data, args.block, args.include_bootloader, blocks, + raw=args.raw) if not jobs: return 1 @@ -573,38 +584,41 @@ def main(): blk = blocks[block_id] print(f" {block_id} ({blk['name']}): {len(data):,} bytes @ 0x{flash_start:08X}") - print(f"\n[*] CRC validation:") - crc_ok = True - for block_id, data, _ in jobs: - if block_id == 'CMX': - ccx_size = blocks['CCX']['size'] - cdx_offset = blocks['CDX']['file_offset'] - blocks['CMX']['file_offset'] - sub_blocks = [ - ('CCX', 0, ccx_size), - ('CDX', cdx_offset, len(data)), - ] - else: - sub_blocks = [(block_id, 0, len(data))] - - for sub_id, start, end in sub_blocks: - sub_data = data[start:end] - stored, computed, match = check_block_crc(sub_data, sub_id) - status = "OK" if match else "MISMATCH" - icon = "+" if match else "!" - print(f" [{icon}] {sub_id}: stored=0x{stored:04X} computed=0x{computed:04X} {status}") - - if not match: - if args.fix_crc: - crc = crc16_ccitt(data[start:end-2]) - data[end-2] = (crc >> 8) & 0xFF - data[end-1] = crc & 0xFF - print(f" Fixed CRC -> 0x{crc:04X}") - elif not args.force: - crc_ok = False - - if not crc_ok: - print("\n[!] CRC mismatch. Use --fix-crc to repair or --force to ignore.") - return 1 + if args.raw: + print(f"\n[*] CRC validation: skipped (raw mode)") + else: + print(f"\n[*] CRC validation:") + crc_ok = True + for block_id, data, _ in jobs: + if block_id == 'CMX': + ccx_size = blocks['CCX']['size'] + cdx_offset = blocks['CDX']['file_offset'] - blocks['CMX']['file_offset'] + sub_blocks = [ + ('CCX', 0, ccx_size), + ('CDX', cdx_offset, len(data)), + ] + else: + sub_blocks = [(block_id, 0, len(data))] + + for sub_id, start, end in sub_blocks: + sub_data = data[start:end] + stored, computed, match = check_block_crc(sub_data, sub_id) + status = "OK" if match else "MISMATCH" + icon = "+" if match else "!" + print(f" [{icon}] {sub_id}: stored=0x{stored:04X} computed=0x{computed:04X} {status}") + + if not match: + if args.fix_crc: + crc = crc16_ccitt(data[start:end-2]) + data[end-2] = (crc >> 8) & 0xFF + data[end-1] = crc & 0xFF + print(f" Fixed CRC -> 0x{crc:04X}") + elif not args.force: + crc_ok = False + + if not crc_ok: + print("\n[!] CRC mismatch. Use --fix-crc to repair or --force to ignore.") + return 1 if args.dry_run: print("\n[DRY RUN] Validation complete. No changes made.") @@ -651,7 +665,8 @@ def main(): return 1 print(f"[!] --yolo: proceeding with non-whitelisted BID {active_bid}") - jobs = detect_input(file_data, args.block, args.include_bootloader, blocks) + jobs = detect_input(file_data, args.block, args.include_bootloader, blocks, + raw=args.raw) if not jobs: return 1 @@ -660,28 +675,31 @@ def main(): blk = blocks[block_id] print(f" {block_id} ({blk['name']}): {len(data):,} bytes @ 0x{flash_start:08X}") - print(f"\n[*] CRC validation:") - crc_ok = True - for block_id, data, _ in jobs: - sub_blocks = [(block_id, 0, len(data))] - for sub_id, start, end in sub_blocks: - sub_data = data[start:end] - stored, computed, match = check_block_crc(sub_data, sub_id) - status = "OK" if match else "MISMATCH" - icon = "+" if match else "!" - print(f" [{icon}] {sub_id}: stored=0x{stored:04X} computed=0x{computed:04X} {status}") - if not match: - if args.fix_crc: - crc = crc16_ccitt(data[start:end-2]) - data[end-2] = (crc >> 8) & 0xFF - data[end-1] = crc & 0xFF - print(f" Fixed CRC -> 0x{crc:04X}") - elif not args.force: - crc_ok = False - - if not crc_ok: - print("\n[!] CRC mismatch. Use --fix-crc to repair or --force to ignore.") - return 1 + if args.raw: + print(f"\n[*] CRC validation: skipped (raw mode)") + else: + print(f"\n[*] CRC validation:") + crc_ok = True + for block_id, data, _ in jobs: + sub_blocks = [(block_id, 0, len(data))] + for sub_id, start, end in sub_blocks: + sub_data = data[start:end] + stored, computed, match = check_block_crc(sub_data, sub_id) + status = "OK" if match else "MISMATCH" + icon = "+" if match else "!" + print(f" [{icon}] {sub_id}: stored=0x{stored:04X} computed=0x{computed:04X} {status}") + if not match: + if args.fix_crc: + crc = crc16_ccitt(data[start:end-2]) + data[end-2] = (crc >> 8) & 0xFF + data[end-1] = crc & 0xFF + print(f" Fixed CRC -> 0x{crc:04X}") + elif not args.force: + crc_ok = False + + if not crc_ok: + print("\n[!] CRC mismatch. Use --fix-crc to repair or --force to ignore.") + return 1 if args.dry_run: print("\n[DRY RUN] Validation complete. No changes made.") From fa8b127c88fa6b673199f026b897e6cc224ee89f Mon Sep 17 00:00:00 2001 From: mkozlowski <10508687+m-kozlowski@users.noreply.github.com> Date: Thu, 5 Mar 2026 20:53:27 +0100 Subject: [PATCH 8/8] resmed_flash.py: add support for s9 platform --- python/resmed_flash.py | 371 +++++++++++++++++++++++++++++++---------- 1 file changed, 280 insertions(+), 91 deletions(-) diff --git a/python/resmed_flash.py b/python/resmed_flash.py index 7ff0740..1e37d6c 100755 --- a/python/resmed_flash.py +++ b/python/resmed_flash.py @@ -3,15 +3,19 @@ ResMed AirSense UART Flash Tool Flashes firmware to ResMed AirSense devices via the UART bootloader protocol. -Supports individual block or full image flashing with CRC validation and -automatic baud rate negotiation up to 460800. +Supports individual block or full image flashing with CRC validation. -Flash memory map: +S10 flash memory map (SX577/SX585): 0x08000000 BLX 16KB Bootloader 0x08004000 CCX 240KB Configuration 0x08040000 CDX 768KB Firmware CMX 1008KB CCX+CDX combined +S9 flash memory map (SX525): + 0x08000000 BLX 12KB Bootloader + 0x08003000 CCX 118KB Configuration + 0x08020800 CDX 894KB Firmware + """ import serial @@ -21,6 +25,19 @@ import struct +# Platform profiles, keyed by bootloader BID prefix. +PLATFORMS = { + 'SX577': {'baud_method': 'bdd', 'default_baud': 57600, + 'baud_rates': {57600: '0000', 115200: '0001', 460800: '0002'}, + 'enter_cmd': 'P S #BLL 0001', 'reset_cmd': 'P S #RES 0001'}, + 'SX585': {'baud_method': 'bdd', 'default_baud': 57600, + 'baud_rates': {57600: '0000', 115200: '0001', 460800: '0002'}, + 'enter_cmd': 'P S #BLL 0001', 'reset_cmd': 'P S #RES 0001'}, + 'SX525': {'baud_method': 'fixed', 'default_baud': 57600, + 'baud_rates': {57600: 'E100'}, + 'enter_cmd': 'P S #RES 0001', 'reset_cmd': None}, +} + BLOCK_MAPS = { 'SX577-0200': { 'BLX': {'flash_start': 0x08000000, 'file_offset': 0x00000, 'size': 0x04000, @@ -42,9 +59,25 @@ 'CMX': {'flash_start': 0x08004000, 'file_offset': 0x04000, 'size': 0xFC000, 'name': 'Config+FW', 'erase_cmd': 'P F *CMX 0000'}, }, + 'SX525-0300': { + 'BLX': {'flash_start': 0x08000000, 'file_offset': 0x00000, 'size': 0x03000, + 'name': 'Bootloader', 'erase_cmd': 'P F *BLX 1C200'}, + 'CCX': {'flash_start': 0x08003000, 'file_offset': 0x03000, 'size': 0x1D800, + 'name': 'Config', 'erase_cmd': 'P F *CCX 1C200'}, + 'CDX': {'flash_start': 0x08020800, 'file_offset': 0x20800, 'size': 0xDF800, + 'name': 'Firmware', 'erase_cmd': 'P F *CDX 1C200'}, + }, + 'SX525-0400': { + 'BLX': {'flash_start': 0x08000000, 'file_offset': 0x00000, 'size': 0x03000, + 'name': 'Bootloader', 'erase_cmd': 'P F *BLX 1C200'}, + 'CCX': {'flash_start': 0x08003000, 'file_offset': 0x03000, 'size': 0x1D800, + 'name': 'Config', 'erase_cmd': 'P F *CCX 1C200'}, + 'CDX': {'flash_start': 0x08020800, 'file_offset': 0x20800, 'size': 0xDF800, + 'name': 'Firmware', 'erase_cmd': 'P F *CDX 1C200'}, + }, } -SUPPORTED_BIDS = {'SX577-0200'} +SUPPORTED_BIDS = {'SX577-0200', 'SX525-0300', 'SX525-0400'} FULL_IMAGE_SIZE = 0x100000 @@ -58,10 +91,15 @@ BDD_RATES = {57600: '0000', 115200: '0001', 460800: '0002'} BDD_RATES_ORDERED = [460800, 115200, 57600] -#PROBE_RATES = [9600, 19200, 57600, 115200, 460800] PROBE_RATES = [57600, 115200, 460800] +def _platform(bid): + if bid and len(bid) >= 5: + return PLATFORMS.get(bid[:5]) + return None + + def crc16_ccitt(data: bytes) -> int: crc = 0xFFFF for b in data: @@ -84,10 +122,10 @@ def build_q_frame(cmd: str) -> bytes: return build_frame('Q', cmd.encode()) def build_f_frame(block_name: bytes, seq: int, records: bytes) -> bytes: - return build_frame('F', block_name + b'\x00' + bytes([seq]) + records) + return build_frame('f', block_name + b'\x00' + bytes([seq]) + records) def build_completion_frame(block_name: bytes, seq: int) -> bytes: - return build_frame('F', block_name + b'F' + bytes([seq])) + return build_frame('f', block_name + b'F' + bytes([seq])) def build_record_03(address: int, data: bytes) -> bytes: length = 4 + len(data) + 1 @@ -168,16 +206,17 @@ def query_bid(ser): def bid_from_image(image_data): """Extract BID from BLX region of a full image (version string near end of BLX).""" - BLX_SIZE = 0x04000 - if len(image_data) < BLX_SIZE: - return None - blx = image_data[:BLX_SIZE] - for off in range(BLX_SIZE - 16, BLX_SIZE // 2, -1): - chunk = blx[off:off+10] - if (chunk[:2] == b'SX' and chunk[5:6] == b'-' - and all(0x30 <= b <= 0x39 for b in chunk[2:5]) - and all(0x30 <= b <= 0x39 for b in chunk[6:10])): - return chunk.decode('ascii') + blx_sizes = sorted(set(b['BLX']['size'] for b in BLOCK_MAPS.values()), reverse=True) + for blx_size in blx_sizes: + if len(image_data) < blx_size: + continue + blx = image_data[:blx_size] + for off in range(blx_size - 16, blx_size // 2, -1): + chunk = blx[off:off+10] + if (chunk[:2] == b'SX' and chunk[5:6] == b'-' + and all(0x30 <= b <= 0x39 for b in chunk[2:5]) + and all(0x30 <= b <= 0x39 for b in chunk[6:10])): + return chunk.decode('ascii') return None def get_blocks(bid): @@ -195,6 +234,25 @@ def probe_baud(ser): return rate, bid return None, None +def wait_for_device(ser, timeout=None): + t0 = time.time() + attempt = 0 + while True: + baud, bid = probe_baud(ser) + if baud: + return baud, bid + attempt += 1 + if attempt == 1: + sys.stdout.write("[*] Waiting for device...") + sys.stdout.flush() + elif attempt % 5 == 0: + sys.stdout.write(".") + sys.stdout.flush() + if timeout and time.time() - t0 > timeout: + print() + return None, None + time.sleep(0.5) + def switch_baud(ser, target, quiet=False): if target not in BDD_RATES or ser.baudrate == target: return ser.baudrate == target @@ -228,9 +286,11 @@ def negotiate_best_baud(ser): return ser.baudrate -def enter_bootloader(ser, max_retries=3): +def enter_bootloader(ser, max_retries=3, enter_cmd='P S #BLL 0001', flood=True): + """Enter bootloader mode. Returns bootloader BID on success, None on failure. + flood=True: S10 style preamble flood to catch short bootloader window + flood=False: S9 style send reset, wait, then poll gently""" bid_frame = build_q_frame("G S #BID") - preamble = b'\x55' * 128 for retry in range(max_retries): if retry > 0: print(f"[*] Retry {retry}/{max_retries-1}...") @@ -242,29 +302,61 @@ def enter_bootloader(ser, max_retries=3): continue print("[*] Triggering reboot...") ser.reset_input_buffer() - #ser.write(build_q_frame("P S #RES 0001")) - #ser.write(build_q_frame("P S #RES 0003")) - ser.write(build_q_frame("P S #BLL 0001")) + ser.write(build_q_frame(enter_cmd)) ser.flush() time.sleep(0.05) ser.reset_input_buffer() - print("[*] Flooding to catch bootloader...") - t0 = time.time() - for _ in range(300): - ser.write(preamble + bid_frame) - _, responses = read_responses(ser, timeout=0.05) - if any(b'BID' in r['payload'] for r in responses): - print(f"[+] Bootloader caught at t+{time.time()-t0:.2f}s") - time.sleep(0.2) + + if flood: + # S10: bootloader window is tight, flood sync+BID to catch it + preamble = b'\x55' * 128 + print("[*] Flooding to catch bootloader...") + t0 = time.time() + for _ in range(300): + ser.write(preamble + bid_frame) + _, responses = read_responses(ser, timeout=0.05) + bl_bid = _extract_bid(responses) + if bl_bid: + print(f"[+] Bootloader caught at t+{time.time()-t0:.2f}s") + time.sleep(0.2) + ser.reset_input_buffer() + return bl_bid + else: + # S9: bootloader can't handle flood, poll with spacing + print("[*] Waiting for bootloader...") + t0 = time.time() + time.sleep(0.5) + for _ in range(60): ser.reset_input_buffer() - return True + ser.write(bid_frame) + ser.flush() + _, responses = read_responses(ser, timeout=0.3) + bl_bid = _extract_bid(responses) + if bl_bid: + print(f"[+] Bootloader caught at t+{time.time()-t0:.2f}s") + time.sleep(0.2) + ser.reset_input_buffer() + return bl_bid + print("[!] Failed to catch bootloader") - return False + return None -def wait_for_erase(ser, block_id, timeout=30.0): - print("[*] Waiting for erase completion...") +def _finish_erase(ser, initial_responses, timeout=30.0): + """Collect P-ACKs until R-frame. Returns baud from R-frame or None on failure.""" t0 = time.time() p_count = 0 + for r in initial_responses: + p = r['payload'].decode('ascii', errors='replace') + if r['type'] == 'P': + p_count += 1 + elif r['type'] == 'R': + print(f" Erase done: {p} ({p_count} ACKs)") + if '= ' in p: + try: + return int(p.split('= ', 1)[1].strip(), 16) + except (ValueError, IndexError): + pass + return 0 while time.time() - t0 < timeout: _, responses = read_responses(ser, timeout=0.5) for r in responses: @@ -274,22 +366,29 @@ def wait_for_erase(ser, block_id, timeout=30.0): sys.stdout.write(f"\r Erase progress: {p_count} ACKs...") sys.stdout.flush() elif r['type'] == 'R': - print(f"\r Erase done [{r['type']}]: {p} ({p_count} ACKs) ") - return True + print(f"\r Erase done: {p} ({p_count} ACKs) ") + if '= ' in p: + try: + return int(p.split('= ', 1)[1].strip(), 16) + except (ValueError, IndexError): + pass + return 0 else: print(f"\n [{r['type']}] {p}") print(f"\n[!] Erase timeout ({p_count} ACKs)") - return False + return None +def wait_for_erase(ser, block_id, timeout=30.0): + print("[*] Waiting for erase completion...") + return _finish_erase(ser, [], timeout=timeout) + def check_block_crc(data: bytes, block_id: str) -> tuple: - """Check CRC of a block. Returns (stored_crc, computed_crc, match).""" stored = (data[-2] << 8) | data[-1] computed = crc16_ccitt(data[:-2]) return stored, computed, stored == computed def fix_block_crc(data: bytearray, block_id: str) -> int: - """Recalculate and patch CRC in last 2 bytes. Returns new CRC.""" crc = crc16_ccitt(data[:-2]) data[-2] = (crc >> 8) & 0xFF data[-1] = crc & 0xFF @@ -305,21 +404,32 @@ def detect_input(file_data: bytes, block_args: list, include_bootloader: bool, b """ fsize = len(file_data) is_full_image = (fsize == FULL_IMAGE_SIZE) + has_cmx = 'CMX' in blocks if block_args: requested = [] for arg in block_args: key = arg.lower() if key == 'all': - requested.extend(['BLX', 'CMX']) + if has_cmx: + requested.extend(['BLX', 'CMX']) + else: + requested.extend(['BLX', 'CCX', 'CDX']) else: block_id = BLOCK_ALIASES.get(key) if not block_id: - print(f"[!] Unknown block '{arg}'. Use: config, firmware, all, bootloader, cmx") + print(f"[!] Unknown block '{arg}'. Use: config, firmware, all, bootloader" + + (", cmx" if has_cmx else "")) + return None + if block_id == 'CMX' and not has_cmx: + print(f"[!] CMX not supported on this platform. Use --block config --block firmware") return None requested.append(block_id) else: - requested = ['BLX', 'CMX'] + if has_cmx: + requested = ['BLX', 'CMX'] + else: + requested = ['BLX', 'CCX', 'CDX'] # Deduplicate preserving order seen = set() @@ -352,7 +462,7 @@ def detect_input(file_data: bytes, block_args: list, include_bootloader: bool, b print("[!] No blocks to flash") return None - # Sort by flash address (BLX before CMX) + # Sort by flash address (BLX before CMX/CCX/CDX) jobs.sort(key=lambda j: j[2]) return jobs @@ -389,7 +499,9 @@ def flash_block(ser, block_id, data, flash_start, blocks, dry_run=False): ser.reset_input_buffer() ser.write(build_q_frame(blk['erase_cmd'])) ser.flush() - if wait_for_erase(ser, block_id, timeout=30.0): + erase_baud = _finish_erase(ser, [], timeout=30.0) + + if erase_baud is not None: break elif attempt < 2: print("[!] Erase stalled, retrying...") @@ -398,6 +510,17 @@ def flash_block(ser, block_id, data, flash_start, blocks, dry_run=False): print("[!] Erase failed") return False + # S9 Bootloader may switch baud after erase (reported in R-frame as hex value). + # E100 = 57600, 1C200 = 115200 + KNOWN_BAUDS = {9600, 19200, 38400, 57600, 115200, 230400, 460800, 921600} + if erase_baud and erase_baud in KNOWN_BAUDS and erase_baud != ser.baudrate: + print(f"[*] Bootloader switched to {erase_baud} baud, following...") + ser.baudrate = erase_baud + + # After the R-frame, the s9 bootloader reconfigures USART + # Any bytes we send during this window get discarded. + time.sleep(0.3) + # WRITE seq = 0 offset = 0 @@ -437,7 +560,7 @@ def flash_block(ser, block_id, data, flash_start, blocks, dry_run=False): elapsed = time.time() - t0 print(f"\r {offset:,}/{data_end:,} (100%) in {elapsed:.1f}s, {frame_count} frames ") - # Completion frame + # Completion frame print("[*] Sending completion frame...") ser.write(build_completion_frame(block_name, seq)) ser.flush() @@ -449,9 +572,14 @@ def flash_block(ser, block_id, data, flash_start, blocks, dry_run=False): return True -def cmd_info(ser): +def cmd_info(ser, wait=False): print("\n[*] Probing device...") - baud, bid = probe_baud(ser) + if wait: + baud, bid = wait_for_device(ser) + if baud: + print() + else: + baud, bid = probe_baud(ser) if not baud: print("[!] Device not responding") return 1 @@ -459,7 +587,7 @@ def cmd_info(ser): print(f"[+] Device responding at {baud} baud") print("\n[*] Device info:") for cmd in ["G S #BID", "G S #SID", "G S #CID", "G S #PCB", - "G S #SRN", "G S #PCD", "G S #PNA"]: + "G S #SRN", "G S #PCD", "G S #PNA", "G S #PST"]: send_cmd(ser, cmd, timeout=0.5) return 0 @@ -470,19 +598,19 @@ def main(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Blocks: - all BLX + CMX (default) + all BLX + CMX (S10) or BLX + CCX + CDX (S9) config (CCX) Configuration data firmware (CDX) Application firmware - cmx (CMX) Config + Firmware combined + cmx (CMX) Config + Firmware combined (S10 only) bootloader (BLX) Bootloader (requires --include-bootloader) Examples: - %(prog)s -p /dev/ttyACM0 -f dump.bin Flash CMX (skip BLX) - %(prog)s -p /dev/ttyACM0 -f dump.bin --include-bootloader Flash BLX + CMX + %(prog)s -p /dev/ttyACM0 -f dump.bin Flash CMX/CCX+CDX (skip BLX) + %(prog)s -p /dev/ttyACM0 -f dump.bin --include-bootloader Flash everything %(prog)s -p /dev/ttyACM0 -f dump.bin --block config Flash config only - %(prog)s -p /dev/ttyACM0 -f dump.bin --block blx --block cmx --include-bootloader %(prog)s -p /dev/ttyACM0 -f dump.bin --fix-crc Fix CRC before flash %(prog)s -p /dev/ttyACM0 -f dump.bin --dry-run Validate without flashing + %(prog)s -p /dev/ttyACM0 -f dump.bin --no-wait Fail if device not found %(prog)s -p /dev/ttyACM0 --info Show device info """) parser.add_argument('-p', '--port', required=True, help='Serial port') @@ -497,6 +625,7 @@ def main(): parser.add_argument('--no-enter', action='store_true', help='Skip bootloader entry') parser.add_argument('--info', action='store_true', help='Show device info and exit') parser.add_argument('--raw', action='store_true', help='Flash raw image smaller than block size') + parser.add_argument('--no-wait', action='store_true', help='Fail immediately if device not found') parser.add_argument('--yolo', action='store_true', help=argparse.SUPPRESS) args = parser.parse_args() @@ -504,7 +633,7 @@ def main(): try: if args.info: - return cmd_info(ser) + return cmd_info(ser, wait=not args.no_wait) if not args.file: parser.error("-f/--file is required (unless using --info)") @@ -531,18 +660,14 @@ def main(): blx_only = (args.block is not None and all( BLOCK_ALIASES.get(a.lower(), a.upper()) == 'BLX' for a in args.block)) - # --- offline validation (no device contact) --- + # offline validation (no device contact) # for full images we can resolve layout from image BID alone. # device BID is only needed later for cross-flash checks. if is_full_image and image_bid: - if flashing_blx and not blx_only: - active_bid = image_bid - else: - active_bid = image_bid + active_bid = image_bid else: - # standalone block file -- need device BID to pick layout. - # defer to online phase; use None as sentinel. + # standalone block file need device BID to pick layout. active_bid = None if active_bid: @@ -626,11 +751,16 @@ def main(): flash_block(ser, block_id, data, flash_start, blocks, dry_run=True) return 0 - # --- online phase (device contact required) --- + # online phase (device contact required) if args.baud == 'auto': print(f"\n[*] Probing device...") - baud, device_bid = probe_baud(ser) + if not args.no_wait: + baud, device_bid = wait_for_device(ser) + if baud: + print() + else: + baud, device_bid = probe_baud(ser) if not baud: print("[!] Device not responding at any known baud rate") return 1 @@ -640,8 +770,16 @@ def main(): init_baud = int(args.baud) ser.baudrate = init_baud print(f"\n[*] Connecting at {init_baud} baud...") - _, resp = send_cmd(ser, "G S #BID", timeout=1.0, quiet=True) - device_bid = _extract_bid(resp) + if not args.no_wait: + while True: + _, resp = send_cmd(ser, "G S #BID", timeout=0.5, quiet=True) + device_bid = _extract_bid(resp) + if device_bid: + break + time.sleep(0.5) + else: + _, resp = send_cmd(ser, "G S #BID", timeout=1.0, quiet=True) + device_bid = _extract_bid(resp) if not device_bid: print(f"[!] No response at {init_baud} baud") return 1 @@ -716,44 +854,95 @@ def main(): if not flashing_blx: print(f"[!] WARNING: not replacing bootloader -- layout mismatch likely") + # resolve platform from whatever BID we have + plat = _platform(active_bid) or _platform(device_bid) or {} + enter_cmd = plat.get('enter_cmd', 'P S #BLL 0001') + reset_cmd = plat.get('reset_cmd', 'P S #RES 0001') + default_baud = plat.get('default_baud', 57600) + can_bdd = plat.get('baud_method', 'bdd') == 'bdd' + if not args.no_enter: - if ser.baudrate != 57600: - switch_baud(ser, 57600) - if not enter_bootloader(ser): + if ser.baudrate != default_baud: + if can_bdd: + switch_baud(ser, default_baud) + else: + ser.baudrate = default_baud + + bl_bid = enter_bootloader(ser, enter_cmd=enter_cmd, flood=can_bdd) + if not bl_bid: return 1 - if args.baud == 'auto': - print("\n[*] Negotiating baud rate...") - negotiate_best_baud(ser) - else: - target = int(args.baud) - if target != ser.baudrate: - switch_baud(ser, target) + # active_bid may have come from image file; re-resolve if device BID differs + if bl_bid != active_bid: + bl_blocks = get_blocks(bl_bid) + if bl_blocks: + active_bid = bl_bid + blocks = bl_blocks + plat = _platform(bl_bid) or plat + can_bdd = plat.get('baud_method', 'bdd') == 'bdd' + reset_cmd = plat.get('reset_cmd') + jobs = detect_input(file_data, args.block, args.include_bootloader, + blocks, raw=args.raw) + if not jobs: + return 1 + + if can_bdd: + if args.baud == 'auto': + print("\n[*] Negotiating baud rate...") + negotiate_best_baud(ser) + else: + target = int(args.baud) + if target != ser.baudrate: + switch_baud(ser, target) for i, (block_id, data, flash_start) in enumerate(jobs): if i > 0: - # Completion frame reset the bootloader. re-enter and re-negotiate - print("\n[*] Re-entering bootloader for next block...") - time.sleep(0.5) - if ser.baudrate != 57600: - ser.baudrate = 57600 - if not enter_bootloader(ser, 30): - return 1 - if args.baud == 'auto': - negotiate_best_baud(ser) + if reset_cmd: + # S10: completion frame triggered reset, re-enter bootloader + print("\n[*] Re-entering bootloader for next block...") + time.sleep(0.5) + if ser.baudrate != default_baud: + ser.baudrate = default_baud + if not enter_bootloader(ser, 30, enter_cmd=enter_cmd, flood=can_bdd): + return 1 + if can_bdd: + if args.baud == 'auto': + negotiate_best_baud(ser) + else: + target = int(args.baud) + if target != ser.baudrate: + switch_baud(ser, target) else: - target = int(args.baud) - if target != ser.baudrate: - switch_baud(ser, target) + # No software reset from bootloader. Need power cycle. + ser.baudrate = default_baud + if not args.no_wait: + print(f"\n[*] Power cycle device to continue with {block_id}...") + while _extract_bid(send_cmd(ser, "G S #BID", timeout=0.3, quiet=True)[1]): + time.sleep(0.3) + baud, bid = wait_for_device(ser) + if not baud: + return 1 + print() + ser.baudrate = baud + else: + print(f"\n[*] Power cycle device to continue with {block_id}, then press Enter...") + input() + print("[*] Entering bootloader...") + bl_bid = enter_bootloader(ser, enter_cmd=enter_cmd, flood=False) + if not bl_bid: + return 1 if not flash_block(ser, block_id, data, flash_start, blocks): print(f"\n[!] Failed to flash {block_id}") return 1 if not args.no_reset: - if ser.baudrate != 57600: - switch_baud(ser, 57600) - print("\n[*] Resetting device...") - send_cmd(ser, "P S #RES 0001", timeout=2.0) + if reset_cmd: + if can_bdd and ser.baudrate != default_baud: + switch_baud(ser, default_baud) + print("\n[*] Resetting device...") + send_cmd(ser, reset_cmd, timeout=2.0) + else: + print("\n[*] Power cycle device to boot new firmware") print("\n[+] Flash complete!") return 0