From 1380bc5aa2da1169211f28cb6fea27c249bf0c3f Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 23 Apr 2024 12:56:28 +0200 Subject: [PATCH 1/3] move the `encode()` methods to the `Request` and `Response` classes the actual encoding is still done in `BasicStructure`, but within `encode_into_pdu()`. Signed-off-by: Andreas Lauser Signed-off-by: Florian Jost --- odxtools/basicstructure.py | 20 -------------------- odxtools/diagservice.py | 2 +- odxtools/request.py | 13 +++++++++++++ odxtools/response.py | 22 +++++++++------------- tests/test_decoding.py | 2 +- 5 files changed, 24 insertions(+), 35 deletions(-) diff --git a/odxtools/basicstructure.py b/odxtools/basicstructure.py index 59d97c67..71bed3f3 100644 --- a/odxtools/basicstructure.py +++ b/odxtools/basicstructure.py @@ -247,26 +247,6 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: return result - def encode(self, coded_request: Optional[bytes] = None, **kwargs: ParameterValue) -> bytes: - """ - Composes an UDS message as bytes for this service. - Parameters: - ---------- - coded_request: bytes - coded request (only needed when encoding a response) - kwargs: dict - Parameters of the RPC as mapping from SHORT-NAME of the parameter to the value - """ - encode_state = EncodeState( - coded_message=bytearray(), - parameter_values=kwargs, - triggering_request=coded_request, - is_end_of_pdu=True) - - self.encode_into_pdu(physical_value=kwargs, encode_state=encode_state) - - return encode_state.coded_message - def decode(self, message: bytes) -> ParameterValueDict: decode_state = DecodeState(coded_message=message) param_values = self.decode_from_pdu(decode_state) diff --git a/odxtools/diagservice.py b/odxtools/diagservice.py index bb16f984..82480907 100644 --- a/odxtools/diagservice.py +++ b/odxtools/diagservice.py @@ -236,7 +236,7 @@ def encode_request(self, **kwargs: ParameterValue) -> bytes: set(kwargs.keys()).issubset(rq_all_param_names), f"Unknown parameters specified for encoding: {kwargs.keys()}, " f"known parameters are: {rq_all_param_names}") - return self.request.encode(coded_request=None, **kwargs) + return self.request.encode(**kwargs) def encode_positive_response(self, coded_request: bytes, diff --git a/odxtools/request.py b/odxtools/request.py index cdda9b5f..f861af98 100644 --- a/odxtools/request.py +++ b/odxtools/request.py @@ -4,7 +4,9 @@ from xml.etree import ElementTree from .basicstructure import BasicStructure +from .encodestate import EncodeState from .odxlink import OdxDocFragment +from .odxtypes import ParameterValue from .utils import dataclass_fields_asdict @@ -20,3 +22,14 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) -> kwargs = dataclass_fields_asdict(BasicStructure.from_et(et_element, doc_frags)) return Request(**kwargs) + + def encode(self, **kwargs: ParameterValue) -> bytes: + encode_state = EncodeState( + coded_message=bytearray(), + parameter_values=kwargs, + triggering_request=None, + is_end_of_pdu=True) + + self.encode_into_pdu(physical_value=kwargs, encode_state=encode_state) + + return encode_state.coded_message diff --git a/odxtools/response.py b/odxtools/response.py index 8d91c91b..f2f4e408 100644 --- a/odxtools/response.py +++ b/odxtools/response.py @@ -5,10 +5,10 @@ from xml.etree import ElementTree from .basicstructure import BasicStructure +from .encodestate import EncodeState from .exceptions import odxraise from .odxlink import OdxDocFragment from .odxtypes import ParameterValue -from .parameters.matchingrequestparameter import MatchingRequestParameter from .utils import dataclass_fields_asdict @@ -38,17 +38,13 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) -> return Response(response_type=response_type, **kwargs) - def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue) -> bytes: - if coded_request is not None: - # Extract MATCHING-REQUEST-PARAMs from the coded - # request. TODO: this should be done by - # MatchingRequestParam itself! - for param in self.parameters: - if isinstance(param, MatchingRequestParameter): - byte_pos = param.request_byte_position - byte_length = param.byte_length + def encode(self, coded_request: Optional[bytes] = None, **kwargs: ParameterValue) -> bytes: + encode_state = EncodeState( + coded_message=bytearray(), + parameter_values=kwargs, + triggering_request=coded_request, + is_end_of_pdu=True) - val = coded_request[byte_pos:byte_pos + byte_length] - params[param.short_name] = val + self.encode_into_pdu(physical_value=kwargs, encode_state=encode_state) - return super().encode(coded_request=coded_request, **params) + return encode_state.coded_message diff --git a/tests/test_decoding.py b/tests/test_decoding.py index 368524dc..9a8db7b0 100644 --- a/tests/test_decoding.py +++ b/tests/test_decoding.py @@ -2042,7 +2042,7 @@ def test_physical_constant_parameter(self) -> None: actual_param_dict = request.decode(expected_coded_message) self.assertEqual(dict(actual_param_dict), expected_param_dict) - actual_coded_message = request.encode(None, **expected_param_dict) + actual_coded_message = request.encode(**expected_param_dict) self.assertEqual(actual_coded_message, expected_coded_message) self.assertRaises(DecodeError, request.decode, bytes([0x12, 0x34])) From 9bc7cf5de4cb644e69c2abb8b55095e7a5716360 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 23 Apr 2024 12:56:43 +0200 Subject: [PATCH 2/3] EncodeState: remove the `parameter_values` attribute the parameter values are now passed directly to the respective `encode_into_pdu()` methods. Signed-off-by: Andreas Lauser Signed-off-by: Florian Jost --- odxtools/basicstructure.py | 3 +-- odxtools/encodestate.py | 5 +---- odxtools/request.py | 5 +---- odxtools/response.py | 5 +---- tests/test_diag_coded_types.py | 31 +++++++++++-------------------- 5 files changed, 15 insertions(+), 34 deletions(-) diff --git a/odxtools/basicstructure.py b/odxtools/basicstructure.py index 71bed3f3..93c6c2e8 100644 --- a/odxtools/basicstructure.py +++ b/odxtools/basicstructure.py @@ -74,8 +74,7 @@ def get_static_bit_length(self) -> Optional[int]: return byte_length * 8 def coded_const_prefix(self, request_prefix: bytes = b'') -> bytes: - encode_state = EncodeState( - coded_message=bytearray(), parameter_values={}, triggering_request=request_prefix) + encode_state = EncodeState(coded_message=bytearray(), triggering_request=request_prefix) for param in self.parameters: if (isinstance(param, MatchingRequestParameter) and param.request_byte_position < len(request_prefix)) or \ diff --git a/odxtools/encodestate.py b/odxtools/encodestate.py index e2833518..ebcc77b5 100644 --- a/odxtools/encodestate.py +++ b/odxtools/encodestate.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT import warnings from dataclasses import dataclass, field -from typing import Any, Dict, Optional +from typing import Dict, Optional from .exceptions import OdxWarning @@ -14,9 +14,6 @@ class EncodeState: #: payload that has been constructed so far coded_message: bytearray - #: a mapping from short name to value for each parameter - parameter_values: Dict[str, Any] - #: The absolute position in bytes from the beginning of the PDU to #: which relative positions refer to, e.g., the beginning of the #: structure. diff --git a/odxtools/request.py b/odxtools/request.py index f861af98..4fba2fe7 100644 --- a/odxtools/request.py +++ b/odxtools/request.py @@ -25,10 +25,7 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) -> def encode(self, **kwargs: ParameterValue) -> bytes: encode_state = EncodeState( - coded_message=bytearray(), - parameter_values=kwargs, - triggering_request=None, - is_end_of_pdu=True) + coded_message=bytearray(), triggering_request=None, is_end_of_pdu=True) self.encode_into_pdu(physical_value=kwargs, encode_state=encode_state) diff --git a/odxtools/response.py b/odxtools/response.py index f2f4e408..3876d377 100644 --- a/odxtools/response.py +++ b/odxtools/response.py @@ -40,10 +40,7 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) -> def encode(self, coded_request: Optional[bytes] = None, **kwargs: ParameterValue) -> bytes: encode_state = EncodeState( - coded_message=bytearray(), - parameter_values=kwargs, - triggering_request=coded_request, - is_end_of_pdu=True) + coded_message=bytearray(), triggering_request=coded_request, is_end_of_pdu=True) self.encode_into_pdu(physical_value=kwargs, encode_state=encode_state) diff --git a/tests/test_diag_coded_types.py b/tests/test_diag_coded_types.py index 719546a2..ed5123cb 100644 --- a/tests/test_diag_coded_types.py +++ b/tests/test_diag_coded_types.py @@ -76,7 +76,7 @@ def test_encode_leading_length_info_type_bytefield(self) -> None: base_type_encoding=None, is_highlow_byte_order_raw=None, ) - state = EncodeState(bytearray(), parameter_values={}, cursor_bit_position=3) + state = EncodeState(bytearray(), cursor_bit_position=3) dct.encode_into_pdu(bytes([0x3]), state) self.assertEqual(state.coded_message.hex(), "0803") @@ -88,8 +88,7 @@ def test_decode_leading_length_info_type_bytefield2(self) -> None: is_highlow_byte_order_raw=None, ) - state = EncodeState( - bytearray.fromhex("0000ff00"), parameter_values={}, cursor_bit_position=3) + state = EncodeState(bytearray.fromhex("0000ff00"), cursor_bit_position=3) dct.encode_into_pdu(bytes([0xcc]), state) self.assertEqual(state.coded_message.hex(), "08ccff00") self.assertEqual(state.cursor_byte_position, 2) @@ -133,7 +132,7 @@ def test_encode_leading_length_info_type_unicode2string(self) -> None: base_type_encoding=None, is_highlow_byte_order_raw=None, ) - state = EncodeState(coded_message=bytearray(), parameter_values={}) + state = EncodeState(coded_message=bytearray()) dct.encode_into_pdu("a9", state) self.assertEqual(state.coded_message, bytes([0x4, 0x00, 0x61, 0x00, 0x39])) @@ -145,7 +144,6 @@ def test_encode_leading_length_info_type_unicode2string(self) -> None: ) state = EncodeState( coded_message=bytearray(), - parameter_values={}, cursor_byte_position=0, cursor_bit_position=0, origin_byte_position=0) @@ -412,8 +410,7 @@ def test_encode_param_info_length_type_uint(self) -> None: odxlinks = OdxLinkDatabase() odxlinks.update({length_key_id: length_key}) dct._resolve_odxlinks(odxlinks) - state = EncodeState( - coded_message=bytearray([0xcc]), parameter_values={}, cursor_byte_position=2) + state = EncodeState(coded_message=bytearray([0xcc]), cursor_byte_position=2) dct.encode_into_pdu(0x12345, state) self.assertEqual(state.coded_message.hex(), "cc00012345") self.assertEqual(state.length_keys.get("length_key"), 24) @@ -690,8 +687,7 @@ def test_encode_min_max_length_type_hex_ff(self) -> None: termination="HEX-FF", is_highlow_byte_order_raw=None, ) - state = EncodeState( - coded_message=bytearray([0x00]), parameter_values={}, is_end_of_pdu=False) + state = EncodeState(coded_message=bytearray([0x00]), is_end_of_pdu=False) dct.encode_into_pdu(bytes([0x34, 0x56]), state) self.assertEqual(state.coded_message, bytes([0x34, 0x56, 0xFF])) @@ -704,8 +700,7 @@ def test_encode_min_max_length_type_zero(self) -> None: termination="ZERO", is_highlow_byte_order_raw=None, ) - state = EncodeState( - coded_message=bytearray([0x00]), parameter_values={}, is_end_of_pdu=False) + state = EncodeState(coded_message=bytearray([0x00]), is_end_of_pdu=False) dct.encode_into_pdu("Hi", state) self.assertEqual(state.coded_message, bytes([0x48, 0x69, 0x0])) @@ -720,18 +715,16 @@ def test_encode_min_max_length_type_end_of_pdu(self) -> None: termination=termination, is_highlow_byte_order_raw=None, ) - state = EncodeState(coded_message=bytearray(), parameter_values={}, is_end_of_pdu=True) + state = EncodeState(coded_message=bytearray(), is_end_of_pdu=True) dct.encode_into_pdu(bytes([0x34, 0x56, 0x78, 0x9A]), state) self.assertEqual(state.coded_message.hex(), "3456789a") if termination == "END-OF-PDU": - state = EncodeState( - coded_message=bytearray(), parameter_values={}, is_end_of_pdu=False) + state = EncodeState(coded_message=bytearray(), is_end_of_pdu=False) self.assertRaises(OdxError, dct.encode_into_pdu, bytes([0x34, 0x56, 0x78, 0x9A]), state) else: - state = EncodeState( - coded_message=bytearray(), parameter_values={}, is_end_of_pdu=False) + state = EncodeState(coded_message=bytearray(), is_end_of_pdu=False) dct.encode_into_pdu(bytes([0x34, 0x56, 0x78, 0x9A]), state) self.assertTrue(state.coded_message.hex().startswith("3456789a")) @@ -746,8 +739,7 @@ def test_encode_min_max_length_type_min_length(self) -> None: termination=termination, is_highlow_byte_order_raw=None, ) - state = EncodeState( - coded_message=bytearray([0x00]), parameter_values={}, is_end_of_pdu=True) + state = EncodeState(coded_message=bytearray([0x00]), is_end_of_pdu=True) dct.encode_into_pdu(bytes([0x34, 0x56]), state) self.assertTrue(state.coded_message.hex().startswith("3456")) self.assertRaises( @@ -768,8 +760,7 @@ def test_encode_min_max_length_type_max_length(self) -> None: termination=termination, is_highlow_byte_order_raw=None, ) - state = EncodeState( - coded_message=bytearray([0x00]), parameter_values={}, is_end_of_pdu=True) + state = EncodeState(coded_message=bytearray([0x00]), is_end_of_pdu=True) dct.encode_into_pdu(bytes([0x34, 0x56, 0x78]), state) self.assertEqual(state.coded_message, bytes([0x34, 0x56, 0x78])) self.assertRaises( From 8296f91b94a64b337f49d55b467963e74c9143f9 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Tue, 23 Apr 2024 12:57:13 +0200 Subject: [PATCH 3/3] EncodeState: rename `encode_atomic_value()` to `emplace_bytes()` Signed-off-by: Andreas Lauser Signed-off-by: Florian Jost --- odxtools/basicstructure.py | 2 +- odxtools/dynamiclengthfield.py | 2 +- odxtools/encodestate.py | 3 ++- odxtools/leadinglengthinfotype.py | 2 +- odxtools/minmaxlengthtype.py | 2 +- odxtools/parameters/matchingrequestparameter.py | 4 ++-- odxtools/parameters/reservedparameter.py | 2 +- odxtools/parameters/tablekeyparameter.py | 2 +- odxtools/paramlengthinfotype.py | 2 +- odxtools/standardlengthtype.py | 2 +- odxtools/staticfield.py | 2 +- 11 files changed, 13 insertions(+), 12 deletions(-) diff --git a/odxtools/basicstructure.py b/odxtools/basicstructure.py index 93c6c2e8..f558a00d 100644 --- a/odxtools/basicstructure.py +++ b/odxtools/basicstructure.py @@ -208,7 +208,7 @@ def encode_into_pdu(self, physical_value: Optional[ParameterValue], # position directly after the structure and let # EncodeState add the padding as needed. encode_state.cursor_byte_position = encode_state.origin_byte_position + self.byte_size - encode_state.emplace_atomic_value(b'', "") + encode_state.emplace_bytes(b'', "") # encode the length- and table keys. This cannot be done above # because we allow these to be defined implicitly (i.e. they diff --git a/odxtools/dynamiclengthfield.py b/odxtools/dynamiclengthfield.py index 042fd377..acd5f924 100644 --- a/odxtools/dynamiclengthfield.py +++ b/odxtools/dynamiclengthfield.py @@ -82,7 +82,7 @@ def encode_into_pdu(self, physical_value: ParameterValue, encode_state: EncodeSt # ensure the correct message size if the field is empty if len(physical_value) == 0: - encode_state.emplace_atomic_value(b"", "") + encode_state.emplace_bytes(b"", "") # move cursor and origin positions encode_state.origin_byte_position = orig_origin diff --git a/odxtools/encodestate.py b/odxtools/encodestate.py index ebcc77b5..69caedd8 100644 --- a/odxtools/encodestate.py +++ b/odxtools/encodestate.py @@ -47,7 +47,7 @@ class EncodeState: #: (needed for MinMaxLengthType, EndOfPduField, etc.) is_end_of_pdu: bool = True - def emplace_atomic_value(self, new_data: bytes, param_name: str) -> None: + def emplace_bytes(self, new_data: bytes, param_name: Optional[str] = None) -> None: pos = self.cursor_byte_position # Make blob longer if necessary @@ -60,6 +60,7 @@ def emplace_atomic_value(self, new_data: bytes, param_name: str) -> None: # the value to be inserted is bitwise "disjoint" from the # value which is already in the PDU... if self.coded_message[pos + i] & new_data[i] != 0: + param_name = "" if param_name is None else param_name warnings.warn( f"Object '{param_name}' overlaps with another parameter (bits are already set)", OdxWarning, diff --git a/odxtools/leadinglengthinfotype.py b/odxtools/leadinglengthinfotype.py index 89bf1e80..a52f7fc8 100644 --- a/odxtools/leadinglengthinfotype.py +++ b/odxtools/leadinglengthinfotype.py @@ -71,7 +71,7 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta is_highlow_byte_order=self.is_highlow_byte_order, ) - encode_state.emplace_atomic_value(length_bytes + value_bytes, "") + encode_state.emplace_bytes(length_bytes + value_bytes, "") @override def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: diff --git a/odxtools/minmaxlengthtype.py b/odxtools/minmaxlengthtype.py index 50e81022..10be50c9 100644 --- a/odxtools/minmaxlengthtype.py +++ b/odxtools/minmaxlengthtype.py @@ -108,7 +108,7 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta f"(Is: {len(value_bytes)} bytes.)", EncodeError) return - encode_state.emplace_atomic_value(value_bytes, "") + encode_state.emplace_bytes(value_bytes, "") def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: odxassert(decode_state.cursor_bit_position == 0, diff --git a/odxtools/parameters/matchingrequestparameter.py b/odxtools/parameters/matchingrequestparameter.py index a696ca0f..709020a5 100644 --- a/odxtools/parameters/matchingrequestparameter.py +++ b/odxtools/parameters/matchingrequestparameter.py @@ -71,8 +71,8 @@ def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], f"bytes, need at least {rq_pos + rq_len} bytes", EncodeError) return - encode_state.emplace_atomic_value(encode_state.triggering_request[rq_pos:rq_pos + rq_len], - self.short_name) + encode_state.emplace_bytes(encode_state.triggering_request[rq_pos:rq_pos + rq_len], + self.short_name) @override def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue: diff --git a/odxtools/parameters/reservedparameter.py b/odxtools/parameters/reservedparameter.py index d8c87329..33520600 100644 --- a/odxtools/parameters/reservedparameter.py +++ b/odxtools/parameters/reservedparameter.py @@ -53,7 +53,7 @@ def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], encode_state: EncodeState) -> None: raw_data = (0).to_bytes((encode_state.cursor_bit_position + self.bit_length + 7) // 8, "big") - encode_state.emplace_atomic_value(raw_data, self.short_name) + encode_state.emplace_bytes(raw_data, self.short_name) @override def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue: diff --git a/odxtools/parameters/tablekeyparameter.py b/odxtools/parameters/tablekeyparameter.py index 42178a6c..253dd2d4 100644 --- a/odxtools/parameters/tablekeyparameter.py +++ b/odxtools/parameters/tablekeyparameter.py @@ -185,7 +185,7 @@ def encode_placeholder_into_pdu(self, physical_value: Optional[ParameterValue], EncodeError) return - encode_state.emplace_atomic_value(bytes([0] * (size // 8)), self.short_name) + encode_state.emplace_bytes(bytes([0] * (size // 8)), self.short_name) encode_state.cursor_byte_position = max(orig_pos, encode_state.cursor_byte_position) encode_state.cursor_bit_position = 0 diff --git a/odxtools/paramlengthinfotype.py b/odxtools/paramlengthinfotype.py index dc2afd45..67ead7a8 100644 --- a/odxtools/paramlengthinfotype.py +++ b/odxtools/paramlengthinfotype.py @@ -87,7 +87,7 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta is_highlow_byte_order=self.is_highlow_byte_order, ) - encode_state.emplace_atomic_value(raw_data, "") + encode_state.emplace_bytes(raw_data, "") def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: # First, we need to find a length key with matching ID. diff --git a/odxtools/standardlengthtype.py b/odxtools/standardlengthtype.py index 7057b2fa..beb42e3c 100644 --- a/odxtools/standardlengthtype.py +++ b/odxtools/standardlengthtype.py @@ -57,7 +57,7 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta base_data_type=self.base_data_type, is_highlow_byte_order=self.is_highlow_byte_order, ) - encode_state.emplace_atomic_value(raw_data, "") + encode_state.emplace_bytes(raw_data, "") @override def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: diff --git a/odxtools/staticfield.py b/odxtools/staticfield.py index 1631985e..ae028770 100644 --- a/odxtools/staticfield.py +++ b/odxtools/staticfield.py @@ -72,7 +72,7 @@ def encode_into_pdu(self, physical_value: ParameterValue, encode_state: EncodeSt encode_state.cursor_byte_position = pos_before + self.item_byte_size elif pos_after - pos_before < self.item_byte_size: # add some padding bytes - encode_state.emplace_atomic_value( + encode_state.emplace_bytes( b'\x00' * (self.item_byte_size - (pos_after - pos_before)), "") @override