From cc21772426cd6c95fc991cf3214743b5a60e1522 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 13 Feb 2024 14:19:19 +0100 Subject: [PATCH 1/3] bring the decoding code for DOPs and parameters closer to the common API "closer" because they still return a (value, cursor_position) tuple instead of just the value. Signed-off-by: Andreas Lauser Signed-off-by: Florian Jost --- odxtools/basicstructure.py | 31 ++++++-------- odxtools/dataobjectproperty.py | 7 +--- odxtools/dopbase.py | 4 +- odxtools/dtcdop.py | 5 +-- odxtools/dynamiclengthfield.py | 4 +- odxtools/endofpdufield.py | 7 +--- odxtools/minmaxlengthtype.py | 4 +- odxtools/multiplexer.py | 45 ++++++++++++--------- odxtools/parameters/parameterwithdop.py | 5 +-- odxtools/parameters/tablekeyparameter.py | 5 +-- odxtools/parameters/tablestructparameter.py | 4 +- odxtools/paramlengthinfotype.py | 2 +- odxtools/standardlengthtype.py | 2 +- tests/test_diag_coded_types.py | 7 ++-- 14 files changed, 59 insertions(+), 73 deletions(-) diff --git a/odxtools/basicstructure.py b/odxtools/basicstructure.py index 5b81a0dd..b4430812 100644 --- a/odxtools/basicstructure.py +++ b/odxtools/basicstructure.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast from xml.etree import ElementTree from .complexdop import ComplexDop @@ -225,13 +225,7 @@ def convert_physical_to_bytes(self, is_end_of_pdu=encode_state.is_end_of_pdu, ) - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[ParameterValue, int]: - if bit_position != 0: - raise DecodeError("Structures must be aligned, i.e. bit_position=0, but " - f"{self.short_name} was passed the bit position {bit_position}") - + def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: # move the origin since positions specified by sub-parameters of # structures are relative to the beginning of the structure object. orig_origin = decode_state.origin_byte_position @@ -245,7 +239,7 @@ def convert_bytes_to_physical(self, decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, cursor_byte_position) - # decoding of the structure finished. move back the origin. + # decoding of the structure finished. go back the original origin. decode_state.origin_byte_position = orig_origin return result, decode_state.cursor_byte_position @@ -266,20 +260,21 @@ def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue is_end_of_pdu=True) def decode(self, message: bytes) -> ParameterValueDict: - # dummy decode state to be passed to convert_bytes_to_physical decode_state = DecodeState(coded_message=message) - param_values, cursor_byte_position = self.convert_bytes_to_physical(decode_state) - if not isinstance(param_values, dict): - odxraise(f"Decoding a structure must result in a dictionary of parameter " - f"values (is {type(param_values)})") - if len(message) != cursor_byte_position: + param_values, _ = self.decode_from_pdu(decode_state) + + if len(message) != decode_state.cursor_byte_position: warnings.warn( - f"The message {message.hex()} is longer than could be parsed." - f" Expected {cursor_byte_position} but got {len(message)}.", + f"The message {message.hex()} probably could not be completely parsed:" + f" Expected length of {decode_state.cursor_byte_position} but got {len(message)}.", DecodeError, stacklevel=1, ) - return param_values + + if not isinstance(param_values, dict): + odxraise("Decoding structures must result in a dictionary") + + return cast(ParameterValueDict, param_values) def parameter_dict(self) -> ParameterDict: """ diff --git a/odxtools/dataobjectproperty.py b/odxtools/dataobjectproperty.py index 5927cb93..b2d3bbeb 100644 --- a/odxtools/dataobjectproperty.py +++ b/odxtools/dataobjectproperty.py @@ -133,17 +133,12 @@ def convert_physical_to_bytes(self, physical_value: Any, encode_state: EncodeSta return self.diag_coded_type.convert_internal_to_bytes( internal_val, encode_state, bit_position=bit_position) - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[Any, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[Any, int]: """ Convert the internal representation of a value into its physical value. Returns a (physical_value, start_position_of_next_parameter) tuple. """ - odxassert(0 <= bit_position and bit_position < 8) - - decode_state.cursor_bit_position = bit_position internal = self.diag_coded_type.decode_from_pdu(decode_state) if self.compu_method.is_valid_internal_value(internal): diff --git a/odxtools/dopbase.py b/odxtools/dopbase.py index ff04cb10..a6d0b950 100644 --- a/odxtools/dopbase.py +++ b/odxtools/dopbase.py @@ -69,8 +69,6 @@ def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state """Convert the physical value into bytes.""" raise NotImplementedError - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: """Extract the bytes from the PDU and convert them to the physical value.""" raise NotImplementedError diff --git a/odxtools/dtcdop.py b/odxtools/dtcdop.py index a8aff015..fa2830a1 100644 --- a/odxtools/dtcdop.py +++ b/odxtools/dtcdop.py @@ -87,11 +87,8 @@ def is_visible(self) -> bool: def linked_dtc_dops(self) -> NamedItemList["DtcDop"]: return self._linked_dtc_dops - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - decode_state.cursor_bit_position = bit_position int_trouble_code = self.diag_coded_type.decode_from_pdu(decode_state) if self.compu_method.is_valid_internal_value(int_trouble_code): diff --git a/odxtools/dynamiclengthfield.py b/odxtools/dynamiclengthfield.py index 44d0b752..c85e90d5 100644 --- a/odxtools/dynamiclengthfield.py +++ b/odxtools/dynamiclengthfield.py @@ -55,7 +55,5 @@ def convert_physical_to_bytes( ) -> bytes: raise NotImplementedError() - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: raise NotImplementedError() diff --git a/odxtools/endofpdufield.py b/odxtools/endofpdufield.py index 27694d73..2ea6c020 100644 --- a/odxtools/endofpdufield.py +++ b/odxtools/endofpdufield.py @@ -58,9 +58,7 @@ def convert_physical_to_bytes( coded_message += self.structure.convert_physical_to_bytes(value, encode_state) return coded_message - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: cursor_byte_position = decode_state.cursor_byte_position value = [] @@ -69,8 +67,7 @@ def convert_bytes_to_physical(self, # here: it says that the item is repeated until the end of # the PDU, but it means that DOP of the items that are # repeated are identical, not their values - new_value, cursor_byte_position = self.structure.convert_bytes_to_physical( - decode_state, bit_position=0) + new_value, cursor_byte_position = self.structure.decode_from_pdu(decode_state) # Update next byte_position decode_state.cursor_byte_position = cursor_byte_position value.append(new_value) diff --git a/odxtools/minmaxlengthtype.py b/odxtools/minmaxlengthtype.py index 83cdd565..ba2af8ad 100644 --- a/odxtools/minmaxlengthtype.py +++ b/odxtools/minmaxlengthtype.py @@ -100,7 +100,9 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state: return value_bytes - def decode_from_pdu(self, decode_state: DecodeState, bit_position: int = 0) -> AtomicOdxType: + def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: + odxassert(decode_state.cursor_bit_position == 0, + "No bit position can be specified for MIN-MAX-LENGTH-TYPE values.") if decode_state.cursor_byte_position + self.min_length > len(decode_state.coded_message): raise DecodeError("The PDU ended before minimum length was reached.") diff --git a/odxtools/multiplexer.py b/odxtools/multiplexer.py index dd007e7d..e3140834 100644 --- a/odxtools/multiplexer.py +++ b/odxtools/multiplexer.py @@ -1,5 +1,4 @@ # SPDX-License-Identifier: MIT -from copy import copy from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast from xml.etree import ElementTree @@ -7,12 +6,12 @@ from .complexdop import ComplexDop from .decodestate import DecodeState from .encodestate import EncodeState -from .exceptions import DecodeError, EncodeError, odxrequire +from .exceptions import DecodeError, EncodeError, odxraise, odxrequire from .multiplexercase import MultiplexerCase from .multiplexerdefaultcase import MultiplexerDefaultCase from .multiplexerswitchkey import MultiplexerSwitchKey from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId -from .odxtypes import AtomicOdxType, ParameterValue, odxstr_to_bool +from .odxtypes import ParameterValue, odxstr_to_bool from .utils import dataclass_fields_asdict if TYPE_CHECKING: @@ -65,10 +64,14 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) -> def is_visible(self) -> bool: return self.is_visible_raw is True - def _get_case_limits(self, case: MultiplexerCase) -> Tuple[AtomicOdxType, AtomicOdxType]: + def _get_case_limits(self, case: MultiplexerCase) -> Tuple[int, int]: key_type = self.switch_key.dop.physical_type.base_data_type lower_limit = key_type.make_from(case.lower_limit) upper_limit = key_type.make_from(case.upper_limit) + if not isinstance(lower_limit, int): + odxraise("Bounds of limits must be integers") + if not isinstance(upper_limit, int): + odxraise("Bounds of limits must be integers") return lower_limit, upper_limit def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state: EncodeState, @@ -106,20 +109,21 @@ def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state raise EncodeError(f"The case {case_name} is not found in Multiplexer {self.short_name}") - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - if bit_position != 0: - raise DecodeError("Multiplexers must be byte-aligned, i.e. bit_position=0, but " - f"{self.short_name} was passed the bit position {bit_position}") - key_value, key_next_byte = self.switch_key.dop.convert_bytes_to_physical(decode_state) - - case_decode_state = copy(decode_state) + # multiplexers are structures and thus the origin position + # must be moved to the start of the multiplexer + orig_origin = decode_state.origin_byte_position if self.byte_position is not None: - case_decode_state.origin_byte_position = decode_state.origin_byte_position + self.byte_position + decode_state.origin_byte_position = decode_state.origin_byte_position + self.byte_position else: - case_decode_state.origin_byte_position = decode_state.cursor_byte_position + decode_state.origin_byte_position = decode_state.cursor_byte_position + + key_value, key_next_byte = self.switch_key.dop.decode_from_pdu(decode_state) + + if not isinstance(key_value, int): + odxraise(f"Multiplexer keys must be integers (is '{type(key_value).__name__}'" + f" for multiplexer '{self.short_name}')") case_found = False case_next_byte = 0 @@ -129,15 +133,14 @@ def convert_bytes_to_physical(self, if lower <= key_value and key_value <= upper: case_found = True if case._structure: - case_value, case_next_byte = case._structure.convert_bytes_to_physical( - case_decode_state) + case_value, case_next_byte = case._structure.decode_from_pdu(decode_state) break if not case_found and self.default_case is not None: case_found = True if self.default_case._structure: - case_value, case_next_byte = self.default_case._structure.convert_bytes_to_physical( - case_decode_state) + case_value, case_next_byte = self.default_case._structure.decode_from_pdu( + decode_state) if not case_found: raise DecodeError( @@ -146,6 +149,10 @@ def convert_bytes_to_physical(self, mux_value = {case.short_name: cast(ParameterValue, case_value)} mux_next_byte = decode_state.cursor_byte_position + max( key_next_byte + self.switch_key.byte_position, case_next_byte + self.byte_position) + + # go back to the original origin + decode_state.origin_byte_position = orig_origin + return mux_value, mux_next_byte def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: diff --git a/odxtools/parameters/parameterwithdop.py b/odxtools/parameters/parameterwithdop.py index 1423274f..bb46d5ba 100644 --- a/odxtools/parameters/parameterwithdop.py +++ b/odxtools/parameters/parameterwithdop.py @@ -80,11 +80,10 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, in if (pos := getattr(self, "byte_position", None)) is not None: decode_state.cursor_byte_position = decode_state.origin_byte_position + pos - bit_position = self.bit_position or 0 + decode_state.cursor_bit_position = self.bit_position or 0 # Use DOP to decode - phys_val, cursor_byte_position = self.dop.convert_bytes_to_physical( - decode_state, bit_position=bit_position) + phys_val, cursor_byte_position = self.dop.decode_from_pdu(decode_state) decode_state.cursor_byte_position = max(orig_cursor_pos, cursor_byte_position) diff --git a/odxtools/parameters/tablekeyparameter.py b/odxtools/parameters/tablekeyparameter.py index 8f410599..96efcb49 100644 --- a/odxtools/parameters/tablekeyparameter.py +++ b/odxtools/parameters/tablekeyparameter.py @@ -147,9 +147,8 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, in else: # Use DOP to decode key_dop = odxrequire(self.table.key_dop) - bit_position_int = self.bit_position if self.bit_position is not None else 0 - key_dop_val, cursor_byte_position = key_dop.convert_bytes_to_physical( - decode_state, bit_position=bit_position_int) + decode_state.cursor_bit_position = self.bit_position or 0 + key_dop_val, cursor_byte_position = key_dop.decode_from_pdu(decode_state) table_row_candidates = [x for x in self.table.table_rows if x.key == key_dop_val] if len(table_row_candidates) == 0: diff --git a/odxtools/parameters/tablestructparameter.py b/odxtools/parameters/tablestructparameter.py index 2539401b..6efab431 100644 --- a/odxtools/parameters/tablestructparameter.py +++ b/odxtools/parameters/tablestructparameter.py @@ -146,10 +146,10 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, in # Use DOP or structure to decode the value if table_row.dop is not None: dop = table_row.dop - val, i = dop.convert_bytes_to_physical(decode_state) + val, i = dop.decode_from_pdu(decode_state) return (table_row.short_name, val), i elif table_row.structure is not None: - val, i = table_row.structure.convert_bytes_to_physical(decode_state) + val, i = table_row.structure.decode_from_pdu(decode_state) return (table_row.short_name, val), i else: # the table row associated with the key neither defines a diff --git a/odxtools/paramlengthinfotype.py b/odxtools/paramlengthinfotype.py index 7351490f..3819e314 100644 --- a/odxtools/paramlengthinfotype.py +++ b/odxtools/paramlengthinfotype.py @@ -74,7 +74,7 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state: is_highlow_byte_order=self.is_highlow_byte_order, ) - def decode_from_pdu(self, decode_state: DecodeState, bit_position: int = 0) -> AtomicOdxType: + def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: # Find length key with matching ID. bit_length = 0 diff --git a/odxtools/standardlengthtype.py b/odxtools/standardlengthtype.py index c458bc6f..63a03e8b 100644 --- a/odxtools/standardlengthtype.py +++ b/odxtools/standardlengthtype.py @@ -56,7 +56,7 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state: is_highlow_byte_order=self.is_highlow_byte_order, ) - def decode_from_pdu(self, decode_state: DecodeState, bit_position: int = 0) -> AtomicOdxType: + def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: internal_value, cursor_byte_position = self._extract_internal_value( decode_state.coded_message, decode_state.cursor_byte_position, diff --git a/tests/test_diag_coded_types.py b/tests/test_diag_coded_types.py index e1c72536..e256b559 100644 --- a/tests/test_diag_coded_types.py +++ b/tests/test_diag_coded_types.py @@ -50,10 +50,9 @@ def test_decode_leading_length_info_type_bytefield(self) -> None: base_type_encoding=None, is_highlow_byte_order_raw=None, ) - state = DecodeState( - bytes([0x1, 0xC2, 0x3, 0x4]), cursor_byte_position=1, cursor_bit_position=1) - # 0xC2 = 11000010, with bit_position=1 and bit_lenth=5, the extracted bits are 00001, + # 0xC2 = 0b11000010, with bit_position=1 and bit_lenth=5, the extracted bits are 00001, # i.e. the leading length is 1, i.e. only the byte 0x3 should be extracted. + state = DecodeState(bytes([0x1, 0xC2, 0x3, 0x4]), cursor_byte_position=1, cursor_bit_position=1) internal_value = dct.decode_from_pdu(state) self.assertEqual(internal_value, bytes([0x3])) @@ -644,7 +643,7 @@ def test_decode_min_max_length_type_end_of_pdu(self) -> None: is_highlow_byte_order_raw=None, ) state = DecodeState(bytes([0x12, 0x34, 0x56, 0x78, 0x9A]), cursor_byte_position=1) - internal_value = dct.decode_from_pdu(state, bit_position=0) + internal_value = dct.decode_from_pdu(state) self.assertEqual(internal_value, bytes([0x34, 0x56, 0x78, 0x9A])) self.assertEqual(state.cursor_byte_position, 5) From c7a2d57972b51fb7e156f4b1b4ff979114782efa Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 13 Feb 2024 14:19:49 +0100 Subject: [PATCH 2/3] parameters.decode_from_pdu(): only return the value this used to be a (value, cursor_position) tuple. With this all decode_from_pdu() functions behave the same: they take a DecodeState object and return the value which corresponds to this object. Also, the DecodeState passed is updated internally to the location where the next item to be decoded is supposed to be. Signed-off-by: Andreas Lauser Signed-off-by: Florian Jost --- odxtools/basicstructure.py | 12 +++--- odxtools/dataobjectproperty.py | 7 ++-- odxtools/dopbase.py | 4 +- odxtools/dtcdop.py | 8 ++-- odxtools/dynamiclengthfield.py | 4 +- odxtools/endofpdufield.py | 18 ++++----- odxtools/multiplexer.py | 38 ++++++++----------- odxtools/parameters/codedconstparameter.py | 6 +-- odxtools/parameters/dynamicparameter.py | 3 +- odxtools/parameters/lengthkeyparameter.py | 8 ++-- .../parameters/matchingrequestparameter.py | 27 ++++++++----- odxtools/parameters/nrcconstparameter.py | 17 +++++---- odxtools/parameters/parameter.py | 4 +- odxtools/parameters/parameterwithdop.py | 16 ++++---- .../parameters/physicalconstantparameter.py | 13 ++++--- odxtools/parameters/reservedparameter.py | 22 +++++------ odxtools/parameters/systemparameter.py | 3 +- odxtools/parameters/tableentryparameter.py | 3 +- odxtools/parameters/tablekeyparameter.py | 16 ++++---- odxtools/parameters/tablestructparameter.py | 27 ++++++------- tests/test_decoding.py | 2 +- tests/test_diag_coded_types.py | 3 +- tests/test_somersault.py | 8 +--- 23 files changed, 133 insertions(+), 136 deletions(-) diff --git a/odxtools/basicstructure.py b/odxtools/basicstructure.py index b4430812..b4881dc6 100644 --- a/odxtools/basicstructure.py +++ b/odxtools/basicstructure.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast from xml.etree import ElementTree from .complexdop import ComplexDop @@ -225,7 +225,7 @@ def convert_physical_to_bytes(self, is_end_of_pdu=encode_state.is_end_of_pdu, ) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: # move the origin since positions specified by sub-parameters of # structures are relative to the beginning of the structure object. orig_origin = decode_state.origin_byte_position @@ -233,16 +233,14 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, in result = {} for param in self.parameters: - value, cursor_byte_position = param.decode_from_pdu(decode_state) + value = param.decode_from_pdu(decode_state) result[param.short_name] = value - decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, - cursor_byte_position) # decoding of the structure finished. go back the original origin. decode_state.origin_byte_position = orig_origin - return result, decode_state.cursor_byte_position + return result def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue) -> bytes: """ @@ -261,7 +259,7 @@ def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue def decode(self, message: bytes) -> ParameterValueDict: decode_state = DecodeState(coded_message=message) - param_values, _ = self.decode_from_pdu(decode_state) + param_values = self.decode_from_pdu(decode_state) if len(message) != decode_state.cursor_byte_position: warnings.warn( diff --git a/odxtools/dataobjectproperty.py b/odxtools/dataobjectproperty.py index b2d3bbeb..118a9355 100644 --- a/odxtools/dataobjectproperty.py +++ b/odxtools/dataobjectproperty.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast from xml.etree import ElementTree from .compumethods.compumethod import CompuMethod @@ -133,7 +133,7 @@ def convert_physical_to_bytes(self, physical_value: Any, encode_state: EncodeSta return self.diag_coded_type.convert_internal_to_bytes( internal_val, encode_state, bit_position=bit_position) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[Any, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: """ Convert the internal representation of a value into its physical value. @@ -142,8 +142,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[Any, int]: internal = self.diag_coded_type.decode_from_pdu(decode_state) if self.compu_method.is_valid_internal_value(internal): - return self.compu_method.convert_internal_to_physical( - internal), decode_state.cursor_byte_position + return self.compu_method.convert_internal_to_physical(internal) else: # TODO: How to prevent this? raise DecodeError( diff --git a/odxtools/dopbase.py b/odxtools/dopbase.py index a6d0b950..5e019c7b 100644 --- a/odxtools/dopbase.py +++ b/odxtools/dopbase.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional from xml.etree import ElementTree from .admindata import AdminData @@ -69,6 +69,6 @@ def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state """Convert the physical value into bytes.""" raise NotImplementedError - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: """Extract the bytes from the PDU and convert them to the physical value.""" raise NotImplementedError diff --git a/odxtools/dtcdop.py b/odxtools/dtcdop.py index fa2830a1..39bca96a 100644 --- a/odxtools/dtcdop.py +++ b/odxtools/dtcdop.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT # from dataclasses import dataclass, field from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast from xml.etree import ElementTree from .compumethods.compumethod import CompuMethod @@ -87,7 +87,7 @@ def is_visible(self) -> bool: def linked_dtc_dops(self) -> NamedItemList["DtcDop"]: return self._linked_dtc_dops - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: int_trouble_code = self.diag_coded_type.decode_from_pdu(decode_state) @@ -107,7 +107,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, in if len(dtcs) == 1: # we found exactly one described DTC - return dtcs[0], decode_state.cursor_byte_position + return dtcs[0] # the DTC was not specified. This probably means that the # diagnostic description file is incomplete. We do not bail @@ -126,7 +126,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, in sdgs=[], ) - return dtc, decode_state.cursor_byte_position + return dtc def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state: EncodeState, bit_position: int) -> bytes: diff --git a/odxtools/dynamiclengthfield.py b/odxtools/dynamiclengthfield.py index c85e90d5..48c39e40 100644 --- a/odxtools/dynamiclengthfield.py +++ b/odxtools/dynamiclengthfield.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Tuple +from typing import TYPE_CHECKING, Any, Dict, List from xml.etree import ElementTree from .decodestate import DecodeState @@ -55,5 +55,5 @@ def convert_physical_to_bytes( ) -> bytes: raise NotImplementedError() - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: raise NotImplementedError() diff --git a/odxtools/endofpdufield.py b/odxtools/endofpdufield.py index 2ea6c020..36527e6a 100644 --- a/odxtools/endofpdufield.py +++ b/odxtools/endofpdufield.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import List, Optional, Tuple +from typing import List, Optional from xml.etree import ElementTree from .decodestate import DecodeState @@ -58,18 +58,16 @@ def convert_physical_to_bytes( coded_message += self.structure.convert_physical_to_bytes(value, encode_state) return coded_message - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - cursor_byte_position = decode_state.cursor_byte_position + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + odxassert(not decode_state.cursor_bit_position, + "No bit position can be specified for end-of-pdu fields!") - value = [] - while cursor_byte_position < len(decode_state.coded_message): + result: List[ParameterValue] = [] + while decode_state.cursor_byte_position < len(decode_state.coded_message): # ATTENTION: the ODX specification is very misleading # here: it says that the item is repeated until the end of # the PDU, but it means that DOP of the items that are # repeated are identical, not their values - new_value, cursor_byte_position = self.structure.decode_from_pdu(decode_state) - # Update next byte_position - decode_state.cursor_byte_position = cursor_byte_position - value.append(new_value) + result.append(self.structure.decode_from_pdu(decode_state)) - return value, cursor_byte_position + return result diff --git a/odxtools/multiplexer.py b/odxtools/multiplexer.py index e3140834..d15bf6dd 100644 --- a/odxtools/multiplexer.py +++ b/odxtools/multiplexer.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from xml.etree import ElementTree from .complexdop import ComplexDop @@ -109,51 +109,45 @@ def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state raise EncodeError(f"The case {case_name} is not found in Multiplexer {self.short_name}") - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: # multiplexers are structures and thus the origin position # must be moved to the start of the multiplexer orig_origin = decode_state.origin_byte_position + orig_cursor = decode_state.cursor_byte_position if self.byte_position is not None: - decode_state.origin_byte_position = decode_state.origin_byte_position + self.byte_position - else: - decode_state.origin_byte_position = decode_state.cursor_byte_position + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position + decode_state.origin_byte_position = decode_state.cursor_byte_position - key_value, key_next_byte = self.switch_key.dop.decode_from_pdu(decode_state) + key_value = self.switch_key.dop.decode_from_pdu(decode_state) if not isinstance(key_value, int): odxraise(f"Multiplexer keys must be integers (is '{type(key_value).__name__}'" f" for multiplexer '{self.short_name}')") - case_found = False - case_next_byte = 0 - case_value = None + case_value: Optional[ParameterValue] = None for case in self.cases or []: lower, upper = self._get_case_limits(case) if lower <= key_value and key_value <= upper: - case_found = True if case._structure: - case_value, case_next_byte = case._structure.decode_from_pdu(decode_state) + case_value = case._structure.decode_from_pdu(decode_state) break - if not case_found and self.default_case is not None: - case_found = True + if case_value is None and self.default_case is not None: if self.default_case._structure: - case_value, case_next_byte = self.default_case._structure.decode_from_pdu( - decode_state) + case_value = self.default_case._structure.decode_from_pdu(decode_state) - if not case_found: - raise DecodeError( - f"Failed to find a matching case in {self.short_name} for value {key_value!r}") + if case_value is None: + odxraise(f"Failed to find a matching case in {self.short_name} for value {key_value!r}", + DecodeError) - mux_value = {case.short_name: cast(ParameterValue, case_value)} - mux_next_byte = decode_state.cursor_byte_position + max( - key_next_byte + self.switch_key.byte_position, case_next_byte + self.byte_position) + mux_value = (case.short_name, case_value) # go back to the original origin decode_state.origin_byte_position = orig_origin + decode_state.cursor_byte_position = max(orig_cursor, decode_state.cursor_byte_position) - return mux_value, mux_next_byte + return mux_value def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: odxlinks = super()._build_odxlinks() diff --git a/odxtools/parameters/codedconstparameter.py b/odxtools/parameters/codedconstparameter.py index 8c3e3c66..a0d3fc1e 100644 --- a/odxtools/parameters/codedconstparameter.py +++ b/odxtools/parameters/codedconstparameter.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Optional from ..decodestate import DecodeState from ..diagcodedtype import DiagCodedType @@ -62,7 +62,7 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: return self.diag_coded_type.convert_internal_to_bytes( self.coded_value, encode_state=encode_state, bit_position=bit_position_int) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[AtomicOdxType, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: # Extract coded values orig_cursor_pos = decode_state.cursor_byte_position if self.byte_position is not None: @@ -85,7 +85,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[AtomicOdxType, int decode_state.cursor_byte_position = max(orig_cursor_pos, decode_state.cursor_byte_position) - return coded_val, decode_state.cursor_byte_position + return coded_val @property def _coded_value_str(self) -> str: diff --git a/odxtools/parameters/dynamicparameter.py b/odxtools/parameters/dynamicparameter.py index 70e27fe4..fc5a6fa3 100644 --- a/odxtools/parameters/dynamicparameter.py +++ b/odxtools/parameters/dynamicparameter.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import Tuple from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -26,5 +25,5 @@ def is_settable(self) -> bool: def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: raise NotImplementedError("Encoding a DynamicParameter is not implemented yet.") - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: raise NotImplementedError("Decoding a DynamicParameter is not implemented yet.") diff --git a/odxtools/parameters/lengthkeyparameter.py b/odxtools/parameters/lengthkeyparameter.py index 60112893..d38ce49d 100644 --- a/odxtools/parameters/lengthkeyparameter.py +++ b/odxtools/parameters/lengthkeyparameter.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Tuple +from typing import TYPE_CHECKING, Any, Dict from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -66,12 +66,12 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: def encode_into_pdu(self, encode_state: EncodeState) -> bytes: return super().encode_into_pdu(encode_state) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - phys_val, cursor_byte_position = super().decode_from_pdu(decode_state) + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + phys_val = super().decode_from_pdu(decode_state) if not isinstance(phys_val, int): odxraise(f"The pysical type of length keys must be an integer, " f"(is {type(phys_val).__name__})") decode_state.length_keys[self.short_name] = phys_val - return phys_val, cursor_byte_position + return phys_val diff --git a/odxtools/parameters/matchingrequestparameter.py b/odxtools/parameters/matchingrequestparameter.py index 545e8ad1..32f2896e 100644 --- a/odxtools/parameters/matchingrequestparameter.py +++ b/odxtools/parameters/matchingrequestparameter.py @@ -1,11 +1,12 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import Optional, Tuple +from typing import Optional from ..decodestate import DecodeState +from ..diagcodedtype import DiagCodedType from ..encodestate import EncodeState from ..exceptions import EncodeError -from ..odxtypes import ParameterValue +from ..odxtypes import DataType, ParameterValue from .parameter import Parameter, ParameterType @@ -37,12 +38,20 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: .request_byte_position:self.request_byte_position + self.byte_length] - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - byte_position = decode_state.cursor_byte_position + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + orig_cursor = decode_state.cursor_byte_position if self.byte_position is not None: - byte_position = decode_state.origin_byte_position + self.byte_position - bit_position = self.bit_position or 0 - byte_length = (8 * self.byte_length + bit_position + 7) // 8 - val_as_bytes = decode_state.coded_message[byte_position:byte_position + byte_length] + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position - return val_as_bytes, byte_position + byte_length + result, decode_state.cursor_byte_position = DiagCodedType._extract_internal_value( + coded_message=decode_state.coded_message, + byte_position=decode_state.cursor_byte_position, + bit_position=self.bit_position or 0, + bit_length=self.byte_length * 8, + base_data_type=DataType.A_UINT32, + is_highlow_byte_order=False) + + decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) + decode_state.cursor_bit_position = 0 + + return result diff --git a/odxtools/parameters/nrcconstparameter.py b/odxtools/parameters/nrcconstparameter.py index 887e7bc5..060bb5e7 100644 --- a/odxtools/parameters/nrcconstparameter.py +++ b/odxtools/parameters/nrcconstparameter.py @@ -1,8 +1,7 @@ # SPDX-License-Identifier: MIT import warnings -from copy import copy from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional from ..decodestate import DecodeState from ..diagcodedtype import DiagCodedType @@ -78,11 +77,11 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: return self.diag_coded_type.convert_internal_to_bytes( coded_value, encode_state, bit_position=bit_position_int) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[AtomicOdxType, int]: - decode_state = copy(decode_state) - if self.byte_position is not None and self.byte_position != decode_state.cursor_byte_position: - # Update byte position - decode_state.cursor_byte_position = self.byte_position + def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: + orig_cursor = decode_state.cursor_byte_position + if self.byte_position is not None: + # Update cursor position + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position # Extract coded values decode_state.cursor_bit_position = self.bit_position or 0 @@ -100,7 +99,9 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[AtomicOdxType, int stacklevel=1, ) - return coded_value, decode_state.cursor_byte_position + decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) + + return coded_value def get_description_of_valid_values(self) -> str: """return a human-understandable description of valid physical values""" diff --git a/odxtools/parameters/parameter.py b/odxtools/parameters/parameter.py index dd5e61dc..e8743492 100644 --- a/odxtools/parameters/parameter.py +++ b/odxtools/parameters/parameter.py @@ -2,7 +2,7 @@ import abc import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional from ..decodestate import DecodeState from ..element import NamedElement @@ -93,7 +93,7 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: pass @abc.abstractmethod - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: """Decode the parameter value from the coded message. If the parameter does have a byte position property, the coded bytes the parameter covers are extracted diff --git a/odxtools/parameters/parameterwithdop.py b/odxtools/parameters/parameterwithdop.py index bb46d5ba..c15f17cc 100644 --- a/odxtools/parameters/parameterwithdop.py +++ b/odxtools/parameters/parameterwithdop.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Optional from ..dataobjectproperty import DataObjectProperty from ..decodestate import DecodeState @@ -75,16 +75,16 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: return dop.convert_physical_to_bytes( physical_value, encode_state, bit_position=bit_position_int) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - orig_cursor_pos = decode_state.cursor_byte_position - if (pos := getattr(self, "byte_position", None)) is not None: - decode_state.cursor_byte_position = decode_state.origin_byte_position + pos + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + orig_cursor = decode_state.cursor_byte_position + if self.byte_position is not None: + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position decode_state.cursor_bit_position = self.bit_position or 0 # Use DOP to decode - phys_val, cursor_byte_position = self.dop.decode_from_pdu(decode_state) + phys_val = self.dop.decode_from_pdu(decode_state) - decode_state.cursor_byte_position = max(orig_cursor_pos, cursor_byte_position) + decode_state.cursor_byte_position = max(orig_cursor, decode_state.cursor_byte_position) - return phys_val, cursor_byte_position + return phys_val diff --git a/odxtools/parameters/physicalconstantparameter.py b/odxtools/parameters/physicalconstantparameter.py index e83833a0..e54184ad 100644 --- a/odxtools/parameters/physicalconstantparameter.py +++ b/odxtools/parameters/physicalconstantparameter.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Tuple +from typing import TYPE_CHECKING, Any, Dict from ..dataobjectproperty import DataObjectProperty from ..decodestate import DecodeState @@ -64,18 +64,19 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: return dop.convert_physical_to_bytes( self.physical_constant_value, encode_state, bit_position=bit_position_int) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: # Decode value - phys_val, cursor_byte_position = super().decode_from_pdu(decode_state) + phys_val = super().decode_from_pdu(decode_state) # Check if decoded value matches expected value if phys_val != self.physical_constant_value: warnings.warn( f"Physical constant parameter does not match! " - f"The parameter {self.short_name} expected physical value {self.physical_constant_value!r} but got {phys_val!r} " - f"at byte position {cursor_byte_position} " + f"The parameter {self.short_name} expected physical value " + f"{self.physical_constant_value!r} but got {phys_val!r} " + f"at byte position {decode_state.cursor_byte_position} " f"in coded message {decode_state.coded_message.hex()}.", DecodeError, stacklevel=1, ) - return phys_val, cursor_byte_position + return phys_val diff --git a/odxtools/parameters/reservedparameter.py b/odxtools/parameters/reservedparameter.py index dafb79f5..70b6338c 100644 --- a/odxtools/parameters/reservedparameter.py +++ b/odxtools/parameters/reservedparameter.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import Optional, Tuple, cast +from typing import Optional, cast from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -31,16 +31,16 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: bit_position_int = self.bit_position if self.bit_position is not None else 0 return (0).to_bytes((self.bit_length + bit_position_int + 7) // 8, "big") - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - byte_position = ( - self.byte_position - if self.byte_position is not None else decode_state.cursor_byte_position) - abs_bit_position = byte_position * 8 + (self.bit_position or 0) - bit_length = self.bit_length + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + # move the cursor + orig_cursor = decode_state.cursor_byte_position + if self.byte_position is not None: + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position - # the cursor points to the first byte which has not been fully - # consumed - cursor_byte_position = (abs_bit_position + bit_length) // 8 + decode_state.cursor_byte_position += ((self.bit_position or 0) + self.bit_length + 7) // 8 + + decode_state.cursor_byte_position = max(orig_cursor, decode_state.cursor_byte_position) + decode_state.cursor_bit_position = 0 # ignore the value of the parameter data - return cast(int, None), cursor_byte_position + return cast(int, None) diff --git a/odxtools/parameters/systemparameter.py b/odxtools/parameters/systemparameter.py index b9433a1e..7c9b8197 100644 --- a/odxtools/parameters/systemparameter.py +++ b/odxtools/parameters/systemparameter.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import Tuple from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -28,5 +27,5 @@ def is_settable(self) -> bool: def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: raise NotImplementedError("Encoding a SystemParameter is not implemented yet.") - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: raise NotImplementedError("Decoding a SystemParameter is not implemented yet.") diff --git a/odxtools/parameters/tableentryparameter.py b/odxtools/parameters/tableentryparameter.py index 546140d7..2cdea1f4 100644 --- a/odxtools/parameters/tableentryparameter.py +++ b/odxtools/parameters/tableentryparameter.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import Tuple from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -29,5 +28,5 @@ def is_settable(self) -> bool: def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: raise NotImplementedError("Encoding a TableKeyParameter is not implemented yet.") - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: raise NotImplementedError("Decoding a TableKeyParameter is not implemented yet.") diff --git a/odxtools/parameters/tablekeyparameter.py b/odxtools/parameters/tablekeyparameter.py index 96efcb49..2e82984c 100644 --- a/odxtools/parameters/tablekeyparameter.py +++ b/odxtools/parameters/tablekeyparameter.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Optional from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -135,20 +135,20 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: def encode_into_pdu(self, encode_state: EncodeState) -> bytes: return super().encode_into_pdu(encode_state) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - if self.byte_position is not None and self.byte_position != decode_state.cursor_byte_position: - cursor_byte_position = self.byte_position + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + orig_cursor = decode_state.cursor_byte_position + if self.byte_position is not None: + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position if self.table_row is not None: # the table row to be used is statically specified -> no # need to decode anything! phys_val = self.table_row.short_name - cursor_byte_position = decode_state.cursor_byte_position else: # Use DOP to decode key_dop = odxrequire(self.table.key_dop) decode_state.cursor_bit_position = self.bit_position or 0 - key_dop_val, cursor_byte_position = key_dop.decode_from_pdu(decode_state) + key_dop_val = key_dop.decode_from_pdu(decode_state) table_row_candidates = [x for x in self.table.table_rows if x.key == key_dop_val] if len(table_row_candidates) == 0: @@ -162,4 +162,6 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, in # update the decode_state's table key decode_state.table_keys[self.short_name] = table_row - return phys_val, cursor_byte_position + decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) + + return phys_val diff --git a/odxtools/parameters/tablestructparameter.py b/odxtools/parameters/tablestructparameter.py index 6efab431..28972cdc 100644 --- a/odxtools/parameters/tablestructparameter.py +++ b/odxtools/parameters/tablestructparameter.py @@ -1,8 +1,7 @@ # SPDX-License-Identifier: MIT import warnings -from copy import copy from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, cast +from typing import TYPE_CHECKING, Any, Dict, Optional, cast from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -126,11 +125,10 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: def encode_into_pdu(self, encode_state: EncodeState) -> bytes: return super().encode_into_pdu(encode_state) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - if self.byte_position is not None and self.byte_position != decode_state.cursor_byte_position: - next_pos = self.byte_position if self.byte_position is not None else 0 - decode_state = copy(decode_state) - decode_state.cursor_byte_position = next_pos + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + orig_cursor = decode_state.cursor_byte_position + if self.byte_position is not None: + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position # find the selected table row key_name = self.table_key.short_name @@ -141,17 +139,20 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, in raise odxraise(f"No table key '{key_name}' found when decoding " f"table struct parameter '{str(self.short_name)}'") dummy_val = cast(str, None), cast(int, None) - return dummy_val, decode_state.cursor_byte_position + return dummy_val # Use DOP or structure to decode the value if table_row.dop is not None: dop = table_row.dop - val, i = dop.decode_from_pdu(decode_state) - return (table_row.short_name, val), i + val = dop.decode_from_pdu(decode_state) + decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) + return (table_row.short_name, val) elif table_row.structure is not None: - val, i = table_row.structure.decode_from_pdu(decode_state) - return (table_row.short_name, val), i + val = table_row.structure.decode_from_pdu(decode_state) + decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) + return (table_row.short_name, val) else: # the table row associated with the key neither defines a # DOP nor a structure -> ignore it - return (table_row.short_name, cast(int, None)), decode_state.cursor_byte_position + decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) + return (table_row.short_name, cast(int, None)) diff --git a/tests/test_decoding.py b/tests/test_decoding.py index 0c7361be..459fbd7f 100644 --- a/tests/test_decoding.py +++ b/tests/test_decoding.py @@ -1242,7 +1242,7 @@ def test_decode_response(self) -> None: coding_object=message, param_dict={ "SID": sid, - "matching_req_param": bytes([0xAB]) + "matching_req_param": 0xAB }, ) decoded_message = diag_layer.decode(coded_message)[0] diff --git a/tests/test_diag_coded_types.py b/tests/test_diag_coded_types.py index e256b559..02151455 100644 --- a/tests/test_diag_coded_types.py +++ b/tests/test_diag_coded_types.py @@ -52,7 +52,8 @@ def test_decode_leading_length_info_type_bytefield(self) -> None: ) # 0xC2 = 0b11000010, with bit_position=1 and bit_lenth=5, the extracted bits are 00001, # i.e. the leading length is 1, i.e. only the byte 0x3 should be extracted. - state = DecodeState(bytes([0x1, 0xC2, 0x3, 0x4]), cursor_byte_position=1, cursor_bit_position=1) + state = DecodeState( + bytes([0x1, 0xC2, 0x3, 0x4]), cursor_byte_position=1, cursor_bit_position=1) internal_value = dct.decode_from_pdu(state) self.assertEqual(internal_value, bytes([0x3])) diff --git a/tests/test_somersault.py b/tests/test_somersault.py index 5397dfd7..0da49eae 100644 --- a/tests/test_somersault.py +++ b/tests/test_somersault.py @@ -259,7 +259,7 @@ def test_code_table_params(self) -> None: self.assertEqual( decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload] ["num_flips_done"], # type: ignore[index, call-overload] - bytes([123])) + 123) self.assertEqual( decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload] ["sault_time"], # type: ignore[index, call-overload] @@ -352,11 +352,7 @@ def test_decode_response(self) -> None: m = messages[0] self.assertEqual(m.coded_message.hex(), "fa03ff") self.assertEqual(m.coding_object, pos_response) - self.assertEqual(m.param_dict, { - "sid": 0xFA, - "num_flips_done": bytearray([0x03]), - "sault_time": 255 - }) + self.assertEqual(m.param_dict, {"sid": 0xFA, "num_flips_done": 0x03, "sault_time": 255}) class TestNavigation(unittest.TestCase): From 6b49fbb19408e13888cf9ff06606b3e4fc391785 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 13 Feb 2024 22:25:27 +0100 Subject: [PATCH 3/3] consider review comments thanks goes to [at]kayoub5. Signed-off-by: Andreas Lauser Signed-off-by: Florian Jost --- odxtools/basicstructure.py | 7 +++--- odxtools/multiplexer.py | 23 +++++++++---------- .../parameters/matchingrequestparameter.py | 15 +++++------- tests/test_decoding.py | 2 +- tests/test_somersault.py | 8 +++++-- 5 files changed, 27 insertions(+), 28 deletions(-) diff --git a/odxtools/basicstructure.py b/odxtools/basicstructure.py index b4881dc6..34b3ec65 100644 --- a/odxtools/basicstructure.py +++ b/odxtools/basicstructure.py @@ -262,12 +262,11 @@ def decode(self, message: bytes) -> ParameterValueDict: param_values = self.decode_from_pdu(decode_state) if len(message) != decode_state.cursor_byte_position: - warnings.warn( + odxraise( f"The message {message.hex()} probably could not be completely parsed:" f" Expected length of {decode_state.cursor_byte_position} but got {len(message)}.", - DecodeError, - stacklevel=1, - ) + DecodeError) + return {} if not isinstance(param_values, dict): odxraise("Decoding structures must result in a dictionary") diff --git a/odxtools/multiplexer.py b/odxtools/multiplexer.py index d15bf6dd..5829a5ef 100644 --- a/odxtools/multiplexer.py +++ b/odxtools/multiplexer.py @@ -11,7 +11,7 @@ from .multiplexerdefaultcase import MultiplexerDefaultCase from .multiplexerswitchkey import MultiplexerSwitchKey from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId -from .odxtypes import ParameterValue, odxstr_to_bool +from .odxtypes import AtomicOdxType, ParameterValue, odxstr_to_bool from .utils import dataclass_fields_asdict if TYPE_CHECKING: @@ -64,14 +64,13 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) -> def is_visible(self) -> bool: return self.is_visible_raw is True - def _get_case_limits(self, case: MultiplexerCase) -> Tuple[int, int]: + def _get_case_limits(self, case: MultiplexerCase) -> Tuple[AtomicOdxType, AtomicOdxType]: key_type = self.switch_key.dop.physical_type.base_data_type lower_limit = key_type.make_from(case.lower_limit) upper_limit = key_type.make_from(case.upper_limit) - if not isinstance(lower_limit, int): - odxraise("Bounds of limits must be integers") - if not isinstance(upper_limit, int): - odxraise("Bounds of limits must be integers") + if not isinstance(lower_limit, type(upper_limit)) and not isinstance( + upper_limit, type(lower_limit)): + odxraise("Upper and lower bounds of limits must compareable") return lower_limit, upper_limit def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state: EncodeState, @@ -88,15 +87,15 @@ def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state case_name, case_value = next(iter(physical_value.items())) case_pos = self.byte_position - for case in self.cases or []: - if case.short_name == case_name: - if case._structure: - case_bytes = case._structure.convert_physical_to_bytes( + for mux_case in self.cases or []: + if mux_case.short_name == case_name: + if mux_case._structure: + case_bytes = mux_case._structure.convert_physical_to_bytes( case_value, encode_state, 0) else: case_bytes = b'' - key_value, _ = self._get_case_limits(case) + key_value, _ = self._get_case_limits(mux_case) key_bytes = self.switch_key.dop.convert_physical_to_bytes( key_value, encode_state, bit_position=self.switch_key.bit_position or 0) @@ -128,7 +127,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: case_value: Optional[ParameterValue] = None for case in self.cases or []: lower, upper = self._get_case_limits(case) - if lower <= key_value and key_value <= upper: + if lower <= key_value and key_value <= upper: # type: ignore[operator] if case._structure: case_value = case._structure.decode_from_pdu(decode_state) break diff --git a/odxtools/parameters/matchingrequestparameter.py b/odxtools/parameters/matchingrequestparameter.py index 32f2896e..a9edfbe4 100644 --- a/odxtools/parameters/matchingrequestparameter.py +++ b/odxtools/parameters/matchingrequestparameter.py @@ -3,10 +3,9 @@ from typing import Optional from ..decodestate import DecodeState -from ..diagcodedtype import DiagCodedType from ..encodestate import EncodeState from ..exceptions import EncodeError -from ..odxtypes import DataType, ParameterValue +from ..odxtypes import ParameterValue from .parameter import Parameter, ParameterType @@ -43,13 +42,11 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: if self.byte_position is not None: decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position - result, decode_state.cursor_byte_position = DiagCodedType._extract_internal_value( - coded_message=decode_state.coded_message, - byte_position=decode_state.cursor_byte_position, - bit_position=self.bit_position or 0, - bit_length=self.byte_length * 8, - base_data_type=DataType.A_UINT32, - is_highlow_byte_order=False) + byte_position = decode_state.cursor_byte_position + bit_position = self.bit_position or 0 + byte_length = (8 * self.byte_length + bit_position + 7) // 8 + result = decode_state.coded_message[byte_position:byte_position + byte_length] + decode_state.cursor_byte_position += byte_length decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) decode_state.cursor_bit_position = 0 diff --git a/tests/test_decoding.py b/tests/test_decoding.py index 459fbd7f..0c7361be 100644 --- a/tests/test_decoding.py +++ b/tests/test_decoding.py @@ -1242,7 +1242,7 @@ def test_decode_response(self) -> None: coding_object=message, param_dict={ "SID": sid, - "matching_req_param": 0xAB + "matching_req_param": bytes([0xAB]) }, ) decoded_message = diag_layer.decode(coded_message)[0] diff --git a/tests/test_somersault.py b/tests/test_somersault.py index 0da49eae..5397dfd7 100644 --- a/tests/test_somersault.py +++ b/tests/test_somersault.py @@ -259,7 +259,7 @@ def test_code_table_params(self) -> None: self.assertEqual( decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload] ["num_flips_done"], # type: ignore[index, call-overload] - 123) + bytes([123])) self.assertEqual( decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload] ["sault_time"], # type: ignore[index, call-overload] @@ -352,7 +352,11 @@ def test_decode_response(self) -> None: m = messages[0] self.assertEqual(m.coded_message.hex(), "fa03ff") self.assertEqual(m.coding_object, pos_response) - self.assertEqual(m.param_dict, {"sid": 0xFA, "num_flips_done": 0x03, "sault_time": 255}) + self.assertEqual(m.param_dict, { + "sid": 0xFA, + "num_flips_done": bytearray([0x03]), + "sault_time": 255 + }) class TestNavigation(unittest.TestCase):