diff --git a/odxtools/basicstructure.py b/odxtools/basicstructure.py index 5b81a0dd..34b3ec65 100644 --- a/odxtools/basicstructure.py +++ b/odxtools/basicstructure.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast from xml.etree import ElementTree from .complexdop import ComplexDop @@ -225,13 +225,7 @@ def convert_physical_to_bytes(self, is_end_of_pdu=encode_state.is_end_of_pdu, ) - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[ParameterValue, int]: - if bit_position != 0: - raise DecodeError("Structures must be aligned, i.e. bit_position=0, but " - f"{self.short_name} was passed the bit position {bit_position}") - + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: # move the origin since positions specified by sub-parameters of # structures are relative to the beginning of the structure object. orig_origin = decode_state.origin_byte_position @@ -239,16 +233,14 @@ def convert_bytes_to_physical(self, result = {} for param in self.parameters: - value, cursor_byte_position = param.decode_from_pdu(decode_state) + value = param.decode_from_pdu(decode_state) result[param.short_name] = value - decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, - cursor_byte_position) - # decoding of the structure finished. move back the origin. + # decoding of the structure finished. go back the original origin. decode_state.origin_byte_position = orig_origin - return result, decode_state.cursor_byte_position + return result def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue) -> bytes: """ @@ -266,20 +258,20 @@ def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue is_end_of_pdu=True) def decode(self, message: bytes) -> ParameterValueDict: - # dummy decode state to be passed to convert_bytes_to_physical decode_state = DecodeState(coded_message=message) - param_values, cursor_byte_position = self.convert_bytes_to_physical(decode_state) + param_values = self.decode_from_pdu(decode_state) + + if len(message) != decode_state.cursor_byte_position: + odxraise( + f"The message {message.hex()} probably could not be completely parsed:" + f" Expected length of {decode_state.cursor_byte_position} but got {len(message)}.", + DecodeError) + return {} + if not isinstance(param_values, dict): - odxraise(f"Decoding a structure must result in a dictionary of parameter " - f"values (is {type(param_values)})") - if len(message) != cursor_byte_position: - warnings.warn( - f"The message {message.hex()} is longer than could be parsed." - f" Expected {cursor_byte_position} but got {len(message)}.", - DecodeError, - stacklevel=1, - ) - return param_values + odxraise("Decoding structures must result in a dictionary") + + return cast(ParameterValueDict, param_values) def parameter_dict(self) -> ParameterDict: """ diff --git a/odxtools/dataobjectproperty.py b/odxtools/dataobjectproperty.py index 5927cb93..118a9355 100644 --- a/odxtools/dataobjectproperty.py +++ b/odxtools/dataobjectproperty.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast from xml.etree import ElementTree from .compumethods.compumethod import CompuMethod @@ -133,22 +133,16 @@ def convert_physical_to_bytes(self, physical_value: Any, encode_state: EncodeSta return self.diag_coded_type.convert_internal_to_bytes( internal_val, encode_state, bit_position=bit_position) - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[Any, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: """ Convert the internal representation of a value into its physical value. Returns a (physical_value, start_position_of_next_parameter) tuple. """ - odxassert(0 <= bit_position and bit_position < 8) - - decode_state.cursor_bit_position = bit_position internal = self.diag_coded_type.decode_from_pdu(decode_state) if self.compu_method.is_valid_internal_value(internal): - return self.compu_method.convert_internal_to_physical( - internal), decode_state.cursor_byte_position + return self.compu_method.convert_internal_to_physical(internal) else: # TODO: How to prevent this? raise DecodeError( diff --git a/odxtools/dopbase.py b/odxtools/dopbase.py index ff04cb10..5e019c7b 100644 --- a/odxtools/dopbase.py +++ b/odxtools/dopbase.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional from xml.etree import ElementTree from .admindata import AdminData @@ -69,8 +69,6 @@ def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state """Convert the physical value into bytes.""" raise NotImplementedError - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: """Extract the bytes from the PDU and convert them to the physical value.""" raise NotImplementedError diff --git a/odxtools/dtcdop.py b/odxtools/dtcdop.py index a8aff015..39bca96a 100644 --- a/odxtools/dtcdop.py +++ b/odxtools/dtcdop.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT # from dataclasses import dataclass, field from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast from xml.etree import ElementTree from .compumethods.compumethod import CompuMethod @@ -87,11 +87,8 @@ def is_visible(self) -> bool: def linked_dtc_dops(self) -> NamedItemList["DtcDop"]: return self._linked_dtc_dops - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: - decode_state.cursor_bit_position = bit_position int_trouble_code = self.diag_coded_type.decode_from_pdu(decode_state) if self.compu_method.is_valid_internal_value(int_trouble_code): @@ -110,7 +107,7 @@ def convert_bytes_to_physical(self, if len(dtcs) == 1: # we found exactly one described DTC - return dtcs[0], decode_state.cursor_byte_position + return dtcs[0] # the DTC was not specified. This probably means that the # diagnostic description file is incomplete. We do not bail @@ -129,7 +126,7 @@ def convert_bytes_to_physical(self, sdgs=[], ) - return dtc, decode_state.cursor_byte_position + return dtc def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state: EncodeState, bit_position: int) -> bytes: diff --git a/odxtools/dynamiclengthfield.py b/odxtools/dynamiclengthfield.py index 44d0b752..48c39e40 100644 --- a/odxtools/dynamiclengthfield.py +++ b/odxtools/dynamiclengthfield.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Tuple +from typing import TYPE_CHECKING, Any, Dict, List from xml.etree import ElementTree from .decodestate import DecodeState @@ -55,7 +55,5 @@ def convert_physical_to_bytes( ) -> bytes: raise NotImplementedError() - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: raise NotImplementedError() diff --git a/odxtools/endofpdufield.py b/odxtools/endofpdufield.py index 27694d73..36527e6a 100644 --- a/odxtools/endofpdufield.py +++ b/odxtools/endofpdufield.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import List, Optional, Tuple +from typing import List, Optional from xml.etree import ElementTree from .decodestate import DecodeState @@ -58,21 +58,16 @@ def convert_physical_to_bytes( coded_message += self.structure.convert_physical_to_bytes(value, encode_state) return coded_message - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[ParameterValue, int]: - cursor_byte_position = decode_state.cursor_byte_position + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + odxassert(not decode_state.cursor_bit_position, + "No bit position can be specified for end-of-pdu fields!") - value = [] - while cursor_byte_position < len(decode_state.coded_message): + result: List[ParameterValue] = [] + while decode_state.cursor_byte_position < len(decode_state.coded_message): # ATTENTION: the ODX specification is very misleading # here: it says that the item is repeated until the end of # the PDU, but it means that DOP of the items that are # repeated are identical, not their values - new_value, cursor_byte_position = self.structure.convert_bytes_to_physical( - decode_state, bit_position=0) - # Update next byte_position - decode_state.cursor_byte_position = cursor_byte_position - value.append(new_value) + result.append(self.structure.decode_from_pdu(decode_state)) - return value, cursor_byte_position + return result diff --git a/odxtools/minmaxlengthtype.py b/odxtools/minmaxlengthtype.py index 83cdd565..ba2af8ad 100644 --- a/odxtools/minmaxlengthtype.py +++ b/odxtools/minmaxlengthtype.py @@ -100,7 +100,9 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state: return value_bytes - def decode_from_pdu(self, decode_state: DecodeState, bit_position: int = 0) -> AtomicOdxType: + def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: + odxassert(decode_state.cursor_bit_position == 0, + "No bit position can be specified for MIN-MAX-LENGTH-TYPE values.") if decode_state.cursor_byte_position + self.min_length > len(decode_state.coded_message): raise DecodeError("The PDU ended before minimum length was reached.") diff --git a/odxtools/multiplexer.py b/odxtools/multiplexer.py index dd007e7d..5829a5ef 100644 --- a/odxtools/multiplexer.py +++ b/odxtools/multiplexer.py @@ -1,13 +1,12 @@ # SPDX-License-Identifier: MIT -from copy import copy from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from xml.etree import ElementTree from .complexdop import ComplexDop from .decodestate import DecodeState from .encodestate import EncodeState -from .exceptions import DecodeError, EncodeError, odxrequire +from .exceptions import DecodeError, EncodeError, odxraise, odxrequire from .multiplexercase import MultiplexerCase from .multiplexerdefaultcase import MultiplexerDefaultCase from .multiplexerswitchkey import MultiplexerSwitchKey @@ -69,6 +68,9 @@ def _get_case_limits(self, case: MultiplexerCase) -> Tuple[AtomicOdxType, Atomic key_type = self.switch_key.dop.physical_type.base_data_type lower_limit = key_type.make_from(case.lower_limit) upper_limit = key_type.make_from(case.upper_limit) + if not isinstance(lower_limit, type(upper_limit)) and not isinstance( + upper_limit, type(lower_limit)): + odxraise("Upper and lower bounds of limits must compareable") return lower_limit, upper_limit def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state: EncodeState, @@ -85,15 +87,15 @@ def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state case_name, case_value = next(iter(physical_value.items())) case_pos = self.byte_position - for case in self.cases or []: - if case.short_name == case_name: - if case._structure: - case_bytes = case._structure.convert_physical_to_bytes( + for mux_case in self.cases or []: + if mux_case.short_name == case_name: + if mux_case._structure: + case_bytes = mux_case._structure.convert_physical_to_bytes( case_value, encode_state, 0) else: case_bytes = b'' - key_value, _ = self._get_case_limits(case) + key_value, _ = self._get_case_limits(mux_case) key_bytes = self.switch_key.dop.convert_physical_to_bytes( key_value, encode_state, bit_position=self.switch_key.bit_position or 0) @@ -106,47 +108,45 @@ def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state raise EncodeError(f"The case {case_name} is not found in Multiplexer {self.short_name}") - def convert_bytes_to_physical(self, - decode_state: DecodeState, - bit_position: int = 0) -> Tuple[ParameterValue, int]: - - if bit_position != 0: - raise DecodeError("Multiplexers must be byte-aligned, i.e. bit_position=0, but " - f"{self.short_name} was passed the bit position {bit_position}") - key_value, key_next_byte = self.switch_key.dop.convert_bytes_to_physical(decode_state) + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: - case_decode_state = copy(decode_state) + # multiplexers are structures and thus the origin position + # must be moved to the start of the multiplexer + orig_origin = decode_state.origin_byte_position + orig_cursor = decode_state.cursor_byte_position if self.byte_position is not None: - case_decode_state.origin_byte_position = decode_state.origin_byte_position + self.byte_position - else: - case_decode_state.origin_byte_position = decode_state.cursor_byte_position + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position + decode_state.origin_byte_position = decode_state.cursor_byte_position - case_found = False - case_next_byte = 0 - case_value = None + key_value = self.switch_key.dop.decode_from_pdu(decode_state) + + if not isinstance(key_value, int): + odxraise(f"Multiplexer keys must be integers (is '{type(key_value).__name__}'" + f" for multiplexer '{self.short_name}')") + + case_value: Optional[ParameterValue] = None for case in self.cases or []: lower, upper = self._get_case_limits(case) - if lower <= key_value and key_value <= upper: - case_found = True + if lower <= key_value and key_value <= upper: # type: ignore[operator] if case._structure: - case_value, case_next_byte = case._structure.convert_bytes_to_physical( - case_decode_state) + case_value = case._structure.decode_from_pdu(decode_state) break - if not case_found and self.default_case is not None: - case_found = True + if case_value is None and self.default_case is not None: if self.default_case._structure: - case_value, case_next_byte = self.default_case._structure.convert_bytes_to_physical( - case_decode_state) + case_value = self.default_case._structure.decode_from_pdu(decode_state) + + if case_value is None: + odxraise(f"Failed to find a matching case in {self.short_name} for value {key_value!r}", + DecodeError) + + mux_value = (case.short_name, case_value) - if not case_found: - raise DecodeError( - f"Failed to find a matching case in {self.short_name} for value {key_value!r}") + # go back to the original origin + decode_state.origin_byte_position = orig_origin + decode_state.cursor_byte_position = max(orig_cursor, decode_state.cursor_byte_position) - mux_value = {case.short_name: cast(ParameterValue, case_value)} - mux_next_byte = decode_state.cursor_byte_position + max( - key_next_byte + self.switch_key.byte_position, case_next_byte + self.byte_position) - return mux_value, mux_next_byte + return mux_value def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: odxlinks = super()._build_odxlinks() diff --git a/odxtools/parameters/codedconstparameter.py b/odxtools/parameters/codedconstparameter.py index 8c3e3c66..a0d3fc1e 100644 --- a/odxtools/parameters/codedconstparameter.py +++ b/odxtools/parameters/codedconstparameter.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Optional from ..decodestate import DecodeState from ..diagcodedtype import DiagCodedType @@ -62,7 +62,7 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: return self.diag_coded_type.convert_internal_to_bytes( self.coded_value, encode_state=encode_state, bit_position=bit_position_int) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[AtomicOdxType, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: # Extract coded values orig_cursor_pos = decode_state.cursor_byte_position if self.byte_position is not None: @@ -85,7 +85,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[AtomicOdxType, int decode_state.cursor_byte_position = max(orig_cursor_pos, decode_state.cursor_byte_position) - return coded_val, decode_state.cursor_byte_position + return coded_val @property def _coded_value_str(self) -> str: diff --git a/odxtools/parameters/dynamicparameter.py b/odxtools/parameters/dynamicparameter.py index 70e27fe4..fc5a6fa3 100644 --- a/odxtools/parameters/dynamicparameter.py +++ b/odxtools/parameters/dynamicparameter.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import Tuple from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -26,5 +25,5 @@ def is_settable(self) -> bool: def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: raise NotImplementedError("Encoding a DynamicParameter is not implemented yet.") - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: raise NotImplementedError("Decoding a DynamicParameter is not implemented yet.") diff --git a/odxtools/parameters/lengthkeyparameter.py b/odxtools/parameters/lengthkeyparameter.py index 60112893..d38ce49d 100644 --- a/odxtools/parameters/lengthkeyparameter.py +++ b/odxtools/parameters/lengthkeyparameter.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Tuple +from typing import TYPE_CHECKING, Any, Dict from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -66,12 +66,12 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: def encode_into_pdu(self, encode_state: EncodeState) -> bytes: return super().encode_into_pdu(encode_state) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - phys_val, cursor_byte_position = super().decode_from_pdu(decode_state) + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + phys_val = super().decode_from_pdu(decode_state) if not isinstance(phys_val, int): odxraise(f"The pysical type of length keys must be an integer, " f"(is {type(phys_val).__name__})") decode_state.length_keys[self.short_name] = phys_val - return phys_val, cursor_byte_position + return phys_val diff --git a/odxtools/parameters/matchingrequestparameter.py b/odxtools/parameters/matchingrequestparameter.py index 545e8ad1..a9edfbe4 100644 --- a/odxtools/parameters/matchingrequestparameter.py +++ b/odxtools/parameters/matchingrequestparameter.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import Optional, Tuple +from typing import Optional from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -37,12 +37,18 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: .request_byte_position:self.request_byte_position + self.byte_length] - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - byte_position = decode_state.cursor_byte_position + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + orig_cursor = decode_state.cursor_byte_position if self.byte_position is not None: - byte_position = decode_state.origin_byte_position + self.byte_position + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position + + byte_position = decode_state.cursor_byte_position bit_position = self.bit_position or 0 byte_length = (8 * self.byte_length + bit_position + 7) // 8 - val_as_bytes = decode_state.coded_message[byte_position:byte_position + byte_length] + result = decode_state.coded_message[byte_position:byte_position + byte_length] + decode_state.cursor_byte_position += byte_length + + decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) + decode_state.cursor_bit_position = 0 - return val_as_bytes, byte_position + byte_length + return result diff --git a/odxtools/parameters/nrcconstparameter.py b/odxtools/parameters/nrcconstparameter.py index 887e7bc5..060bb5e7 100644 --- a/odxtools/parameters/nrcconstparameter.py +++ b/odxtools/parameters/nrcconstparameter.py @@ -1,8 +1,7 @@ # SPDX-License-Identifier: MIT import warnings -from copy import copy from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional from ..decodestate import DecodeState from ..diagcodedtype import DiagCodedType @@ -78,11 +77,11 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: return self.diag_coded_type.convert_internal_to_bytes( coded_value, encode_state, bit_position=bit_position_int) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[AtomicOdxType, int]: - decode_state = copy(decode_state) - if self.byte_position is not None and self.byte_position != decode_state.cursor_byte_position: - # Update byte position - decode_state.cursor_byte_position = self.byte_position + def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: + orig_cursor = decode_state.cursor_byte_position + if self.byte_position is not None: + # Update cursor position + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position # Extract coded values decode_state.cursor_bit_position = self.bit_position or 0 @@ -100,7 +99,9 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[AtomicOdxType, int stacklevel=1, ) - return coded_value, decode_state.cursor_byte_position + decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) + + return coded_value def get_description_of_valid_values(self) -> str: """return a human-understandable description of valid physical values""" diff --git a/odxtools/parameters/parameter.py b/odxtools/parameters/parameter.py index dd5e61dc..e8743492 100644 --- a/odxtools/parameters/parameter.py +++ b/odxtools/parameters/parameter.py @@ -2,7 +2,7 @@ import abc import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional from ..decodestate import DecodeState from ..element import NamedElement @@ -93,7 +93,7 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: pass @abc.abstractmethod - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: """Decode the parameter value from the coded message. If the parameter does have a byte position property, the coded bytes the parameter covers are extracted diff --git a/odxtools/parameters/parameterwithdop.py b/odxtools/parameters/parameterwithdop.py index 1423274f..c15f17cc 100644 --- a/odxtools/parameters/parameterwithdop.py +++ b/odxtools/parameters/parameterwithdop.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Optional from ..dataobjectproperty import DataObjectProperty from ..decodestate import DecodeState @@ -75,17 +75,16 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: return dop.convert_physical_to_bytes( physical_value, encode_state, bit_position=bit_position_int) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - orig_cursor_pos = decode_state.cursor_byte_position - if (pos := getattr(self, "byte_position", None)) is not None: - decode_state.cursor_byte_position = decode_state.origin_byte_position + pos + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + orig_cursor = decode_state.cursor_byte_position + if self.byte_position is not None: + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position - bit_position = self.bit_position or 0 + decode_state.cursor_bit_position = self.bit_position or 0 # Use DOP to decode - phys_val, cursor_byte_position = self.dop.convert_bytes_to_physical( - decode_state, bit_position=bit_position) + phys_val = self.dop.decode_from_pdu(decode_state) - decode_state.cursor_byte_position = max(orig_cursor_pos, cursor_byte_position) + decode_state.cursor_byte_position = max(orig_cursor, decode_state.cursor_byte_position) - return phys_val, cursor_byte_position + return phys_val diff --git a/odxtools/parameters/physicalconstantparameter.py b/odxtools/parameters/physicalconstantparameter.py index e83833a0..e54184ad 100644 --- a/odxtools/parameters/physicalconstantparameter.py +++ b/odxtools/parameters/physicalconstantparameter.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Tuple +from typing import TYPE_CHECKING, Any, Dict from ..dataobjectproperty import DataObjectProperty from ..decodestate import DecodeState @@ -64,18 +64,19 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: return dop.convert_physical_to_bytes( self.physical_constant_value, encode_state, bit_position=bit_position_int) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: # Decode value - phys_val, cursor_byte_position = super().decode_from_pdu(decode_state) + phys_val = super().decode_from_pdu(decode_state) # Check if decoded value matches expected value if phys_val != self.physical_constant_value: warnings.warn( f"Physical constant parameter does not match! " - f"The parameter {self.short_name} expected physical value {self.physical_constant_value!r} but got {phys_val!r} " - f"at byte position {cursor_byte_position} " + f"The parameter {self.short_name} expected physical value " + f"{self.physical_constant_value!r} but got {phys_val!r} " + f"at byte position {decode_state.cursor_byte_position} " f"in coded message {decode_state.coded_message.hex()}.", DecodeError, stacklevel=1, ) - return phys_val, cursor_byte_position + return phys_val diff --git a/odxtools/parameters/reservedparameter.py b/odxtools/parameters/reservedparameter.py index dafb79f5..70b6338c 100644 --- a/odxtools/parameters/reservedparameter.py +++ b/odxtools/parameters/reservedparameter.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import Optional, Tuple, cast +from typing import Optional, cast from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -31,16 +31,16 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: bit_position_int = self.bit_position if self.bit_position is not None else 0 return (0).to_bytes((self.bit_length + bit_position_int + 7) // 8, "big") - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - byte_position = ( - self.byte_position - if self.byte_position is not None else decode_state.cursor_byte_position) - abs_bit_position = byte_position * 8 + (self.bit_position or 0) - bit_length = self.bit_length + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + # move the cursor + orig_cursor = decode_state.cursor_byte_position + if self.byte_position is not None: + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position - # the cursor points to the first byte which has not been fully - # consumed - cursor_byte_position = (abs_bit_position + bit_length) // 8 + decode_state.cursor_byte_position += ((self.bit_position or 0) + self.bit_length + 7) // 8 + + decode_state.cursor_byte_position = max(orig_cursor, decode_state.cursor_byte_position) + decode_state.cursor_bit_position = 0 # ignore the value of the parameter data - return cast(int, None), cursor_byte_position + return cast(int, None) diff --git a/odxtools/parameters/systemparameter.py b/odxtools/parameters/systemparameter.py index b9433a1e..7c9b8197 100644 --- a/odxtools/parameters/systemparameter.py +++ b/odxtools/parameters/systemparameter.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import Tuple from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -28,5 +27,5 @@ def is_settable(self) -> bool: def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: raise NotImplementedError("Encoding a SystemParameter is not implemented yet.") - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: raise NotImplementedError("Decoding a SystemParameter is not implemented yet.") diff --git a/odxtools/parameters/tableentryparameter.py b/odxtools/parameters/tableentryparameter.py index 546140d7..2cdea1f4 100644 --- a/odxtools/parameters/tableentryparameter.py +++ b/odxtools/parameters/tableentryparameter.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import Tuple from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -29,5 +28,5 @@ def is_settable(self) -> bool: def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: raise NotImplementedError("Encoding a TableKeyParameter is not implemented yet.") - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: raise NotImplementedError("Decoding a TableKeyParameter is not implemented yet.") diff --git a/odxtools/parameters/tablekeyparameter.py b/odxtools/parameters/tablekeyparameter.py index 8f410599..2e82984c 100644 --- a/odxtools/parameters/tablekeyparameter.py +++ b/odxtools/parameters/tablekeyparameter.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Optional from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -135,21 +135,20 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: def encode_into_pdu(self, encode_state: EncodeState) -> bytes: return super().encode_into_pdu(encode_state) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - if self.byte_position is not None and self.byte_position != decode_state.cursor_byte_position: - cursor_byte_position = self.byte_position + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + orig_cursor = decode_state.cursor_byte_position + if self.byte_position is not None: + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position if self.table_row is not None: # the table row to be used is statically specified -> no # need to decode anything! phys_val = self.table_row.short_name - cursor_byte_position = decode_state.cursor_byte_position else: # Use DOP to decode key_dop = odxrequire(self.table.key_dop) - bit_position_int = self.bit_position if self.bit_position is not None else 0 - key_dop_val, cursor_byte_position = key_dop.convert_bytes_to_physical( - decode_state, bit_position=bit_position_int) + decode_state.cursor_bit_position = self.bit_position or 0 + key_dop_val = key_dop.decode_from_pdu(decode_state) table_row_candidates = [x for x in self.table.table_rows if x.key == key_dop_val] if len(table_row_candidates) == 0: @@ -163,4 +162,6 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, in # update the decode_state's table key decode_state.table_keys[self.short_name] = table_row - return phys_val, cursor_byte_position + decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) + + return phys_val diff --git a/odxtools/parameters/tablestructparameter.py b/odxtools/parameters/tablestructparameter.py index 2539401b..28972cdc 100644 --- a/odxtools/parameters/tablestructparameter.py +++ b/odxtools/parameters/tablestructparameter.py @@ -1,8 +1,7 @@ # SPDX-License-Identifier: MIT import warnings -from copy import copy from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, cast +from typing import TYPE_CHECKING, Any, Dict, Optional, cast from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -126,11 +125,10 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: def encode_into_pdu(self, encode_state: EncodeState) -> bytes: return super().encode_into_pdu(encode_state) - def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]: - if self.byte_position is not None and self.byte_position != decode_state.cursor_byte_position: - next_pos = self.byte_position if self.byte_position is not None else 0 - decode_state = copy(decode_state) - decode_state.cursor_byte_position = next_pos + def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: + orig_cursor = decode_state.cursor_byte_position + if self.byte_position is not None: + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position # find the selected table row key_name = self.table_key.short_name @@ -141,17 +139,20 @@ def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, in raise odxraise(f"No table key '{key_name}' found when decoding " f"table struct parameter '{str(self.short_name)}'") dummy_val = cast(str, None), cast(int, None) - return dummy_val, decode_state.cursor_byte_position + return dummy_val # Use DOP or structure to decode the value if table_row.dop is not None: dop = table_row.dop - val, i = dop.convert_bytes_to_physical(decode_state) - return (table_row.short_name, val), i + val = dop.decode_from_pdu(decode_state) + decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) + return (table_row.short_name, val) elif table_row.structure is not None: - val, i = table_row.structure.convert_bytes_to_physical(decode_state) - return (table_row.short_name, val), i + val = table_row.structure.decode_from_pdu(decode_state) + decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) + return (table_row.short_name, val) else: # the table row associated with the key neither defines a # DOP nor a structure -> ignore it - return (table_row.short_name, cast(int, None)), decode_state.cursor_byte_position + decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) + return (table_row.short_name, cast(int, None)) diff --git a/odxtools/paramlengthinfotype.py b/odxtools/paramlengthinfotype.py index 7351490f..3819e314 100644 --- a/odxtools/paramlengthinfotype.py +++ b/odxtools/paramlengthinfotype.py @@ -74,7 +74,7 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state: is_highlow_byte_order=self.is_highlow_byte_order, ) - def decode_from_pdu(self, decode_state: DecodeState, bit_position: int = 0) -> AtomicOdxType: + def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: # Find length key with matching ID. bit_length = 0 diff --git a/odxtools/standardlengthtype.py b/odxtools/standardlengthtype.py index c458bc6f..63a03e8b 100644 --- a/odxtools/standardlengthtype.py +++ b/odxtools/standardlengthtype.py @@ -56,7 +56,7 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state: is_highlow_byte_order=self.is_highlow_byte_order, ) - def decode_from_pdu(self, decode_state: DecodeState, bit_position: int = 0) -> AtomicOdxType: + def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: internal_value, cursor_byte_position = self._extract_internal_value( decode_state.coded_message, decode_state.cursor_byte_position, diff --git a/tests/test_diag_coded_types.py b/tests/test_diag_coded_types.py index e1c72536..02151455 100644 --- a/tests/test_diag_coded_types.py +++ b/tests/test_diag_coded_types.py @@ -50,10 +50,10 @@ def test_decode_leading_length_info_type_bytefield(self) -> None: base_type_encoding=None, is_highlow_byte_order_raw=None, ) + # 0xC2 = 0b11000010, with bit_position=1 and bit_lenth=5, the extracted bits are 00001, + # i.e. the leading length is 1, i.e. only the byte 0x3 should be extracted. state = DecodeState( bytes([0x1, 0xC2, 0x3, 0x4]), cursor_byte_position=1, cursor_bit_position=1) - # 0xC2 = 11000010, with bit_position=1 and bit_lenth=5, the extracted bits are 00001, - # i.e. the leading length is 1, i.e. only the byte 0x3 should be extracted. internal_value = dct.decode_from_pdu(state) self.assertEqual(internal_value, bytes([0x3])) @@ -644,7 +644,7 @@ def test_decode_min_max_length_type_end_of_pdu(self) -> None: is_highlow_byte_order_raw=None, ) state = DecodeState(bytes([0x12, 0x34, 0x56, 0x78, 0x9A]), cursor_byte_position=1) - internal_value = dct.decode_from_pdu(state, bit_position=0) + internal_value = dct.decode_from_pdu(state) self.assertEqual(internal_value, bytes([0x34, 0x56, 0x78, 0x9A])) self.assertEqual(state.cursor_byte_position, 5)