From e247147b6c00b955ba199aff328a10f36ceafd9a Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Thu, 31 Jul 2025 23:23:14 -0700 Subject: [PATCH 01/13] this is absolutely slow and done for every process for every msg!!! --- common/prefix.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/common/prefix.py b/common/prefix.py index 762ae70fb48d35..66927d35f30752 100644 --- a/common/prefix.py +++ b/common/prefix.py @@ -14,15 +14,15 @@ def __init__(self, prefix: str = None, clean_dirs_on_exit: bool = True, shared_d self.msgq_path = os.path.join(Paths.shm_path(), self.prefix) self.clean_dirs_on_exit = clean_dirs_on_exit self.shared_download_cache = shared_download_cache + self.dirs_set_up = False def __enter__(self): self.original_prefix = os.environ.get('OPENPILOT_PREFIX', None) os.environ['OPENPILOT_PREFIX'] = self.prefix - try: - os.mkdir(self.msgq_path) - except FileExistsError: - pass - os.makedirs(Paths.log_root(), exist_ok=True) + + if not self.dirs_set_up: + self.dirs_set_up = True + self.setup_dirs() if self.shared_download_cache: os.environ["COMMA_CACHE"] = DEFAULT_DOWNLOAD_CACHE_ROOT @@ -40,6 +40,13 @@ def __exit__(self, exc_type, exc_obj, exc_tb): pass return False + def setup_dirs(self): + try: + os.mkdir(self.msgq_path) + except FileExistsError: + pass + os.makedirs(Paths.log_root(), exist_ok=True) + def clean_dirs(self): symlink_path = Params().get_param_path() if os.path.exists(symlink_path): From edf1ee452dc95cfd05b4e7e84c269919bb62a046 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Thu, 31 Jul 2025 23:25:59 -0700 Subject: [PATCH 02/13] let process replay control itself? --- common/prefix.py | 11 +++++------ selfdrive/test/process_replay/process_replay.py | 3 ++- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/common/prefix.py b/common/prefix.py index 66927d35f30752..207f8477d76e5b 100644 --- a/common/prefix.py +++ b/common/prefix.py @@ -9,20 +9,19 @@ from openpilot.system.hardware.hw import DEFAULT_DOWNLOAD_CACHE_ROOT class OpenpilotPrefix: - def __init__(self, prefix: str = None, clean_dirs_on_exit: bool = True, shared_download_cache: bool = False): + def __init__(self, prefix: str = None, create_dirs_on_enter: bool = True, clean_dirs_on_exit: bool = True, shared_download_cache: bool = False): self.prefix = prefix if prefix else str(uuid.uuid4().hex[0:15]) self.msgq_path = os.path.join(Paths.shm_path(), self.prefix) + self.create_dirs_on_enter = create_dirs_on_enter self.clean_dirs_on_exit = clean_dirs_on_exit self.shared_download_cache = shared_download_cache - self.dirs_set_up = False def __enter__(self): self.original_prefix = os.environ.get('OPENPILOT_PREFIX', None) os.environ['OPENPILOT_PREFIX'] = self.prefix - if not self.dirs_set_up: - self.dirs_set_up = True - self.setup_dirs() + if self.create_dirs_on_enter: + self.create_dirs() if self.shared_download_cache: os.environ["COMMA_CACHE"] = DEFAULT_DOWNLOAD_CACHE_ROOT @@ -40,7 +39,7 @@ def __exit__(self, exc_type, exc_obj, exc_tb): pass return False - def setup_dirs(self): + def create_dirs(self): try: os.mkdir(self.msgq_path) except FileExistsError: diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 288f107437451a..132dda9c212128 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -147,7 +147,7 @@ class ProcessConfig: class ProcessContainer: def __init__(self, cfg: ProcessConfig): - self.prefix = OpenpilotPrefix(clean_dirs_on_exit=False) + self.prefix = OpenpilotPrefix(create_dirs_on_enter=False, clean_dirs_on_exit=False) self.cfg = copy.deepcopy(cfg) self.process = copy.deepcopy(managed_processes[cfg.proc_name]) self.msg_queue: list[capnp._DynamicStructReader] = [] @@ -229,6 +229,7 @@ def start( fingerprint: str | None, capture_output: bool ): with self.prefix as p: + self.prefix.create_dirs() self._setup_env(params_config, environ_config) if self.cfg.config_callback is not None: From ae46fec1eb7d3036523fcebd14872aac852636bf Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Thu, 31 Jul 2025 23:36:25 -0700 Subject: [PATCH 03/13] no timeout for now --- common/timeout.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/common/timeout.py b/common/timeout.py index d0b0ce0630af78..f9d88bcbe433f3 100644 --- a/common/timeout.py +++ b/common/timeout.py @@ -20,8 +20,11 @@ def handle_timeout(self, signume, frame): raise TimeoutException(self.error_msg) def __enter__(self): + return signal.signal(signal.SIGALRM, self.handle_timeout) signal.alarm(self.seconds) def __exit__(self, exc_type, exc_val, exc_tb): + return + # return signal.alarm(0) From 77290ac385ea9d109f5bcde1378f0589029be152 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Thu, 31 Jul 2025 23:36:33 -0700 Subject: [PATCH 04/13] fast radard --- selfdrive/controls/radard.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/selfdrive/controls/radard.py b/selfdrive/controls/radard.py index 98fce1cb26e7b5..b59f57d4eebc64 100755 --- a/selfdrive/controls/radard.py +++ b/selfdrive/controls/radard.py @@ -243,6 +243,7 @@ def update(self, sm: messaging.SubMaster, rr: car.RadarData): self.radar_state.leadTwo = get_lead(self.v_ego, self.ready, self.tracks, leads_v3[1], model_v_ego, low_speed_override=False) def publish(self, pm: messaging.PubMaster): + self.radar_state = log.RadarState.new_message() assert self.radar_state is not None radar_msg = messaging.new_message("radarState") @@ -269,7 +270,7 @@ def main() -> None: while 1: sm.update() - RD.update(sm, sm['liveTracks']) + # RD.update(sm, sm['liveTracks']) RD.publish(pm) From c2085694b75b10ae0dd4944fe1120d32b29b5160 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Thu, 31 Jul 2025 23:37:21 -0700 Subject: [PATCH 05/13] benchmark --- .../test/process_replay/process_replay.py | 51 ++++++++++++----- tools/diff/diff.py | 56 +++++++++++++++++++ 2 files changed, 92 insertions(+), 15 deletions(-) create mode 100755 tools/diff/diff.py diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 132dda9c212128..88151188849a34 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -4,7 +4,7 @@ import copy import heapq import signal -from collections import Counter, OrderedDict +from collections import Counter, OrderedDict, deque from dataclasses import dataclass, field from typing import Any from collections.abc import Callable, Iterable @@ -272,7 +272,7 @@ def run_step(self, msg: capnp._DynamicStructReader, frs: dict[str, FrameReader] assert self.rc and self.pm and self.sockets and self.process.proc output_msgs = [] - with self.prefix, Timeout(self.cfg.timeout, error_msg=f"timed out testing process {repr(self.cfg.proc_name)}"): + with Timeout(self.cfg.timeout, error_msg=f"timed out testing process {repr(self.cfg.proc_name)}"): end_of_cycle = True if self.cfg.should_recv_callback is not None: end_of_cycle = self.cfg.should_recv_callback(msg, self.cfg, self.cnt) @@ -501,7 +501,7 @@ def selfdrived_config_callback(params, cfg, lr): subs=["radarState"], ignore=["logMonoTime"], init_callback=get_car_params_callback, - should_recv_callback=FrequencyBasedRcvCallback("modelV2"), + should_recv_callback=MessageBasedRcvCallback("modelV2"), # it polls ??? ), ProcessConfig( proc_name="plannerd", @@ -656,7 +656,7 @@ def replay_process_with_name(name: str | Iterable[str], lr: LogIterable, *args, def replay_process( cfg: ProcessConfig | Iterable[ProcessConfig], lr: LogIterable, frs: dict[str, FrameReader] = None, fingerprint: str = None, return_all_logs: bool = False, custom_params: dict[str, Any] = None, - captured_output_store: dict[str, dict[str, str]] = None, disable_progress: bool = False + captured_output_store: dict[str, dict[str, str]] = None, disable_progress: bool = False, t=0 ) -> list[capnp._DynamicStructReader]: if isinstance(cfg, Iterable): cfgs = list(cfg) @@ -667,7 +667,9 @@ def replay_process( manager_states=True, panda_states=any("pandaStates" in cfg.pubs for cfg in cfgs), camera_states=any(len(cfg.vision_pubs) != 0 for cfg in cfgs)) - process_logs = _replay_multi_process(cfgs, all_msgs, frs, fingerprint, custom_params, captured_output_store, disable_progress) + # return all_msgs + process_logs = _replay_multi_process(cfgs, all_msgs, frs, fingerprint, custom_params, captured_output_store, disable_progress, t) + return [] if return_all_logs: keys = {m.which() for m in process_logs} @@ -683,7 +685,7 @@ def replay_process( def _replay_multi_process( cfgs: list[ProcessConfig], lr: LogIterable, frs: dict[str, FrameReader] | None, fingerprint: str | None, - custom_params: dict[str, Any] | None, captured_output_store: dict[str, dict[str, str]] | None, disable_progress: bool + custom_params: dict[str, Any] | None, captured_output_store: dict[str, dict[str, str]] | None, disable_progress: bool, t: float ) -> list[capnp._DynamicStructReader]: if fingerprint is not None: params_config = generate_params_config(lr=lr, fingerprint=fingerprint, custom_params=custom_params) @@ -692,55 +694,74 @@ def _replay_multi_process( CP = next((m.carParams for m in lr if m.which() == "carParams"), None) params_config = generate_params_config(lr=lr, CP=CP, custom_params=custom_params) env_config = generate_environ_config(CP=CP) + print('env config', time.monotonic() - t) # validate frs and vision pubs all_vision_pubs = [pub for cfg in cfgs for pub in cfg.vision_pubs] if len(all_vision_pubs) != 0: assert frs is not None, "frs must be provided when replaying process using vision streams" assert all(meta_from_camera_state(st) is not None for st in all_vision_pubs), \ - f"undefined vision stream spotted, probably misconfigured process: (vision pubs: {all_vision_pubs})" + f"undefined vision stream spotted, probably misconfigured process: (vision pubs: {all_vision_pubs})" required_vision_pubs = {m.camera_state for m in available_streams(lr)} & set(all_vision_pubs) assert all(st in frs for st in required_vision_pubs), f"frs for this process must contain following vision streams: {required_vision_pubs}" + print('validate frs', time.monotonic() - t) all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) log_msgs = [] + containers = [] try: - containers = [] for cfg in cfgs: container = ProcessContainer(cfg) containers.append(container) container.start(params_config, env_config, all_msgs, frs, fingerprint, captured_output_store is not None) + print('created containers', time.monotonic() - t) + all_pubs = {pub for container in containers for pub in container.pubs} all_subs = {sub for container in containers for sub in container.subs} lr_pubs = all_pubs - all_subs + print('all_pubs', all_pubs, 'all_subs', all_subs, 'lr_pubs', lr_pubs) pubs_to_containers = {pub: [container for container in containers if pub in container.pubs] for pub in all_pubs} + print('prepared pubs and subs', time.monotonic() - t) + pub_msgs = [msg for msg in all_msgs if msg.which() in lr_pubs] # external queue for messages taken from logs; internal queue for messages generated by processes, which will be republished - external_pub_queue: list[capnp._DynamicStructReader] = pub_msgs.copy() + external_pub_queue: list[capnp._DynamicStructReader] = deque(pub_msgs) internal_pub_queue: list[capnp._DynamicStructReader] = [] # heap for maintaining the order of messages generated by processes, where each element: (logMonoTime, index in internal_pub_queue) internal_pub_index_heap: list[tuple[int, int]] = [] + print('prepared queues', time.monotonic() - t) + pbar = tqdm(total=len(external_pub_queue), disable=disable_progress) + print('starting looping', time.monotonic() - t) while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)): + # t = time.monotonic() + # del external_pub_queue[0] + # msg = external_pub_queue.popleft() if len(internal_pub_index_heap) == 0 or (len(external_pub_queue) != 0 and external_pub_queue[0].logMonoTime < internal_pub_index_heap[0][0]): - msg = external_pub_queue.pop(0) + msg = external_pub_queue.popleft() pbar.update(1) else: + raise Exception _, index = heapq.heappop(internal_pub_index_heap) msg = internal_pub_queue[index] + # print('loop pop msg', time.monotonic() - t) target_containers = pubs_to_containers[msg.which()] for container in target_containers: output_msgs = container.run_step(msg, frs) - for m in output_msgs: - if m.which() in all_pubs: - internal_pub_queue.append(m) - heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) - log_msgs.extend(output_msgs) + # for m in output_msgs: + # if m.which() in all_pubs: + # internal_pub_queue.append(m) + # heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) + # log_msgs.extend(output_msgs) + # print('loop run step', time.monotonic() - t) + # print() finally: + print('final internal_pub_queue len', len(internal_pub_queue)) + print('loop finished', time.monotonic() - t) for container in containers: container.stop() if captured_output_store is not None: diff --git a/tools/diff/diff.py b/tools/diff/diff.py new file mode 100755 index 00000000000000..3ec2f330edf5ac --- /dev/null +++ b/tools/diff/diff.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +import argparse +import time +from tqdm import tqdm +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor + +from openpilot.selfdrive.test.process_replay.process_replay import CONFIGS, replay_process +from openpilot.tools.lib.logreader import LogReader, save_log + +SEG_LIST = [ + "d9b97c1d3b8c39b2/0000018b--8a62ed4984/1", +] * 1 + +# these use cameras/run models which are slow +BLACKLIST_PROCS = ['modeld', 'dmonitoringmodeld'] +WHITELIST_PROCS = ['radard'] # TODO: temporary for debugging + + +def replay(cfgs, seg): + inputs = list(LogReader(seg)) + t = time.monotonic() + outputs = replay_process(cfgs, inputs, fingerprint=None, disable_progress=True, t=t) + print(f"\nTotal time: {time.monotonic() - t} seconds") + + # Remove message generated by the process under test and merge in the new messages + produces = {o.which() for o in outputs} + inputs = [i for i in inputs if i.which() not in produces] + outputs = sorted(inputs + outputs, key=lambda x: x.logMonoTime) + return outputs + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="process replay v2", + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + # parser.add_argument("--fingerprint", help="The fingerprint to use") + # parser.add_argument("route", help="The route name to use") + parser.add_argument("-n", type=int, default=8, help="Number of processes to use") + args = parser.parse_args() + + cfgs = [c for c in CONFIGS if c.proc_name not in BLACKLIST_PROCS and c.proc_name in WHITELIST_PROCS] + + t = time.monotonic() + with ProcessPoolExecutor(max_workers=args.n) as executor: + futures = [] + for seg in tqdm(SEG_LIST): + futures.append(executor.submit(replay, cfgs, seg)) + + for future in tqdm(futures): + outputs = future.result() + + print('got', len(outputs), 'output messages') + + # fn = f"diff_{seg.replace('/', '_')}.zst" + # print(f"Saving log to {fn}") + # save_log(fn, outputs) + # print(f"\nTotal time: {time.monotonic() - t} seconds") From 92c08f15dd3480bfbbc1acd5bc57029eb967661b Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 1 Aug 2025 00:24:08 -0700 Subject: [PATCH 06/13] why tf is this an ordereddict? --- selfdrive/test/process_replay/process_replay.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 88151188849a34..278bba3d589ee9 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -4,7 +4,7 @@ import copy import heapq import signal -from collections import Counter, OrderedDict, deque +from collections import Counter, deque from dataclasses import dataclass, field from typing import Any from collections.abc import Callable, Iterable @@ -79,7 +79,7 @@ def open_context(self): messaging.set_fake_prefix(self.proc_name) if self.main_pub is None: - self.events = OrderedDict() + self.events = {} pubs_with_events = [pub for pub in self.pubs if pub not in self.unlocked_pubs] for pub in pubs_with_events: self.events[pub] = messaging.fake_event_handle(pub, enable=True) @@ -295,6 +295,7 @@ def run_step(self, msg: capnp._DynamicStructReader, frs: dict[str, FrameReader] self.pm.send(m.which(), m.as_builder()) # send frames if needed if self.vipc_server is not None and m.which() in self.cfg.vision_pubs: + raise Exception camera_state = getattr(m, m.which()) camera_meta = meta_from_camera_state(m.which()) assert frs is not None From 74c3d251cee6d0c329c04f2e7bb1742d4f95fa4e Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 1 Aug 2025 00:55:56 -0700 Subject: [PATCH 07/13] document --- selfdrive/test/process_replay/process_replay.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 278bba3d589ee9..23db1e57bdcfff 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -279,7 +279,18 @@ def run_step(self, msg: capnp._DynamicStructReader, frs: dict[str, FrameReader] self.msg_queue.append(msg) if end_of_cycle: - self.rc.wait_for_recv_called() + # print('proc replay waiting for recv called') + # *** for radard, wait for recv to be called on any socket (modelV2, carState, liveTracks) *** + got = self.rc.wait_for_recv_called() + + # *** goes through each socket that called recv and clears recv_called event, then sets recv ready event *** + self.rc.unlock_sockets() + + # *** wait for recv to be called again? *** + # self.rc.wait_for_next_recv(False) + return [] + # print('proc replay recv called done') + # print() # call recv to let sub-sockets reconnect, after we know the process is ready if self.cnt == 0: From 4e9df4d55f9c11d501bfdc1806c087735795252d Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 1 Aug 2025 01:09:19 -0700 Subject: [PATCH 08/13] todo --- .../test/process_replay/process_replay.py | 60 ++++++++++--------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 23db1e57bdcfff..9b491271b57735 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -108,14 +108,16 @@ def send_sync(self, pm, endpoint, dat): def unlock_sockets(self): expected_sets = len(self.events) + # print('events', self.events) while expected_sets > 0: index = messaging.wait_for_one_event(self.all_recv_called_events) + # print('cleared', index) self.all_recv_called_events[index].clear() self.all_recv_ready_events[index].set() expected_sets -= 1 def wait_for_recv_called(self): - messaging.wait_for_one_event(self.all_recv_called_events) + return messaging.wait_for_one_event(self.all_recv_called_events) def wait_for_next_recv(self, trigger_empty_recv): index = messaging.wait_for_one_event(self.all_recv_called_events) @@ -281,42 +283,41 @@ def run_step(self, msg: capnp._DynamicStructReader, frs: dict[str, FrameReader] if end_of_cycle: # print('proc replay waiting for recv called') # *** for radard, wait for recv to be called on any socket (modelV2, carState, liveTracks) *** - got = self.rc.wait_for_recv_called() - - # *** goes through each socket that called recv and clears recv_called event, then sets recv ready event *** - self.rc.unlock_sockets() - - # *** wait for recv to be called again? *** - # self.rc.wait_for_next_recv(False) - return [] - # print('proc replay recv called done') - # print() + # TODO: I think this is redundant. from the previous run_step call, we will have waited for sm.update to run! + self.rc.wait_for_recv_called() # call recv to let sub-sockets reconnect, after we know the process is ready if self.cnt == 0: for s in self.sockets: messaging.recv_one_or_none(s) + self.cnt += 1 # empty recv on drained pub indicates the end of messages, only do that if there're any trigger_empty_recv = False - if self.cfg.main_pub and self.cfg.main_pub_drained: - trigger_empty_recv = next((True for m in self.msg_queue if m.which() == self.cfg.main_pub), False) - - for m in self.msg_queue: - self.pm.send(m.which(), m.as_builder()) - # send frames if needed - if self.vipc_server is not None and m.which() in self.cfg.vision_pubs: - raise Exception - camera_state = getattr(m, m.which()) - camera_meta = meta_from_camera_state(m.which()) - assert frs is not None - img = frs[m.which()].get(camera_state.frameId) - self.vipc_server.send(camera_meta.stream, img.flatten().tobytes(), - camera_state.frameId, camera_state.timestampSof, camera_state.timestampEof) - self.msg_queue = [] - + # if self.cfg.main_pub and self.cfg.main_pub_drained: + # raise Exception + # trigger_empty_recv = next((True for m in self.msg_queue if m.which() == self.cfg.main_pub), False) + + # for m in self.msg_queue: + # self.pm.send(m.which(), m.as_builder()) + # # send frames if needed + # if self.vipc_server is not None and m.which() in self.cfg.vision_pubs: + # raise Exception + # camera_state = getattr(m, m.which()) + # camera_meta = meta_from_camera_state(m.which()) + # assert frs is not None + # img = frs[m.which()].get(camera_state.frameId) + # self.vipc_server.send(camera_meta.stream, img.flatten().tobytes(), + # camera_state.frameId, camera_state.timestampSof, camera_state.timestampEof) + # self.msg_queue = [] + + # input() + # *** goes through each socket that called recv and clears recv_called event, then sets recv ready event *** self.rc.unlock_sockets() + # input() + # *** wait for recv to be called again? *** self.rc.wait_for_next_recv(trigger_empty_recv) + return [] for socket in self.sockets: ms = messaging.drain_sock(socket) @@ -324,7 +325,6 @@ def run_step(self, msg: capnp._DynamicStructReader, frs: dict[str, FrameReader] m = m.as_builder() m.logMonoTime = msg.logMonoTime + int(self.cfg.processing_time * 1e9) output_msgs.append(m.as_reader()) - self.cnt += 1 assert self.process.proc.is_alive() return output_msgs @@ -668,8 +668,10 @@ def replay_process_with_name(name: str | Iterable[str], lr: LogIterable, *args, def replay_process( cfg: ProcessConfig | Iterable[ProcessConfig], lr: LogIterable, frs: dict[str, FrameReader] = None, fingerprint: str = None, return_all_logs: bool = False, custom_params: dict[str, Any] = None, - captured_output_store: dict[str, dict[str, str]] = None, disable_progress: bool = False, t=0 + captured_output_store: dict[str, dict[str, str]] = None, disable_progress: bool = False, t=None ) -> list[capnp._DynamicStructReader]: + if t is None: + t = time.monotonic() if isinstance(cfg, Iterable): cfgs = list(cfg) else: From 43a19eb9d34f32767e7739605c32dbb74e9cad73 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 1 Aug 2025 01:23:42 -0700 Subject: [PATCH 09/13] stash --- selfdrive/controls/controlsd.py | 2 +- selfdrive/controls/radard.py | 4 ++ .../test/process_replay/process_replay.py | 62 ++++++++++--------- tools/diff/diff.py | 2 +- 4 files changed, 40 insertions(+), 30 deletions(-) diff --git a/selfdrive/controls/controlsd.py b/selfdrive/controls/controlsd.py index 39687ab72a58d7..a8db2864eb5b13 100755 --- a/selfdrive/controls/controlsd.py +++ b/selfdrive/controls/controlsd.py @@ -180,7 +180,7 @@ def publish(self, CC, lac_log): dat.valid = CS.canValid cs = dat.controlsState - cs.curvature = self.curvature + cs.curvature = 420.69 cs.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan'] cs.lateralPlanMonoTime = self.sm.logMonoTime['modelV2'] cs.desiredCurvature = self.desired_curvature diff --git a/selfdrive/controls/radard.py b/selfdrive/controls/radard.py index b59f57d4eebc64..246fed60809fa9 100755 --- a/selfdrive/controls/radard.py +++ b/selfdrive/controls/radard.py @@ -268,10 +268,14 @@ def main() -> None: RD = RadarD(CP.radarDelay) while 1: + # print('radard calling update', flush=True) sm.update() + # print('radard finished calling update', flush=True) # RD.update(sm, sm['liveTracks']) + # print('radard calling RD.update', flush=True) RD.publish(pm) + # print('radard finished calling RD.update', flush=True) if __name__ == "__main__": diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 9b491271b57735..8d34cfcd636aa3 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -292,39 +292,46 @@ def run_step(self, msg: capnp._DynamicStructReader, frs: dict[str, FrameReader] messaging.recv_one_or_none(s) self.cnt += 1 + # *** get output msgs from previous inputs *** + for socket in self.sockets: + ms = messaging.drain_sock(socket) + for m in ms: + m = m.as_builder() + m.logMonoTime = msg.logMonoTime + int(self.cfg.processing_time * 1e9) + output_msgs.append(m.as_reader()) + # empty recv on drained pub indicates the end of messages, only do that if there're any trigger_empty_recv = False # if self.cfg.main_pub and self.cfg.main_pub_drained: # raise Exception # trigger_empty_recv = next((True for m in self.msg_queue if m.which() == self.cfg.main_pub), False) - # for m in self.msg_queue: - # self.pm.send(m.which(), m.as_builder()) - # # send frames if needed - # if self.vipc_server is not None and m.which() in self.cfg.vision_pubs: - # raise Exception - # camera_state = getattr(m, m.which()) - # camera_meta = meta_from_camera_state(m.which()) - # assert frs is not None - # img = frs[m.which()].get(camera_state.frameId) - # self.vipc_server.send(camera_meta.stream, img.flatten().tobytes(), - # camera_state.frameId, camera_state.timestampSof, camera_state.timestampEof) - # self.msg_queue = [] + for m in self.msg_queue: + self.pm.send(m.which(), m.as_builder()) + # send frames if needed + if self.vipc_server is not None and m.which() in self.cfg.vision_pubs: + raise Exception + camera_state = getattr(m, m.which()) + camera_meta = meta_from_camera_state(m.which()) + assert frs is not None + img = frs[m.which()].get(camera_state.frameId) + self.vipc_server.send(camera_meta.stream, img.flatten().tobytes(), + camera_state.frameId, camera_state.timestampSof, camera_state.timestampEof) + self.msg_queue = [] # input() # *** goes through each socket that called recv and clears recv_called event, then sets recv ready event *** self.rc.unlock_sockets() # input() # *** wait for recv to be called again? *** - self.rc.wait_for_next_recv(trigger_empty_recv) - return [] - - for socket in self.sockets: - ms = messaging.drain_sock(socket) - for m in ms: - m = m.as_builder() - m.logMonoTime = msg.logMonoTime + int(self.cfg.processing_time * 1e9) - output_msgs.append(m.as_reader()) + # self.rc.wait_for_next_recv(trigger_empty_recv) + + # for socket in self.sockets: + # ms = messaging.drain_sock(socket) + # for m in ms: + # m = m.as_builder() + # m.logMonoTime = msg.logMonoTime + int(self.cfg.processing_time * 1e9) + # output_msgs.append(m.as_reader()) assert self.process.proc.is_alive() return output_msgs @@ -683,7 +690,6 @@ def replay_process( camera_states=any(len(cfg.vision_pubs) != 0 for cfg in cfgs)) # return all_msgs process_logs = _replay_multi_process(cfgs, all_msgs, frs, fingerprint, custom_params, captured_output_store, disable_progress, t) - return [] if return_all_logs: keys = {m.which() for m in process_logs} @@ -758,7 +764,7 @@ def _replay_multi_process( msg = external_pub_queue.popleft() pbar.update(1) else: - raise Exception + # raise Exception _, index = heapq.heappop(internal_pub_index_heap) msg = internal_pub_queue[index] # print('loop pop msg', time.monotonic() - t) @@ -766,11 +772,11 @@ def _replay_multi_process( target_containers = pubs_to_containers[msg.which()] for container in target_containers: output_msgs = container.run_step(msg, frs) - # for m in output_msgs: - # if m.which() in all_pubs: - # internal_pub_queue.append(m) - # heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) - # log_msgs.extend(output_msgs) + for m in output_msgs: + if m.which() in all_pubs: + internal_pub_queue.append(m) + heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) + log_msgs.extend(output_msgs) # print('loop run step', time.monotonic() - t) # print() finally: diff --git a/tools/diff/diff.py b/tools/diff/diff.py index 3ec2f330edf5ac..d8990c4ea33030 100755 --- a/tools/diff/diff.py +++ b/tools/diff/diff.py @@ -37,7 +37,7 @@ def replay(cfgs, seg): parser.add_argument("-n", type=int, default=8, help="Number of processes to use") args = parser.parse_args() - cfgs = [c for c in CONFIGS if c.proc_name not in BLACKLIST_PROCS and c.proc_name in WHITELIST_PROCS] + cfgs = [c for c in CONFIGS if c.proc_name not in BLACKLIST_PROCS]# and c.proc_name in WHITELIST_PROCS] t = time.monotonic() with ProcessPoolExecutor(max_workers=args.n) as executor: From 811bff03f483836e236d2bf1ba6177f65d984593 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Sat, 2 Aug 2025 01:11:43 -0700 Subject: [PATCH 10/13] fixes --- selfdrive/test/process_replay/process_replay.py | 14 +++----------- tools/diff/diff.py | 2 +- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index f80baaf1c92dd4..a143fe8ea66ac5 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -285,25 +285,17 @@ def run_step(self, msg: capnp._DynamicStructReader, frs: dict[str, FrameReader] self.msg_queue.append(msg) if end_of_cycle: - with Timeout(self.cfg.timeout, error_msg=f"timed out testing process {repr(self.cfg.proc_name)}"): + with self.prefix, Timeout(self.cfg.timeout, error_msg=f"timed out testing process {repr(self.cfg.proc_name)}"): # call recv to let sub-sockets reconnect, after we know the process is ready if self.cnt == 0: for s in self.sockets: messaging.recv_one_or_none(s) self.cnt += 1 - # *** get output msgs from previous inputs *** - for socket in self.sockets: - ms = messaging.drain_sock(socket) - for m in ms: - m = m.as_builder() - m.logMonoTime = msg.logMonoTime + int(self.cfg.processing_time * 1e9) - output_msgs.append(m.as_reader()) - # certain processes use drain_sock. need to cause empty recv to break from this loop trigger_empty_recv = False - # if self.cfg.main_pub and self.cfg.main_pub_drained: - # trigger_empty_recv = any(m.which() == self.cfg.main_pub for m in self.msg_queue) + if self.cfg.main_pub and self.cfg.main_pub_drained: + trigger_empty_recv = any(m.which() == self.cfg.main_pub for m in self.msg_queue) # get output msgs from previous inputs output_msgs = self.get_output_msgs(msg.logMonoTime) diff --git a/tools/diff/diff.py b/tools/diff/diff.py index d8990c4ea33030..cf79426b3baa11 100755 --- a/tools/diff/diff.py +++ b/tools/diff/diff.py @@ -9,7 +9,7 @@ SEG_LIST = [ "d9b97c1d3b8c39b2/0000018b--8a62ed4984/1", -] * 1 +] * 100 # these use cameras/run models which are slow BLACKLIST_PROCS = ['modeld', 'dmonitoringmodeld'] From 75f5ebbdd11228af421aeac0abdeffff032887c7 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Sat, 2 Aug 2025 01:16:46 -0700 Subject: [PATCH 11/13] revert --- selfdrive/controls/radard.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/selfdrive/controls/radard.py b/selfdrive/controls/radard.py index 246fed60809fa9..98fce1cb26e7b5 100755 --- a/selfdrive/controls/radard.py +++ b/selfdrive/controls/radard.py @@ -243,7 +243,6 @@ def update(self, sm: messaging.SubMaster, rr: car.RadarData): self.radar_state.leadTwo = get_lead(self.v_ego, self.ready, self.tracks, leads_v3[1], model_v_ego, low_speed_override=False) def publish(self, pm: messaging.PubMaster): - self.radar_state = log.RadarState.new_message() assert self.radar_state is not None radar_msg = messaging.new_message("radarState") @@ -268,14 +267,10 @@ def main() -> None: RD = RadarD(CP.radarDelay) while 1: - # print('radard calling update', flush=True) sm.update() - # print('radard finished calling update', flush=True) - # RD.update(sm, sm['liveTracks']) - # print('radard calling RD.update', flush=True) + RD.update(sm, sm['liveTracks']) RD.publish(pm) - # print('radard finished calling RD.update', flush=True) if __name__ == "__main__": From e91cbf1bea5fd3fd45abe4e8228b60034b52bdd3 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Sat, 2 Aug 2025 01:19:53 -0700 Subject: [PATCH 12/13] more --- common/timeout.py | 3 +-- selfdrive/test/process_replay/process_replay.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/common/timeout.py b/common/timeout.py index f9d88bcbe433f3..f953911b924ea5 100644 --- a/common/timeout.py +++ b/common/timeout.py @@ -20,11 +20,10 @@ def handle_timeout(self, signume, frame): raise TimeoutException(self.error_msg) def __enter__(self): - return + # return signal.signal(signal.SIGALRM, self.handle_timeout) signal.alarm(self.seconds) def __exit__(self, exc_type, exc_val, exc_tb): - return # return signal.alarm(0) diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index a143fe8ea66ac5..90accc0c9a4c9e 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -104,7 +104,7 @@ def unlock_sockets(self): expected_sets -= 1 def wait_for_recv_called(self): - return messaging.wait_for_one_event(self.all_recv_called_events) + messaging.wait_for_one_event(self.all_recv_called_events) def wait_for_next_recv(self, trigger_empty_recv): index = messaging.wait_for_one_event(self.all_recv_called_events) @@ -290,7 +290,6 @@ def run_step(self, msg: capnp._DynamicStructReader, frs: dict[str, FrameReader] if self.cnt == 0: for s in self.sockets: messaging.recv_one_or_none(s) - self.cnt += 1 # certain processes use drain_sock. need to cause empty recv to break from this loop trigger_empty_recv = False From 0e23b42ae884166df8d58f761d4e5bf059058184 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Sat, 2 Aug 2025 11:27:17 -0700 Subject: [PATCH 13/13] fix --- tools/diff/diff.py | 5 +++-- tools/lib/logreader.py | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tools/diff/diff.py b/tools/diff/diff.py index cf79426b3baa11..dd4afd4a439d69 100755 --- a/tools/diff/diff.py +++ b/tools/diff/diff.py @@ -37,7 +37,7 @@ def replay(cfgs, seg): parser.add_argument("-n", type=int, default=8, help="Number of processes to use") args = parser.parse_args() - cfgs = [c for c in CONFIGS if c.proc_name not in BLACKLIST_PROCS]# and c.proc_name in WHITELIST_PROCS] + cfgs = [c for c in CONFIGS if c.proc_name not in BLACKLIST_PROCS and c.proc_name in WHITELIST_PROCS] t = time.monotonic() with ProcessPoolExecutor(max_workers=args.n) as executor: @@ -45,6 +45,7 @@ def replay(cfgs, seg): for seg in tqdm(SEG_LIST): futures.append(executor.submit(replay, cfgs, seg)) + print('hi') for future in tqdm(futures): outputs = future.result() @@ -53,4 +54,4 @@ def replay(cfgs, seg): # fn = f"diff_{seg.replace('/', '_')}.zst" # print(f"Saving log to {fn}") # save_log(fn, outputs) - # print(f"\nTotal time: {time.monotonic() - t} seconds") + print(f"\nTotal time: {time.monotonic() - t} seconds") diff --git a/tools/lib/logreader.py b/tools/lib/logreader.py index 90f6f12756e9e4..9201d460416bf4 100755 --- a/tools/lib/logreader.py +++ b/tools/lib/logreader.py @@ -55,6 +55,7 @@ class CachedReader: __slots__ = ("_evt", "_enum") def __init__(self, evt: capnp._DynamicStructReader): + assert not isinstance(evt, CachedReader), "CachedReader should not be nested" """All capnp attribute accesses are expensive, and which() is often called multiple times""" self._evt = evt self._enum: str | None = None @@ -74,6 +75,8 @@ def which(self) -> str: return self._enum def __getattr__(self, name: str): + if name.startswith("__") and name.endswith("__"): + return super().__getattr__(name) return getattr(self._evt, name)