From 46d619682ecba5fe5667a44b640af1f561847b47 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Thu, 15 Feb 2024 10:20:14 +0100 Subject: [PATCH 1/5] browse tool: some cleanups for whatever reason, mypy started to complain recently. Maybe this has something to do with the recent switch to `InquirerPy`... Signed-off-by: Andreas Lauser Signed-off-by: Florian Jost --- odxtools/cli/browse.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/odxtools/cli/browse.py b/odxtools/cli/browse.py index 541e6899..9c8b3bcd 100644 --- a/odxtools/cli/browse.py +++ b/odxtools/cli/browse.py @@ -2,7 +2,7 @@ import argparse import logging import sys -from typing import List, Optional, Union +from typing import List, Optional, Union, cast from InquirerPy import prompt as PI_prompt from tabulate import tabulate # TODO: switch to rich tables @@ -99,7 +99,7 @@ def prompt_single_parameter_value(parameter: Parameter) -> Optional[AtomicOdxTyp return None elif parameter.physical_type.base_data_type is not None: return _convert_string_to_odx_type( - answer.get(parameter.short_name), parameter.physical_type.base_data_type) + cast(str, answer.get(parameter.short_name)), parameter.physical_type.base_data_type) else: logging.warning( f"Parameter {parameter.short_name} does not have a physical data type. Param details: {parameter}" @@ -148,7 +148,7 @@ def encode_message_interactively(sub_service: Union[Request, Response], lambda input: _convert_string_to_bytes(input), }] answer = PI_prompt(answered_request_prompt) - answered_request = answer.get("request") + answered_request = cast(bytes, answer.get("request")) print(f"Input interpretation as list: {list(answered_request)}") # Request values for parameters @@ -270,7 +270,10 @@ def browse(odxdb: Database) -> None: if answer.get("variant") == "[exit]": return - variant = odxdb.diag_layers[answer.get("variant")] + variant_name = answer.get("variant") + assert isinstance(variant_name, str) + variant = odxdb.diag_layers[variant_name] + print(f"{type(answer.get('variant'))=}") assert isinstance(variant, DiagLayer) if (rx_id := variant.get_receive_id()) is not None: @@ -287,7 +290,6 @@ def browse(odxdb: Database) -> None: f"{variant.variant_type.value} '{variant.short_name}' (Receive ID: {recv_id}, Send ID: {send_id})" ) - service_sn = 0 while True: services: List[DiagService] = [ s for s in variant.services if isinstance(s, DiagService) @@ -307,6 +309,7 @@ def browse(odxdb: Database) -> None: break service_sn = answer.get("service") + assert isinstance(service_sn, str) service = variant.services[service_sn] assert isinstance(service, DiagService) @@ -341,8 +344,8 @@ def browse(odxdb: Database) -> None: continue codec = answer.get("message_type") - if codec is not None: + assert isinstance(codec, (Request, Response)) table = extract_parameter_tabulation_data(codec.parameters) table_str = tabulate(table, headers='keys', tablefmt='presto') print(table_str) From 35d143761a5324aa29bec566d2c472f8cafec5f5 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Thu, 15 Feb 2024 10:18:59 +0100 Subject: [PATCH 2/5] make matching request parameters truly positionable i.e., if they are non-byte aligned their value will no longer feature garbage at the beginning and/or at the end. Besides this, their values are now integers, which is more intuitive IMO. Signed-off-by: Andreas Lauser Signed-off-by: Florian Jost --- odxtools/parameters/matchingrequestparameter.py | 15 +++++++++------ tests/test_decoding.py | 2 +- tests/test_somersault.py | 8 ++------ 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/odxtools/parameters/matchingrequestparameter.py b/odxtools/parameters/matchingrequestparameter.py index a9edfbe4..32f2896e 100644 --- a/odxtools/parameters/matchingrequestparameter.py +++ b/odxtools/parameters/matchingrequestparameter.py @@ -3,9 +3,10 @@ from typing import Optional from ..decodestate import DecodeState +from ..diagcodedtype import DiagCodedType from ..encodestate import EncodeState from ..exceptions import EncodeError -from ..odxtypes import ParameterValue +from ..odxtypes import DataType, ParameterValue from .parameter import Parameter, ParameterType @@ -42,11 +43,13 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: if self.byte_position is not None: decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position - byte_position = decode_state.cursor_byte_position - bit_position = self.bit_position or 0 - byte_length = (8 * self.byte_length + bit_position + 7) // 8 - result = decode_state.coded_message[byte_position:byte_position + byte_length] - decode_state.cursor_byte_position += byte_length + result, decode_state.cursor_byte_position = DiagCodedType._extract_internal_value( + coded_message=decode_state.coded_message, + byte_position=decode_state.cursor_byte_position, + bit_position=self.bit_position or 0, + bit_length=self.byte_length * 8, + base_data_type=DataType.A_UINT32, + is_highlow_byte_order=False) decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) decode_state.cursor_bit_position = 0 diff --git a/tests/test_decoding.py b/tests/test_decoding.py index 0c7361be..459fbd7f 100644 --- a/tests/test_decoding.py +++ b/tests/test_decoding.py @@ -1242,7 +1242,7 @@ def test_decode_response(self) -> None: coding_object=message, param_dict={ "SID": sid, - "matching_req_param": bytes([0xAB]) + "matching_req_param": 0xAB }, ) decoded_message = diag_layer.decode(coded_message)[0] diff --git a/tests/test_somersault.py b/tests/test_somersault.py index 5397dfd7..0da49eae 100644 --- a/tests/test_somersault.py +++ b/tests/test_somersault.py @@ -259,7 +259,7 @@ def test_code_table_params(self) -> None: self.assertEqual( decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload] ["num_flips_done"], # type: ignore[index, call-overload] - bytes([123])) + 123) self.assertEqual( decoded_resp_data["last_pos_response"][1] # type: ignore[index, call-overload] ["sault_time"], # type: ignore[index, call-overload] @@ -352,11 +352,7 @@ def test_decode_response(self) -> None: m = messages[0] self.assertEqual(m.coded_message.hex(), "fa03ff") self.assertEqual(m.coding_object, pos_response) - self.assertEqual(m.param_dict, { - "sid": 0xFA, - "num_flips_done": bytearray([0x03]), - "sault_time": 255 - }) + self.assertEqual(m.param_dict, {"sid": 0xFA, "num_flips_done": 0x03, "sault_time": 255}) class TestNavigation(unittest.TestCase): From d643820864ad17a83fc3508000588536a7a72159 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Thu, 15 Feb 2024 10:19:14 +0100 Subject: [PATCH 3/5] streamline `ParamLengthInfoType.decode_from_pdu()` a bit Signed-off-by: Andreas Lauser Signed-off-by: Florian Jost --- odxtools/paramlengthinfotype.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/odxtools/paramlengthinfotype.py b/odxtools/paramlengthinfotype.py index 3819e314..eb6847a5 100644 --- a/odxtools/paramlengthinfotype.py +++ b/odxtools/paramlengthinfotype.py @@ -75,10 +75,7 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state: ) def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: - # Find length key with matching ID. - bit_length = 0 - - # The bit length of the parameter to be extracted is given by the length key. + # First, we need to find a length key with matching ID. if self.length_key.short_name not in decode_state.length_keys: odxraise(f"Unspecified mandatory length key parameter " f"{self.length_key.short_name}") From 59324c3ee26751dbd70a9692b78aac40b279fd9f Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Thu, 15 Feb 2024 10:19:29 +0100 Subject: [PATCH 4/5] DiagCodedType: rename `_extract_internal_value()` to `_extract_atomic_value()` Signed-off-by: Andreas Lauser Signed-off-by: Florian Jost --- odxtools/diagcodedtype.py | 2 +- odxtools/leadinglengthinfotype.py | 4 ++-- odxtools/minmaxlengthtype.py | 4 ++-- odxtools/parameters/matchingrequestparameter.py | 2 +- odxtools/paramlengthinfotype.py | 2 +- odxtools/standardlengthtype.py | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/odxtools/diagcodedtype.py b/odxtools/diagcodedtype.py index e984e3d3..3c62cfcd 100644 --- a/odxtools/diagcodedtype.py +++ b/odxtools/diagcodedtype.py @@ -70,7 +70,7 @@ def is_highlow_byte_order(self) -> bool: return self.is_highlow_byte_order_raw in [None, True] @staticmethod - def _extract_internal_value( + def _extract_atomic_value( coded_message: bytes, byte_position: int, bit_position: int, diff --git a/odxtools/leadinglengthinfotype.py b/odxtools/leadinglengthinfotype.py index baf4a2d8..08e4e990 100644 --- a/odxtools/leadinglengthinfotype.py +++ b/odxtools/leadinglengthinfotype.py @@ -69,7 +69,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: coded_message = decode_state.coded_message # Extract length of the parameter value - byte_length, byte_position = self._extract_internal_value( + byte_length, byte_position = self._extract_atomic_value( coded_message=coded_message, byte_position=decode_state.cursor_byte_position, bit_position=decode_state.cursor_bit_position, @@ -85,7 +85,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: # Extract actual value # TODO: The returned value is None if the byte_length is 0. Maybe change it # to some default value like an empty bytearray() or 0? - value, cursor_byte_position = self._extract_internal_value( + value, cursor_byte_position = self._extract_atomic_value( coded_message=coded_message, byte_position=byte_position, bit_position=0, diff --git a/odxtools/minmaxlengthtype.py b/odxtools/minmaxlengthtype.py index ba2af8ad..0319a5b3 100644 --- a/odxtools/minmaxlengthtype.py +++ b/odxtools/minmaxlengthtype.py @@ -145,7 +145,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: terminator_pos += 1 # Extract the value - value, byte_pos = self._extract_internal_value( + value, byte_pos = self._extract_atomic_value( decode_state.coded_message, byte_position=cursor_pos, bit_position=0, @@ -167,7 +167,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: # or at the end of the PDU. byte_length = max_terminator_pos - cursor_pos - value, byte_pos = self._extract_internal_value( + value, byte_pos = self._extract_atomic_value( decode_state.coded_message, byte_position=cursor_pos, bit_position=0, diff --git a/odxtools/parameters/matchingrequestparameter.py b/odxtools/parameters/matchingrequestparameter.py index 32f2896e..40ab6fab 100644 --- a/odxtools/parameters/matchingrequestparameter.py +++ b/odxtools/parameters/matchingrequestparameter.py @@ -43,7 +43,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: if self.byte_position is not None: decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position - result, decode_state.cursor_byte_position = DiagCodedType._extract_internal_value( + result, decode_state.cursor_byte_position = DiagCodedType._extract_atomic_value( coded_message=decode_state.coded_message, byte_position=decode_state.cursor_byte_position, bit_position=self.bit_position or 0, diff --git a/odxtools/paramlengthinfotype.py b/odxtools/paramlengthinfotype.py index eb6847a5..38aa078c 100644 --- a/odxtools/paramlengthinfotype.py +++ b/odxtools/paramlengthinfotype.py @@ -88,7 +88,7 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: bit_length = 0 # Extract the internal value and return. - value, cursor_byte_position = self._extract_internal_value( + value, cursor_byte_position = self._extract_atomic_value( decode_state.coded_message, decode_state.cursor_byte_position, decode_state.cursor_bit_position, diff --git a/odxtools/standardlengthtype.py b/odxtools/standardlengthtype.py index 63a03e8b..4a42dadf 100644 --- a/odxtools/standardlengthtype.py +++ b/odxtools/standardlengthtype.py @@ -57,7 +57,7 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state: ) def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: - internal_value, cursor_byte_position = self._extract_internal_value( + internal_value, cursor_byte_position = self._extract_atomic_value( decode_state.coded_message, decode_state.cursor_byte_position, decode_state.cursor_bit_position, From 72595015542ab709484b787bcbfc165ebbad2fa8 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Fri, 16 Feb 2024 16:02:30 +0100 Subject: [PATCH 5/5] move the method for extracting atomic values to DecodeState thanks to [at]kayoub5 for the proposal. Signed-off-by: Andreas Lauser Signed-off-by: Florian Jost --- odxtools/decodestate.py | 84 +++++++++++++++++ odxtools/diagcodedtype.py | 89 ++----------------- odxtools/leadinglengthinfotype.py | 13 +-- odxtools/minmaxlengthtype.py | 48 +++++----- .../parameters/matchingrequestparameter.py | 7 +- odxtools/paramlengthinfotype.py | 8 +- odxtools/standardlengthtype.py | 8 +- 7 files changed, 117 insertions(+), 140 deletions(-) diff --git a/odxtools/decodestate.py b/odxtools/decodestate.py index af4efd41..284d4b24 100644 --- a/odxtools/decodestate.py +++ b/odxtools/decodestate.py @@ -2,9 +2,31 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING, Dict +import odxtools.exceptions as exceptions + +from .exceptions import DecodeError +from .odxtypes import AtomicOdxType, DataType + +try: + import bitstruct.c as bitstruct +except ImportError: + import bitstruct + if TYPE_CHECKING: from .tablerow import TableRow +# format specifiers for the data type using the bitstruct module +ODX_TYPE_TO_FORMAT_LETTER = { + DataType.A_INT32: "s", + DataType.A_UINT32: "u", + DataType.A_FLOAT32: "f", + DataType.A_FLOAT64: "f", + DataType.A_BYTEFIELD: "r", + DataType.A_UNICODE2STRING: "r", # UTF-16 strings must be converted explicitly + DataType.A_ASCIISTRING: "r", + DataType.A_UTF8STRING: "r", +} + @dataclass class DecodeState: @@ -35,3 +57,65 @@ class DecodeState: #: values of the table key parameters decoded so far table_keys: Dict[str, "TableRow"] = field(default_factory=dict) + + def extract_atomic_value( + self, + bit_length: int, + base_data_type: DataType, + is_highlow_byte_order: bool, + ) -> AtomicOdxType: + """Extract an internal value from a blob of raw bytes. + + :return: Tuple with the internal value of the object and the + byte position of the first undecoded byte after the + extracted object. + """ + # If the bit length is zero, return "empty" values of each type + if bit_length == 0: + return base_data_type.as_python_type()() + + byte_length = (bit_length + self.cursor_bit_position + 7) // 8 + if self.cursor_byte_position + byte_length > len(self.coded_message): + raise DecodeError(f"Expected a longer message.") + extracted_bytes = self.coded_message[self.cursor_byte_position:self.cursor_byte_position + + byte_length] + + # Apply byteorder for numerical objects. Note that doing this + # here might lead to garbage data being included in the result + # if the data to be extracted is not byte aligned and crosses + # byte boundaries, but it is what the specification says. + if not is_highlow_byte_order and base_data_type in [ + DataType.A_INT32, + DataType.A_UINT32, + DataType.A_FLOAT32, + DataType.A_FLOAT64, + ]: + extracted_bytes = extracted_bytes[::-1] + + padding = (8 - (bit_length + self.cursor_bit_position) % 8) % 8 + internal_value, = bitstruct.unpack_from( + f"{ODX_TYPE_TO_FORMAT_LETTER[base_data_type]}{bit_length}", + extracted_bytes, + offset=padding) + + text_errors = 'strict' if exceptions.strict_mode else 'replace' + if base_data_type == DataType.A_ASCIISTRING: + # The spec says ASCII, meaning only byte values 0-127. + # But in practice, vendors use iso-8859-1, aka latin-1 + # reason being iso-8859-1 never fails since it has a valid + # character mapping for every possible byte sequence. + text_encoding = 'iso-8859-1' + internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) + elif base_data_type == DataType.A_UTF8STRING: + text_encoding = "utf-8" + internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) + elif base_data_type == DataType.A_UNICODE2STRING: + # For UTF-16, we need to manually decode the extracted + # bytes to a string + text_encoding = "utf-16-be" if is_highlow_byte_order else "utf-16-le" + internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) + + self.cursor_byte_position += byte_length + self.cursor_bit_position = 0 + + return internal_value diff --git a/odxtools/diagcodedtype.py b/odxtools/diagcodedtype.py index 3c62cfcd..8570dcb1 100644 --- a/odxtools/diagcodedtype.py +++ b/odxtools/diagcodedtype.py @@ -1,35 +1,22 @@ # SPDX-License-Identifier: MIT import abc from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union + +from .decodestate import ODX_TYPE_TO_FORMAT_LETTER, DecodeState +from .encodestate import EncodeState +from .exceptions import EncodeError, odxassert, odxraise +from .odxlink import OdxLinkDatabase, OdxLinkId +from .odxtypes import AtomicOdxType, DataType try: import bitstruct.c as bitstruct except ImportError: import bitstruct -from . import exceptions -from .decodestate import DecodeState -from .encodestate import EncodeState -from .exceptions import DecodeError, EncodeError, odxassert, odxraise -from .odxlink import OdxLinkDatabase, OdxLinkId -from .odxtypes import AtomicOdxType, DataType - if TYPE_CHECKING: from .diaglayer import DiagLayer -# format specifiers for the data type using the bitstruct module -ODX_TYPE_TO_FORMAT_LETTER = { - DataType.A_INT32: "s", - DataType.A_UINT32: "u", - DataType.A_FLOAT32: "f", - DataType.A_FLOAT64: "f", - DataType.A_BYTEFIELD: "r", - DataType.A_UNICODE2STRING: "r", # UTF-16 strings must be converted explicitly - DataType.A_ASCIISTRING: "r", - DataType.A_UTF8STRING: "r", -} - # Allowed diag-coded types DctType = Literal[ "LEADING-LENGTH-INFO-TYPE", @@ -69,68 +56,6 @@ def dct_type(self) -> DctType: def is_highlow_byte_order(self) -> bool: return self.is_highlow_byte_order_raw in [None, True] - @staticmethod - def _extract_atomic_value( - coded_message: bytes, - byte_position: int, - bit_position: int, - bit_length: int, - base_data_type: DataType, - is_highlow_byte_order: bool, - ) -> Tuple[AtomicOdxType, int]: - """Extract an internal value from a blob of raw bytes. - - :return: Tuple with the internal value of the object and the - byte position of the first undecoded byte after the - extracted object. - """ - # If the bit length is zero, return "empty" values of each type - if bit_length == 0: - return base_data_type.as_python_type()(), byte_position - - byte_length = (bit_length + bit_position + 7) // 8 - if byte_position + byte_length > len(coded_message): - raise DecodeError(f"Expected a longer message.") - cursor_byte_position = byte_position + byte_length - extracted_bytes = coded_message[byte_position:cursor_byte_position] - - # Apply byteorder for numerical objects. Note that doing this - # here might lead to garbage data being included in the result - # if the data to be extracted is not byte aligned and crosses - # byte boundaries, but it is what the specification says. - if not is_highlow_byte_order and base_data_type in [ - DataType.A_INT32, - DataType.A_UINT32, - DataType.A_FLOAT32, - DataType.A_FLOAT64, - ]: - extracted_bytes = extracted_bytes[::-1] - - padding = (8 - (bit_length + bit_position) % 8) % 8 - internal_value, = bitstruct.unpack_from( - f"{ODX_TYPE_TO_FORMAT_LETTER[base_data_type]}{bit_length}", - extracted_bytes, - offset=padding) - - text_errors = 'strict' if exceptions.strict_mode else 'replace' - if base_data_type == DataType.A_ASCIISTRING: - # The spec says ASCII, meaning only byte values 0-127. - # But in practice, vendors use iso-8859-1, aka latin-1 - # reason being iso-8859-1 never fails since it has a valid - # character mapping for every possible byte sequence. - text_encoding = 'iso-8859-1' - internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) - elif base_data_type == DataType.A_UTF8STRING: - text_encoding = "utf-8" - internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) - elif base_data_type == DataType.A_UNICODE2STRING: - # For UTF-16, we need to manually decode the extracted - # bytes to a string - text_encoding = "utf-16-be" if is_highlow_byte_order else "utf-16-le" - internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors) - - return internal_value, cursor_byte_position - @staticmethod def _encode_internal_value( internal_value: AtomicOdxType, diff --git a/odxtools/leadinglengthinfotype.py b/odxtools/leadinglengthinfotype.py index 08e4e990..aa8b2124 100644 --- a/odxtools/leadinglengthinfotype.py +++ b/odxtools/leadinglengthinfotype.py @@ -66,18 +66,13 @@ def convert_internal_to_bytes(self, internal_value: Any, encode_state: EncodeSta return length_bytes + value_bytes def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: - coded_message = decode_state.coded_message # Extract length of the parameter value - byte_length, byte_position = self._extract_atomic_value( - coded_message=coded_message, - byte_position=decode_state.cursor_byte_position, - bit_position=decode_state.cursor_bit_position, + byte_length = decode_state.extract_atomic_value( bit_length=self.bit_length, base_data_type=DataType.A_UINT32, # length is an integer is_highlow_byte_order=self.is_highlow_byte_order, ) - decode_state.cursor_bit_position = 0 if not isinstance(byte_length, int): odxraise() @@ -85,14 +80,10 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: # Extract actual value # TODO: The returned value is None if the byte_length is 0. Maybe change it # to some default value like an empty bytearray() or 0? - value, cursor_byte_position = self._extract_atomic_value( - coded_message=coded_message, - byte_position=byte_position, - bit_position=0, + value = decode_state.extract_atomic_value( bit_length=8 * byte_length, base_data_type=self.base_data_type, is_highlow_byte_order=self.is_highlow_byte_order, ) - decode_state.cursor_byte_position = cursor_byte_position return value diff --git a/odxtools/minmaxlengthtype.py b/odxtools/minmaxlengthtype.py index 0319a5b3..f5eb81f9 100644 --- a/odxtools/minmaxlengthtype.py +++ b/odxtools/minmaxlengthtype.py @@ -107,36 +107,40 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: raise DecodeError("The PDU ended before minimum length was reached.") coded_message = decode_state.coded_message - cursor_pos = decode_state.cursor_byte_position + orig_cursor_pos = decode_state.cursor_byte_position termination_seq = self.__termination_sequence() max_terminator_pos = len(coded_message) if self.max_length is not None: - max_terminator_pos = min(max_terminator_pos, cursor_pos + self.max_length) + max_terminator_pos = min(max_terminator_pos, orig_cursor_pos + self.max_length) if self.termination != "END-OF-PDU": # The parameter either ends after the maximum length, at # the end of the PDU or if a termination sequence is # found. - terminator_pos = cursor_pos + self.min_length + # Find the location of the termination sequence. The + # problem here is that the alignment of the termination + # sequence must be correct for it to be a termination + # sequence. (e.g., an odd-aligned double-zero for UTF-16 + # strings is *not* a termination sequence!) + terminator_pos = orig_cursor_pos + self.min_length while True: - # Search the termination sequence - terminator_pos = coded_message.find(termination_seq, terminator_pos, - max_terminator_pos) + terminator_pos = decode_state.coded_message.find(termination_seq, terminator_pos, + max_terminator_pos) if terminator_pos < 0: # termination sequence was not found, i.e., we # are terminated by either the end of the PDU or # our maximum size. (whatever is the smaller # value.) - byte_length = max_terminator_pos - cursor_pos + byte_length = max_terminator_pos - orig_cursor_pos break - elif (terminator_pos - cursor_pos) % len(termination_seq) == 0: + elif (terminator_pos - orig_cursor_pos) % len(termination_seq) == 0: # we found the termination sequence at a position # and it is correctly aligned (two-byte # termination sequences must be word aligned # relative to the beginning of the parameter)! - byte_length = terminator_pos - cursor_pos + byte_length = terminator_pos - orig_cursor_pos break else: # we found the termination sequence, but its @@ -145,38 +149,28 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: terminator_pos += 1 # Extract the value - value, byte_pos = self._extract_atomic_value( - decode_state.coded_message, - byte_position=cursor_pos, - bit_position=0, + value = decode_state.extract_atomic_value( bit_length=8 * byte_length, base_data_type=self.base_data_type, is_highlow_byte_order=self.is_highlow_byte_order, ) - if byte_pos != len(coded_message) and byte_pos - cursor_pos != self.max_length: - byte_pos += len(termination_seq) - - # next byte starts after the actual data and the termination sequence - decode_state.cursor_byte_position = byte_pos - decode_state.cursor_bit_position = 0 + if decode_state.cursor_byte_position != len( + decode_state.coded_message + ) and decode_state.cursor_byte_position - orig_cursor_pos != self.max_length: + # next object starts after the actual data and the termination sequence + decode_state.cursor_byte_position += len(termination_seq) return value else: # If termination == "END-OF-PDU", the parameter ends after max_length # or at the end of the PDU. - byte_length = max_terminator_pos - cursor_pos + byte_length = max_terminator_pos - orig_cursor_pos - value, byte_pos = self._extract_atomic_value( - decode_state.coded_message, - byte_position=cursor_pos, - bit_position=0, + value = decode_state.extract_atomic_value( bit_length=8 * byte_length, base_data_type=self.base_data_type, is_highlow_byte_order=self.is_highlow_byte_order, ) - decode_state.cursor_byte_position = byte_pos - decode_state.cursor_bit_position = 0 - return value diff --git a/odxtools/parameters/matchingrequestparameter.py b/odxtools/parameters/matchingrequestparameter.py index 40ab6fab..fb6b6c46 100644 --- a/odxtools/parameters/matchingrequestparameter.py +++ b/odxtools/parameters/matchingrequestparameter.py @@ -3,7 +3,6 @@ from typing import Optional from ..decodestate import DecodeState -from ..diagcodedtype import DiagCodedType from ..encodestate import EncodeState from ..exceptions import EncodeError from ..odxtypes import DataType, ParameterValue @@ -43,15 +42,11 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: if self.byte_position is not None: decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position - result, decode_state.cursor_byte_position = DiagCodedType._extract_atomic_value( - coded_message=decode_state.coded_message, - byte_position=decode_state.cursor_byte_position, - bit_position=self.bit_position or 0, + result = decode_state.extract_atomic_value( bit_length=self.byte_length * 8, base_data_type=DataType.A_UINT32, is_highlow_byte_order=False) decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor) - decode_state.cursor_bit_position = 0 return result diff --git a/odxtools/paramlengthinfotype.py b/odxtools/paramlengthinfotype.py index 38aa078c..5c11748c 100644 --- a/odxtools/paramlengthinfotype.py +++ b/odxtools/paramlengthinfotype.py @@ -88,16 +88,10 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: bit_length = 0 # Extract the internal value and return. - value, cursor_byte_position = self._extract_atomic_value( - decode_state.coded_message, - decode_state.cursor_byte_position, - decode_state.cursor_bit_position, + value = decode_state.extract_atomic_value( bit_length, self.base_data_type, self.is_highlow_byte_order, ) - decode_state.cursor_bit_position = 0 - decode_state.cursor_byte_position = cursor_byte_position - return value diff --git a/odxtools/standardlengthtype.py b/odxtools/standardlengthtype.py index 4a42dadf..0b8a729f 100644 --- a/odxtools/standardlengthtype.py +++ b/odxtools/standardlengthtype.py @@ -57,17 +57,11 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state: ) def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: - internal_value, cursor_byte_position = self._extract_atomic_value( - decode_state.coded_message, - decode_state.cursor_byte_position, - decode_state.cursor_bit_position, + internal_value = decode_state.extract_atomic_value( self.bit_length, self.base_data_type, self.is_highlow_byte_order, ) internal_value = self.__apply_mask(internal_value) - decode_state.cursor_byte_position = cursor_byte_position - decode_state.cursor_bit_position = 0 - return internal_value