Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP [$500 bounty] test_models: add a test that fuzzes the tx messages #32577

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 142 additions & 4 deletions selfdrive/car/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,34 @@
import importlib
import pytest
import random
import math
import numpy as np
import unittest # noqa: TID251
from collections import defaultdict, Counter
import hypothesis.strategies as st
from hypothesis import Phase, given, settings
from parameterized import parameterized_class

from cereal import messaging, log, car
from cereal.messaging import PubMaster, SubMaster
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.common.realtime import DT_CTRL
from openpilot.selfdrive.test.fuzzy_generation import FuzzyGenerator
from openpilot.selfdrive.car import gen_empty_fingerprint
from openpilot.selfdrive.car.card import Car
from openpilot.selfdrive.car.fingerprints import all_known_cars, MIGRATION
from openpilot.selfdrive.car.car_helpers import FRAME_FINGERPRINT, interfaces
from openpilot.selfdrive.car.honda.values import CAR as HONDA, HondaFlags
from openpilot.selfdrive.car.ford.values import CAR as FORD, FordFlags
from openpilot.selfdrive.car.hyundai.values import CAR as HYUNDAI, HyundaiFlags
from openpilot.selfdrive.car.honda.values import HondaFlags
from openpilot.selfdrive.car.tests.routes import non_tested_cars, routes, CarTestRoute
from openpilot.selfdrive.car.values import Platform
from openpilot.selfdrive.test.helpers import read_segment_list
from openpilot.selfdrive.test.helpers import read_segment_list, with_processes
from openpilot.system.hardware.hw import DEFAULT_DOWNLOAD_CACHE_ROOT
from openpilot.tools.lib.logreader import LogReader, internal_source, openpilotci_source
from openpilot.tools.lib.route import SegmentName
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N

from panda.tests.libpanda import libpanda_py

Expand Down Expand Up @@ -154,6 +161,11 @@ def get_testing_data(cls):

@classmethod
def setUpClass(cls):
#from openpilot.selfdrive.car.honda.values import CAR as HONDA
#from openpilot.selfdrive.car.hyundai.values import CAR as HYUNDAI, CANFD_CAR
#if cls.platform is not None and cls.platform not in HYUNDAI:
# print(f"\nskip test for platform {cls.platform}")
# raise unittest.SkipTest
if cls.__name__ == 'TestCarModel' or cls.__name__.endswith('Base'):
raise unittest.SkipTest

Expand Down Expand Up @@ -185,6 +197,15 @@ def setUp(self):
self.CI = self.CarInterface(self.CP.copy(), self.CarController, self.CarState)
assert self.CI

if self._testMethodName == "test_panda_safety_tx_fuzzy":
os.environ["TEST_MODELS"] = "TEST_MODELS"
self.sm = SubMaster(["carControl"])
self.pm = PubMaster(['deviceState', 'pandaStates', 'peripheralState', 'modelV2', 'liveCalibration',
'carOutput', 'driverMonitoringState', 'longitudinalPlan', 'liveLocationKalman',
'managerState', 'liveParameters', 'radarState', 'liveTorqueParameters',
'carState'])

Params().put("CarParams", self.CP.to_bytes())
Params().put_bool("OpenpilotEnabledToggle", self.openpilot_enabled)

# TODO: check safetyModel is in release panda build
Expand All @@ -195,6 +216,7 @@ def setUp(self):
self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}")
self.safety.init_tests()

'''
def test_car_params(self):
if self.CP.dashcamOnly:
self.skipTest("no need to check carParams for dashcamOnly")
Expand Down Expand Up @@ -325,7 +347,7 @@ def test_car_controller(car_control):
@settings(max_examples=MAX_EXAMPLES, deadline=None,
phases=(Phase.reuse, Phase.generate, Phase.shrink))
@given(data=st.data())
def test_panda_safety_carstate_fuzzy(self, data):
def test_panda_safety_rx_fuzzy(self, data):
"""
For each example, pick a random CAN message on the bus and fuzz its data,
checking for panda state mismatches.
Expand Down Expand Up @@ -386,6 +408,122 @@ def test_panda_safety_carstate_fuzzy(self, data):
if self.CP.carName == "honda":
if self.safety.get_acc_main_on() != prev_panda_acc_main_on:
self.assertEqual(CS.cruiseState.available, self.safety.get_acc_main_on())
'''

# Skip stdout/stderr capture with pytest, causes elevated memory usage
@with_processes(["controlsd"])
@settings(max_examples=MAX_EXAMPLES, deadline=None,
phases=(Phase.reuse, Phase.generate, Phase.shrink, Phase.explain))
@given(data=st.data())
def test_panda_safety_tx_fuzzy(self, data):
float32_list_strategy = st.lists(st.floats(width=32, allow_nan=False, allow_infinity=False), min_size=3, max_size=3)
xyzt_data_strategy = st.builds(
log.XYZTData,
y=float32_list_strategy,
)

