diff --git a/odxtools/decodestate.py b/odxtools/decodestate.py index af4efd41..284d4b24 100644 --- a/odxtools/decodestate.py +++ b/odxtools/decodestate.py @@ -2,9 +2,31 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING, Dict +import odxtools.exceptions as exceptions + +from .exceptions import DecodeError +from .odxtypes import AtomicOdxType, DataType + +try: + import bitstruct.c as bitstruct +except ImportError: + import bitstruct + if TYPE_CHECKING: from .tablerow import TableRow +# format specifiers for the data type using the bitstruct module +ODX_TYPE_TO_FORMAT_LETTER = { + DataType.A_INT32: "s", + DataType.A_UINT32: "u", + DataType.A_FLOAT32: "f", + DataType.A_FLOAT64: "f", + DataType.A_BYTEFIELD: "r", + DataType.A_UNICODE2STRING: "r", # UTF-16 strings must be converted explicitly + DataType.A_ASCIISTRING: "r", + DataType.A_UTF8STRING: "r", +} + @dataclass class DecodeState: @@ -35,3 +57,65 @@ class DecodeState: #: values of the table key parameters decoded so far table_keys: Dict[str, "TableRow"] = field(default_factory=dict) + + def extract_atomic_value( + self, + bit_length: int, + base_data_type: DataType, + is_highlow_byte_order: bool, + ) -> AtomicOdxType: + """Extract an internal value from a blob of raw bytes. + + :return: Tuple with the internal value of the object and the + byte position of the first undecoded byte after the + extracted object. + """ + # If the bit length is zero, return "empty" values of each type + if bit_length == 0: + return base_data_type.as_python_type()() + + byte_length = (bit_length + self.cursor_bit_position + 7) // 8 + if self.cursor_byte_position + byte_length > len(self.coded_message): + raise DecodeError(f"Expected a longer message.") + extracted_bytes = self.coded_message[self.cursor_byte_position:self.cursor_byte_position + + byte_length] + + # Apply byteorder for numerical objects. Note that doing this + # here might lead to garbage data being included in the result + # if the data to be extracted is not byte aligned and crosses + # byte boundaries, but it is what the specification says. + if not is_highlow_byte_order and base_data_type in [ + DataType.A_INT32, + DataType.A_UINT32, + DataType.A_FLOAT32, + DataType.A_FLOAT64, + ]: + extracted_bytes = extracted_bytes[::-1] + + padding = (8 - (bit_length + self.cursor_bit_position) % 8) % 8 + internal_value, = bitstruct.unpack_from( + f"{ODX_TYPE_TO_FORMAT_LETTER[base_data_type]}{bit_length}", + extracted_bytes, + offset=padding) + + text_errors = 'strict' if exceptions.strict_mode else 'replace' + if base_data_type == DataType.A_ASCIISTRING: + # The spec says ASCII, meaning only byte values 0-127. + # But in practice, vendors use iso-8859-1, aka latin-1 + # reason being iso-8859-1 never fails since it has a valid + # character mapping for every possible byte sequence. + text_encoding = 'iso-8859-1' + internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) + elif base_data_type == DataType.A_UTF8STRING: + text_encoding = "utf-8" + internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) + elif base_data_type == DataType.A_UNICODE2STRING: + # For UTF-16, we need to manually decode the extracted + # bytes to a string + text_encoding = "utf-16-be" if is_highlow_byte_order else "utf-16-le" + internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) + + self.cursor_byte_position += byte_length + self.cursor_bit_position = 0 + + return internal_value diff --git a/odxtools/diagcodedtype.py b/odxtools/diagcodedtype.py index 3c62cfcd..8570dcb1 100644 --- a/odxtools/diagcodedtype.py +++ b/odxtools/diagcodedtype.py @@ -1,35 +1,22 @@ # SPDX-License-Identifier: MIT import abc from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union + +from .decodestate import ODX_TYPE_TO_FORMAT_LETTER, DecodeState +from .encodestate import EncodeState +from .exceptions import EncodeError, odxassert, odxraise +from .odxlink import OdxLinkDatabase, OdxLinkId +from .odxtypes import AtomicOdxType, DataType try: import bitstruct.c as bitstruct except ImportError: import bitstruct -from . import exceptions -from .decodestate import DecodeState -from .encodestate import EncodeState -from .exceptions import DecodeError, EncodeError, odxassert, odxraise -from .odxlink import OdxLinkDatabase, OdxLinkId -from .odxtypes import AtomicOdxType, DataType - if TYPE_CHECKING: from .diaglayer import DiagLayer -# format specifiers for the data type using the bitstruct module -ODX_TYPE_TO_FORMAT_LETTER = { - DataType.A_INT32: "s", - DataType.A_UINT32: "u", - DataType.A_FLOAT32: "f", - DataType.A_FLOAT64: "f", - DataType.A_BYTEFIELD: "r", - DataType.A_UNICODE2STRING: "r", # UTF-16 strings must be converted explicitly - DataType.A_ASCIISTRING: "r", - DataType.A_UTF8STRING: "r", -} - # Allowed diag-coded types DctType = Literal[ "LEADING-LENGTH-INFO-TYPE", @@ -69,68 +56,6 @@ def dct_type(self) -> DctType: def is_highlow_byte_order(self) -> bool: return self.is_highlow_byte_order_raw in [None, True] - @staticmethod - def _extract_atomic_value( - coded_message: bytes, - byte_position: int, - bit_position: int, - bit_length: int, - base_data_type: DataType, - is_highlow_byte_order: bool, - ) -> Tuple[AtomicOdxType, int]: - """Extract an internal value from a blob of raw bytes. - - :return: Tuple with the internal value of the object and the - byte position of the first undecoded byte after the - extracted object. - """ - # If the bit length is zero, return "empty" values of each type - if bit_length == 0: - return base_data_type.as_python_type()(), byte_position - - byte_length = (bit_length + bit_position + 7) // 8 - if byte_position + byte_length > len(coded_message): - raise DecodeError(f"Expected a longer message.") - cursor_byte_position = byte_position + byte_length - extracted_bytes = coded_message[byte_position:cursor_byte_position] - - # Apply byteorder for numerical objects. Note that doing this - # here might lead to garbage data being included in the result - # if the data to be extracted is not byte aligned and crosses - # byte boundaries, but it is what the specification says. - if not is_highlow_byte_order and base_data_type in [ - DataType.A_INT32, - DataType.A_UINT32, - DataType.A_FLOAT32, - DataType.A_FLOAT64, - ]: - extracted_bytes = extracted_bytes[::-1] - - padding = (8 - (bit_length + bit_position) % 8) % 8 - internal_value, = bitstruct.unpack_from( - f"{ODX_TYPE_TO_FORMAT_LETTER[base_data_type]}{bit_length}", - extracted_bytes, - offset=padding) - - text_errors = 'strict' if exceptions.strict_mode else 'replace' - if base_data_type == DataType.A_ASCIISTRING: - # The spec says ASCII, meaning only byte values 0-127. - # But in practice, vendors use iso-8859-1, aka latin-1 - # reason being iso-8859-1 never fails since it has a valid - # character mapping for every possible byte sequence. - text_encoding = 'iso-8859-1' - internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) - elif base_data_type == DataType.A_UTF8STRING: - text_encoding = "utf-8" - internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) - elif base_data_type == DataType.A_UNICODE2STRING: - # For UTF-16, we need to manually decode the extracted - # bytes to a string - text_encoding = "utf-16-be" if is_highlow_byte_order else "utf-16-le" - internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) - - return internal_value, cursor_byte_position - @staticmethod def _encode_internal_value( internal_value: AtomicOdxType, diff --git a/odxtools/leadinglengthinfotype.py b/odxtools/leadinglengthinfotype.py index 08e4e990..aa8b2124 100644 --- a/odxtools/leadinglengthinfotype.py +++ b/odxtools/leadinglengthinfotype.py @@ -66,18 +66,13 @@ def convert_internal_to_bytes(self, internal_value: Any, encode_state: EncodeSta return length_bytes + value_bytes def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: - coded_message = decode_state.coded_message # Extract length of the parameter value - byte_length, byte_position = self._extract_atomic_value( - coded_message=coded_message, - byte_position=decode_state.cursor_byte_position, - bit_position=decode_state.cursor_bit_position, + byte_length = decode_state.extract_atomic_value( bit_length=self.bit_length, base_data_type=DataType.A_UINT32, # length is an integer is_highlow_byte_order=self.is_highlow_byte_order, ) - decode_state.cursor_bit_position = 0 if not isinstance(byte_length, int): odxraise() @@ -85,14 +80,10 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: # Extract actual value # TODO: The returned value is None if the byte_length is 0. Maybe change it # to some default value like an empty bytearray() or 0? - value, cursor_byte_position = self._extract_atomic_value( - coded_message=coded_message, - byte_position=byte_position, - bit_position=0, + value = decode_state.extract_atomic_value( bit_length=8 * byte_length, base_data_type=self.base_data_type, is_highlow_byte_order=self.is_highlow_byte_order, ) - decode_state.cursor_byte_position = cursor_byte_position return value diff --git a/odxtools/minmaxlengthtype.py b/odxtools/minmaxlengthtype.py index 0319a5b3..f5eb81f9 100644 --- a/odxtools/minmaxlengthtype.py +++ b/odxtools/minmaxlengthtype.py @@ -107,36 +107,40 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: raise DecodeError("The PDU ended before minimum length was reached.") coded_message = decode_state.coded_message - cursor_pos = decode_state.cursor_byte_position + orig_cursor_pos = decode_state.cursor_byte_position termination_seq = self.__termination_sequence() max_terminator_pos = len(coded_message) if self.max_length is not None: - max_terminator_pos = min(max_terminator_pos, cursor_pos + self.max_length) + max_terminator_pos = min(max_terminator_pos, orig_cursor_pos + self.max_length) if self.termination != "END-OF-PDU": # The parameter either ends after the maximum length, at # the end of the PDU or if a termination sequence is # found. - terminator_pos = cursor_pos + self.min_length + # Find the location of the termination sequence. The + # problem here is that the alignment of the termination + # sequence must be correct for it to be a termination + # sequence. (e.g., an odd-aligned double-zero for UTF-16 + # strings is *not* a termination sequence!) + terminator_pos = orig_cursor_pos + self.min_length while True: - # Search the termination sequence - terminator_pos = coded_message.find(termination_seq, terminator_pos, - max_terminator_pos) + terminator_pos = decode_state.coded_message.find(termination_seq, terminator_pos, + max_terminator_pos) if terminator_pos < 0: # termination sequence was not found, i.e., we # are terminated by either the end of the PDU or # our maximum size. (whatever is the smaller # value.) - byte_length = max_terminator_pos - cursor_pos + byte_length = max_terminator_pos - orig_cursor_pos break - elif (terminator_pos - cursor_pos) % len(termination_seq) == 0: + elif (terminator_pos - orig_cursor_pos) % len(termination_seq) == 0: # we found the termination sequence at a position # and it is correctly aligned (two-byte # termination sequences must be word aligned # relative to the beginning of the parameter)! - byte_length = terminator_pos - cursor_pos + byte_length = terminator_pos - orig_cursor_pos break else: # we found the termination sequence, but its @@ -145,38 +149,28 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: terminator_pos += 1 # Extract the value - value, byte_pos = self._extract_atomic_value( - decode_state.coded_message, - byte_position=cursor_pos, - bit_position=0, + value = decode_state.extract_atomic_value( bit_length=8 * byte_length, base_data_type=self.base_data_type, is_highlow_byte_order=self.is_highlow_byte_order, ) - if byte_pos != len(coded_message) and byte_pos - cursor_pos != self.max_length: - byte_pos += len(termination_seq) - - # next byte starts after the actual data and the termination sequence - decode_state.cursor_byte_position = byte_pos - decode_state.cursor_bit_position = 0 + if decode_state.cursor_byte_position != len( + decode_state.coded_message + ) and decode_state.cursor_byte_position - orig_cursor_pos != self.max_length: + # next object starts after the actual data and the termination sequence + decode_state.cursor_byte_position += len(termination_seq) return value else: # If termination == "END-OF-PDU", the parameter ends after max_length # or at the end of the PDU. - byte_length = max_terminator_pos - cursor_pos + byte_length = max_terminator_pos - orig_cursor_pos - value, byte_pos = self._extract_atomic_value( - decode_state.coded_message, - byte_position=cursor_pos, - bit_position=0, + value = decode_state.extract_atomic_value( bit_length=8 * byte_length, base_data_type=self.base_data_type, is_highlow_byte_order=self.is_highlow_byte_order, ) - decode_state.cursor_byte_position = byte_pos - decode_state.cursor_bit_position = 0 - return value diff --git a/odxtools/parameters/matchingrequestparameter.py b/odxtools/parameters/matchingrequestparameter.py index 40ab6fab..fb6b6c46 100644 --- a/odxtools/parameters/matchingrequestparameter.py +++ b/odxtools/parameters/matchingrequestparameter.py @@ -3,7 +3,6 @@ from typing import Optional from ..decodestate import DecodeState -from ..diagcodedtype import DiagCodedType from ..encodestate import EncodeState from ..exceptions import EncodeError from ..odxtypes import DataType, ParameterValue @@ -43,15 +42,11 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: if self.byte_position is not None: decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position - result, decode_state.cursor_byte_position = DiagCodedType._extract_atomic_value( - coded_message=decode_state.coded_message, - byte_position=decode_state.cursor_byte_position, - bit_position=self.bit_position or 0, + result = decode_state.extract_atomic_value( bit_length=self.byte_length * 8, base_data_type=DataType.A_UINT32, is_highlow_byte_order=False) decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) - decode_state.cursor_bit_position = 0 return result diff --git a/odxtools/paramlengthinfotype.py b/odxtools/paramlengthinfotype.py index 38aa078c..5c11748c 100644 --- a/odxtools/paramlengthinfotype.py +++ b/odxtools/paramlengthinfotype.py @@ -88,16 +88,10 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: bit_length = 0 # Extract the internal value and return. - value, cursor_byte_position = self._extract_atomic_value( - decode_state.coded_message, - decode_state.cursor_byte_position, - decode_state.cursor_bit_position, + value = decode_state.extract_atomic_value( bit_length, self.base_data_type, self.is_highlow_byte_order, ) - decode_state.cursor_bit_position = 0 - decode_state.cursor_byte_position = cursor_byte_position - return value diff --git a/odxtools/standardlengthtype.py b/odxtools/standardlengthtype.py index 4a42dadf..0b8a729f 100644 --- a/odxtools/standardlengthtype.py +++ b/odxtools/standardlengthtype.py @@ -57,17 +57,11 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state: ) def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: - internal_value, cursor_byte_position = self._extract_atomic_value( - decode_state.coded_message, - decode_state.cursor_byte_position, - decode_state.cursor_bit_position, + internal_value = decode_state.extract_atomic_value( self.bit_length, self.base_data_type, self.is_highlow_byte_order, ) internal_value = self.__apply_mask(internal_value) - decode_state.cursor_byte_position = cursor_byte_position - decode_state.cursor_bit_position = 0 - return internal_value