Skip to content

Commit b0d1b6b

Browse files
committed
feature(test_apps): Added test for measuring CPU load
1 parent 399a3e4 commit b0d1b6b

File tree

10 files changed

+810
-0
lines changed

10 files changed

+810
-0
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# The following lines of boilerplate have to be in your project's
2+
# CMakeLists in this exact order for cmake to work correctly
3+
cmake_minimum_required(VERSION 3.16)
4+
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
5+
6+
# "Trim" the build. Include the minimal set of components, main, and anything it depends on.
7+
set(COMPONENTS main)
8+
9+
project(test_app_cpu_load)
Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
#!/usr/bin/env python3
2+
# SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
# Test has been generated with AI
6+
# Verified by Roman Leonov manually
7+
8+
"""
9+
CDC Echo Integrity Tester
10+
11+
Sends randomly generated data to a CDC-ACM echo device, reads back the echoed
12+
bytes, and verifies integrity by comparing a hash (SHA or HMAC) of the
13+
transmitted payload versus the received payload.
14+
15+
Usage examples:
16+
17+
# Select port explicitly and send 4 MiB in 1 KiB chunks using SHA-256
18+
python cdc_echo_integrity.py --port COM7 --bytes 4MiB --chunk-size 1KiB
19+
20+
# Auto-select by VID:PID (hex) and use HMAC-SHA256 with a key
21+
python cdc_echo_integrity.py --vid 0x303A --pid 0x1001 \
22+
--bytes 2MiB --hmac-key secretkey
23+
24+
# Use SHA-512 and deterministic pseudo-random data with seed
25+
python cdc_echo_integrity.py --port /dev/ttyACM0 --algo sha512 --seed 1234
26+
27+
Exit code 0 on success, non-zero on failure.
28+
"""
29+
from __future__ import annotations
30+
31+
import argparse
32+
import binascii
33+
import os
34+
import sys
35+
import time
36+
import hashlib
37+
import hmac
38+
from dataclasses import dataclass
39+
from typing import Optional, Callable, Tuple
40+
41+
try:
42+
import serial # pyserial
43+
from serial.tools import list_ports
44+
except Exception as e: # pragma: no cover - import guidance
45+
print("This script requires pyserial. Install with: pip install pyserial", file=sys.stderr)
46+
raise
47+
48+
# ------------------------------- Helpers ------------------------------------
49+
50+
SIZE_SUFFIXES = {
51+
"b": 1,
52+
"kb": 1000,
53+
"kib": 1024,
54+
"mb": 1000 ** 2,
55+
"mib": 1024 ** 2,
56+
"gb": 1000 ** 3,
57+
"gib": 1024 ** 3,
58+
}
59+
60+
def parse_size(s: str) -> int:
61+
"""Parse human-readable size like 4096, 4KiB, 1MB, 2MiB."""
62+
s = s.strip().lower()
63+
# plain int
64+
if s.isdigit():
65+
return int(s)
66+
# try suffixes
67+
for suf in sorted(SIZE_SUFFIXES, key=len, reverse=True):
68+
if s.endswith(suf):
69+
num = s[: -len(suf)].strip()
70+
return int(float(num) * SIZE_SUFFIXES[suf])
71+
# accept hex
72+
if s.startswith("0x"):
73+
return int(s, 16)
74+
raise argparse.ArgumentTypeError(f"Invalid size: {s}")
75+
76+
77+
def find_port(vid: Optional[int], pid: Optional[int], explicit_port: Optional[str]) -> str:
78+
if explicit_port:
79+
return explicit_port
80+
if vid is None and pid is None:
81+
raise SystemExit("Either --port or both --vid and --pid must be specified")
82+
candidates = []
83+
for p in list_ports.comports():
84+
if p.vid is None or p.pid is None:
85+
continue
86+
if vid is not None and pid is not None and p.vid == vid and p.pid == pid:
87+
candidates.append(p.device)
88+
if not candidates:
89+
raise SystemExit(f"No port found for VID:PID {vid:#06x}:{pid:#06x}")
90+
if len(candidates) > 1:
91+
print("Multiple matching ports found; using first:", candidates, file=sys.stderr)
92+
return candidates[0]
93+
94+
95+
def make_hashers(algo: str, hmac_key: Optional[bytes]) -> Tuple[Callable[[bytes], None], Callable[[bytes], None], Callable[[], str]]:
96+
"""Return (tx_update, rx_update, hexdigest) closures configured per algo/mode."""
97+
algo = algo.lower()
98+
if hmac_key is not None:
99+
# HMAC mode
100+
try:
101+
tx_h = hmac.new(hmac_key, digestmod=algo)
102+
rx_h = hmac.new(hmac_key, digestmod=algo)
103+
except ValueError:
104+
raise SystemExit(f"Unsupported HMAC algorithm: {algo}")
105+
return tx_h.update, rx_h.update, lambda: (tx_h.hexdigest(), rx_h.hexdigest())
106+
# Plain hash mode
107+
try:
108+
tx_h = hashlib.new(algo)
109+
rx_h = hashlib.new(algo)
110+
except ValueError:
111+
raise SystemExit(f"Unsupported hash algorithm: {algo}")
112+
return tx_h.update, rx_h.update, lambda: (tx_h.hexdigest(), rx_h.hexdigest())
113+
114+
115+
def iter_chunks(buf: bytes, chunk_size: int):
116+
for i in range(0, len(buf), chunk_size):
117+
yield memoryview(buf)[i : i + chunk_size]
118+
119+
120+
@dataclass
121+
class Stats:
122+
total_written: int = 0
123+
total_read: int = 0
124+
start_time: float = 0.0
125+
126+
def start(self):
127+
self.start_time = time.perf_counter()
128+
129+
@property
130+
def elapsed(self) -> float:
131+
return max(1e-9, time.perf_counter() - self.start_time)
132+
133+
@property
134+
def write_mbps(self) -> float:
135+
return (self.total_written * 8) / (1_000_000 * self.elapsed)
136+
137+
@property
138+
def read_mbps(self) -> float:
139+
return (self.total_read * 8) / (1_000_000 * self.elapsed)
140+
141+
142+
# ------------------------------- Core ---------------------------------------
143+
144+
def generate_payload(nbytes: int, seed: Optional[int]) -> bytes:
145+
if seed is None:
146+
return os.urandom(nbytes)
147+
# Deterministic pseudo-random: use a simple stream based on hashlib
148+
out = bytearray()
149+
counter = 0
150+
while len(out) < nbytes:
151+
counter_bytes = counter.to_bytes(8, 'little')
152+
block = hashlib.sha256(seed.to_bytes(8, 'little') + counter_bytes).digest()
153+
out.extend(block)
154+
counter += 1
155+
return bytes(out[:nbytes])
156+
157+
158+
def echo_test(
159+
port: str,
160+
nbytes: int,
161+
chunk_size: int,
162+
algo: str,
163+
hmac_key: Optional[bytes],
164+
baudrate: int,
165+
timeout: float,
166+
seed: Optional[int],
167+
rtscts: bool,
168+
dsrdtr: bool,
169+
xonxoff: bool,
170+
) -> bool:
171+
payload = generate_payload(nbytes, seed)
172+
tx_update, rx_update, hexdigests = make_hashers(algo, hmac_key)
173+
174+
stats = Stats()
175+
176+
with serial.Serial(
177+
port=port,
178+
baudrate=baudrate,
179+
timeout=timeout,
180+
rtscts=rtscts,
181+
dsrdtr=dsrdtr,
182+
xonxoff=xonxoff,
183+
write_timeout=timeout,
184+
) as set:
185+
# Flush any stale bytes
186+
set.reset_input_buffer()
187+
set.reset_output_buffer()
188+
189+
stats.start()
190+
# Write in chunks to avoid huge buffers
191+
for chunk in iter_chunks(payload, chunk_size):
192+
n = set.write(chunk)
193+
if n != len(chunk):
194+
raise SystemExit(f"Short write: wrote {n} of {len(chunk)} bytes")
195+
stats.total_written += n
196+
tx_update(chunk)
197+
198+
# Read back exactly nbytes
199+
received = bytearray()
200+
while len(received) < nbytes:
201+
part = set.read(min(chunk_size, nbytes - len(received)))
202+
if not part:
203+
raise SystemExit(
204+
f"Timeout while reading echoed data: got {len(received)}/{nbytes} bytes"
205+
)
206+
received.extend(part)
207+
stats.total_read += len(part)
208+
rx_update(part)
209+
210+
tx_hex, rx_hex = hexdigests()
211+
212+
ok = (len(received) == len(payload)) and (tx_hex == rx_hex)
213+
214+
print("=== CDC Echo Integrity Test ===")
215+
print(f"Port : {port}")
216+
print(f"Bytes : {nbytes}")
217+
print(f"Chunk size : {chunk_size}")
218+
print(f"Mode : {'HMAC' if hmac_key else 'Hash'} ({algo})")
219+
print(f"Elapsed (s) : {stats.elapsed:.3f}")
220+
print(f"TX rate (Mb/s) : {stats.write_mbps:.3f}")
221+
print(f"RX rate (Mb/s) : {stats.read_mbps:.3f}")
222+
print(f"TX digest : {tx_hex}")
223+
print(f"RX digest : {rx_hex}")
224+
print(f"Result : {'PASS' if ok else 'FAIL'}")
225+
226+
return ok
227+
228+
229+
# ------------------------------- CLI ----------------------------------------
230+
231+
def parse_args(argv=None) -> argparse.Namespace:
232+
p = argparse.ArgumentParser(description="CDC echo integrity tester (SHA or HMAC)")
233+
234+
g_dev = p.add_argument_group("Device selection")
235+
g_dev.add_argument("--port", help="Serial port name, e.g., COM7 or /dev/ttyACM0")
236+
g_dev.add_argument("--vid", type=lambda s: int(s, 0), help="USB VID in hex, e.g., 0x303A")
237+
g_dev.add_argument("--pid", type=lambda s: int(s, 0), help="USB PID in hex, e.g., 0x1001")
238+
239+
g_io = p.add_argument_group("I/O settings")
240+
g_io.add_argument("--baudrate", type=int, default=115200, help="Baudrate (some CDC stacks ignore this)")
241+
g_io.add_argument("--timeout", type=float, default=5.0, help="Read/write timeout in seconds")
242+
g_io.add_argument("--rtscts", action="store_true", help="Enable RTS/CTS flow control")
243+
g_io.add_argument("--dsrdtr", action="store_true", help="Enable DSR/DTR flow control")
244+
g_io.add_argument("--xonxoff", action="store_true", help="Enable XON/XOFF software flow control")
245+
246+
g_data = p.add_argument_group("Test payload")
247+
g_data.add_argument("--bytes", type=parse_size, default="1MiB", help="Total payload size (e.g., 1MiB, 4096)")
248+
g_data.add_argument("--chunk-size", type=parse_size, default="512", help="Chunk size for TX/RX")
249+
g_data.add_argument("--seed", type=int, help="Deterministic pseudo-random payload seed (optional)")
250+
251+
g_hash = p.add_argument_group("Integrity")
252+
g_hash.add_argument("--algo", default="sha256", help="Hash algorithm (e.g., sha256, sha1, sha512)")
253+
g_hash.add_argument(
254+
"--hmac-key",
255+
help="If provided, compute HMAC instead of plain hash. Accepts ASCII or hex with 0x prefix.",
256+
)
257+
258+
return p.parse_args(argv)
259+
260+
261+
def main(argv=None) -> int:
262+
args = parse_args(argv)
263+
264+
port = find_port(args.vid if hasattr(args, 'vid') else None,
265+
args.pid if hasattr(args, 'pid') else None,
266+
args.port if hasattr(args, 'port') else None)
267+
268+
key_bytes: Optional[bytes] = None
269+
if args.hmac_key:
270+
if args.hmac_key.startswith("0x"):
271+
try:
272+
key_bytes = binascii.unhexlify(args.hmac_key[2:])
273+
except binascii.Error:
274+
raise SystemExit("Invalid hex for --hmac-key")
275+
else:
276+
key_bytes = args.hmac_key.encode("utf-8")
277+
278+
ok = echo_test(
279+
port=port,
280+
nbytes=args.bytes,
281+
chunk_size=args.chunk_size,
282+
algo=args.algo,
283+
hmac_key=key_bytes,
284+
baudrate=args.baudrate,
285+
timeout=args.timeout,
286+
seed=args.seed,
287+
rtscts=args.rtscts,
288+
dsrdtr=args.dsrdtr,
289+
xonxoff=args.xonxoff,
290+
)
291+
292+
return 0 if ok else 2
293+
294+
295+
if __name__ == "__main__":
296+
sys.exit(main())
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
idf_component_register(SRC_DIRS .
2+
INCLUDE_DIRS .
3+
REQUIRES unity
4+
WHOLE_ARCHIVE)
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
3+
*
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#include "soc/soc_caps.h"
8+
9+
#if SOC_USB_OTG_SUPPORTED
10+
//
11+
#include <stdio.h>
12+
#include <string.h>
13+
//
14+
#include "freertos/FreeRTOS.h"
15+
#include "freertos/task.h"
16+
#include "freertos/semphr.h"
17+
//
18+
#include "esp_system.h"
19+
#include "esp_log.h"
20+
#include "esp_err.h"
21+
//
22+
#include "unity.h"
23+
#include "tinyusb.h"
24+
25+
static SemaphoreHandle_t wait_mount = NULL;
26+
27+
#define TUSB_DEVICE_DELAY_MS 5000
28+
29+
void test_device_setup(void)
30+
{
31+
wait_mount = xSemaphoreCreateBinary();
32+
TEST_ASSERT_NOT_NULL(wait_mount);
33+
}
34+
35+
void test_device_teardown(void)
36+
{
37+
TEST_ASSERT_NOT_NULL(wait_mount);
38+
vSemaphoreDelete(wait_mount);
39+
}
40+
41+
void test_device_wait(void)
42+
{
43+
// Wait for tud_mount_cb() to be called (first timeout)
44+
if (xSemaphoreTake(wait_mount, pdMS_TO_TICKS(TUSB_DEVICE_DELAY_MS)) != pdTRUE) {
45+
ESP_LOGW("device timeout!", "Device did not appear in first %d ms, waiting again...", TUSB_DEVICE_DELAY_MS);
46+
// Wait for the second timeout
47+
TEST_ASSERT_EQUAL_MESSAGE(pdTRUE, xSemaphoreTake(wait_mount, pdMS_TO_TICKS(TUSB_DEVICE_DELAY_MS)), "No tusb_mount_cb() after second timeout");
48+
}
49+
// Delay to allow finish the enumeration
50+
// Disable this delay could lead to potential race conditions when the tud_task() is pinned to another CPU
51+
vTaskDelay(pdMS_TO_TICKS(250));
52+
}
53+
54+
/**
55+
* @brief TinyUSB callback for device mount.
56+
*
57+
* @note
58+
* For Linux-based Hosts: Reflects the SetConfiguration() request from the Host Driver.
59+
* For Win-based Hosts: SetConfiguration() request is present only with available Class in device descriptor.
60+
*/
61+
void test_device_event_handler(tinyusb_event_t *event, void *arg)
62+
{
63+
switch (event->id) {
64+
case TINYUSB_EVENT_ATTACHED:
65+
xSemaphoreGive(wait_mount);
66+
break;
67+
case TINYUSB_EVENT_DETACHED:
68+
break;
69+
default:
70+
break;
71+
}
72+
}
73+
74+
#endif // SOC_USB_OTG_SUPPORTED

0 commit comments

Comments
 (0)