mv2_send = messaging.new_message("modelV2")
mv2_send.modelV2.meta.laneChangeState = data.draw(st.sampled_from(list(log.LaneChangeState.schema.enumerants.keys())))
mv2_send.modelV2.meta.laneChangeDirection = data.draw(st.sampled_from(list(log.LaneChangeDirection.schema.enumerants.keys())))
mv2_send.modelV2.meta.hardBrakePredicted = data.draw(st.booleans())
mv2_send.modelV2.meta.desirePrediction = data.draw(st.lists(st.floats(min_value=0.0, max_value=1.0, width=32, allow_nan=False, allow_infinity=False), min_size=4, max_size=4))
mv2_send.modelV2.frameDropPerc = data.draw(st.floats(min_value=0.0, max_value=100.0, width=64, allow_nan=False, allow_infinity=False))
mv2_send.modelV2.action.desiredCurvature = data.draw(st.floats(width=32, allow_nan=False, allow_infinity=False))
mv2_send.modelV2.position.y = data.draw(st.lists(st.floats(width=32, allow_nan=False, allow_infinity=False), min_size=3, max_size=3))
mv2_send.modelV2.laneLineProbs = data.draw(st.lists(st.floats(min_value=0.0, max_value=1.0, width=32, allow_nan=False, allow_infinity=False), min_size=3, max_size=3))
mv2_send.modelV2.laneLines = data.draw(st.lists(xyzt_data_strategy, min_size=2, max_size=2))
self.pm.send("modelV2", mv2_send)

lc_send = messaging.new_message("liveCalibration")
lc_send.liveCalibration.calStatus = log.LiveCalibrationData.Status.calibrated
self.pm.send("liveCalibration", lc_send)

co_send = messaging.new_message("carOutput")
co_send.carOutput.actuatorsOutput.steer = data.draw(st.floats(min_value=0.0, max_value=2500.0, width=32, allow_nan=False, allow_infinity=False))
co_send.carOutput.actuatorsOutput.steeringAngleDeg = data.draw(st.floats(min_value=-40.0, max_value=40.0, width=32, allow_nan=False, allow_infinity=False))
self.pm.send("carOutput", co_send)

dms_send = messaging.new_message("driverMonitoringState")
dms_send.driverMonitoringState.events = [FuzzyGenerator.get_random_msg(data.draw, car.CarEvent, real_floats=True) for _ in range(4)]
dms_send.driverMonitoringState.awarenessStatus = data.draw(st.floats(width=32, allow_nan=False, allow_infinity=False))
self.pm.send("driverMonitoringState", dms_send)

lgp_send = messaging.new_message("longitudinalPlan")
lgp_send.longitudinalPlan.fcw = data.draw(st.booleans())
lgp_send.longitudinalPlan.speeds = data.draw(st.lists(st.floats(width=32, allow_nan=False, allow_infinity=False), min_size=CONTROL_N, max_size=CONTROL_N))
lgp_send.longitudinalPlan.accels = data.draw(st.lists(st.floats(width=32, allow_nan=False, allow_infinity=False), min_size=CONTROL_N, max_size=CONTROL_N))
lgp_send.longitudinalPlan.hasLead = data.draw(st.booleans())
self.pm.send("longitudinalPlan", lgp_send)

llk_send = messaging.new_message("liveLocationKalman")
llk_send.liveLocationKalman.posenetOK = data.draw(st.booleans())
llk_send.liveLocationKalman.inputsOK = data.draw(st.booleans())
llk_send.liveLocationKalman.gpsOK = data.draw(st.booleans())
llk_send.liveLocationKalman.angularVelocityCalibrated.value = data.draw(st.lists(st.floats(min_value=-1.0, max_value=1.0, width=64, allow_nan=False, allow_infinity=False), min_size=3, max_size=3))
llk_send.liveLocationKalman.deviceStable = data.draw(st.booleans())
min_v = np.float32(math.radians(0))
max_v = np.float32(math.radians(180))
llk_send.liveLocationKalman.calibratedOrientationNED.value = data.draw(st.lists(st.floats(min_value=min_v, max_value=max_v, width=64, allow_nan=False, allow_infinity=False), min_size=3, max_size=3))
self.pm.send("liveLocationKalman", llk_send)

ms_send = messaging.new_message("managerState")
self.pm.send("managerState", ms_send)

lp_send = messaging.new_message("liveParameters")
lp_send.liveParameters.valid = data.draw(st.booleans())
lp_send.liveParameters.stiffnessFactor = data.draw(st.floats(width=32, allow_nan=False, allow_infinity=False))
lp_send.liveParameters.angleOffsetDeg = data.draw(st.floats(min_value=0.0, max_value=360.0, width=32, allow_nan=False, allow_infinity=False))
lp_send.liveParameters.steerRatio = data.draw(st.floats(min_value=0.0, max_value=1.0, width=32, allow_nan=False, allow_infinity=False))
min_v = np.float32(math.radians(-10))
max_v = np.float32(math.radians(10))
lp_send.liveParameters.roll = data.draw(st.floats(min_value=min_v, max_value=max_v, width=32, allow_nan=False, allow_infinity=False))
self.pm.send("liveParameters", lp_send)

rs_send = messaging.new_message("radarState")
rs_send.radarState.radarErrors = [data.draw(st.sampled_from(list(car.RadarData.Error.schema.enumerants.keys()))) for _ in range(2)]
self.pm.send("radarState", rs_send)

ltp_send = messaging.new_message("liveTorqueParameters")
ltp_send.liveTorqueParameters.useParams = data.draw(st.booleans())
ltp_send.liveTorqueParameters.latAccelFactorFiltered = data.draw(st.floats(width=32, allow_nan=False, allow_infinity=False))
ltp_send.liveTorqueParameters.latAccelOffsetFiltered = data.draw(st.floats(width=32, allow_nan=False, allow_infinity=False))
ltp_send.liveTorqueParameters.frictionCoefficientFiltered = data.draw(st.floats(width=32, allow_nan=False, allow_infinity=False))
self.pm.send("liveTorqueParameters", ltp_send)

button_events_data_strategy = st.builds(
car.CarState.ButtonEvent,
pressed=st.booleans(),
type=st.sampled_from(list(car.CarState.ButtonEvent.Type.schema.enumerants.keys()))
)

cs_send = messaging.new_message("carState")
cs_send.carState.vEgo = data.draw(st.floats(width=32, allow_nan=False, allow_infinity=False))
cs_send.carState.aEgo = data.draw(st.floats(width=32, allow_nan=False, allow_infinity=False))
cs_send.carState.steeringPressed = data.draw(st.booleans())
cs_send.carState.steeringAngleDeg = data.draw(st.floats(min_value=-40.0, max_value=40.0, width=32, allow_nan=False, allow_infinity=False))
cs_send.carState.buttonEvents = data.draw(st.lists(button_events_data_strategy, min_size=2, max_size=2))
cs_send.carState.brakePressed = data.draw(st.booleans())
self.pm.send("carState", cs_send)

self.sm.update(0)
CC = self.sm['carControl']

def tx_hook():
now_nanos = 0
for _ in range(round(10.0 / DT_CTRL)):
self.CI.update(CC, [])
act, sendcan = self.CI.apply(CC, now_nanos)
now_nanos += DT_CTRL * 1e9
for addr, _, dat, bus in sendcan:
to_send = libpanda_py.make_CANPacket(addr, bus % 4, dat)
tx_ok = self.safety.safety_tx_hook(to_send)
self.assertTrue(tx_ok, f"TX issue car {self.CP.carName} addr {hex(addr)} bus {bus}")

self.safety.set_controls_allowed(True)
tx_hook()


'''

def test_panda_safety_carstate(self):
"""
Expand Down Expand Up @@ -473,13 +611,13 @@ def test_panda_safety_carstate(self):
def test_route_on_ci_bucket(self):
self.assertTrue(self.test_route_on_bucket, "Route not on CI bucket. " +
"This is fine to fail for WIP car ports, just let us know and we can upload your routes to the CI bucket.")
'''


@parameterized_class(('platform', 'test_route'), get_test_cases())
@pytest.mark.xdist_group_class_property('test_route')
class TestCarModel(TestCarModelBase):
pass


if __name__ == "__main__":
unittest.main()
5 changes: 4 additions & 1 deletion selfdrive/controls/controlsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

REPLAY = "REPLAY" in os.environ
SIMULATION = "SIMULATION" in os.environ
TEST_MODELS = "TEST_MODELS" in os.environ
TESTING_CLOSET = "TESTING_CLOSET" in os.environ
IGNORE_PROCESSES = {"loggerd", "encoderd", "statsd"}

Expand Down Expand Up @@ -94,6 +95,8 @@ def __init__(self, CI=None):
if REPLAY:
# no vipc in replay will make them ignored anyways
ignore += ['roadCameraState', 'wideRoadCameraState']
if TEST_MODELS:
ignore += [self.camera_packets, self.sensor_packets, 'testJoystick']
self.sm = messaging.SubMaster(['deviceState', 'pandaStates', 'peripheralState', 'modelV2', 'liveCalibration',
'carOutput', 'driverMonitoringState', 'longitudinalPlan', 'liveLocationKalman',
'managerState', 'liveParameters', 'radarState', 'liveTorqueParameters',
Expand Down Expand Up @@ -133,7 +136,7 @@ def __init__(self, CI=None):
elif self.CP.lateralTuning.which() == 'torque':
self.LaC = LatControlTorque(self.CP, self.CI)

self.initialized = False
self.initialized = False if not TEST_MODELS else True
self.state = State.disabled
self.enabled = False
self.active = False
Expand Down
Loading