From 295f67ee34625e482d7b5324c4731358d5a76880 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Mon, 22 Apr 2024 13:31:24 +0200 Subject: [PATCH 1/2] refactor encoding: parameters Signed-off-by: Andreas Lauser Signed-off-by: Gerrit Ecke --- odxtools/basicstructure.py | 114 ++++++++++-------- odxtools/dynamiclengthfield.py | 36 +++--- odxtools/parameters/codedconstparameter.py | 35 +++--- odxtools/parameters/dynamicparameter.py | 9 +- odxtools/parameters/lengthkeyparameter.py | 82 ++++++++++--- .../parameters/matchingrequestparameter.py | 34 ++++-- odxtools/parameters/nrcconstparameter.py | 44 +++---- odxtools/parameters/parameter.py | 72 +++++------ odxtools/parameters/parameterwithdop.py | 13 +- .../parameters/physicalconstantparameter.py | 29 +++-- odxtools/parameters/reservedparameter.py | 7 +- odxtools/parameters/systemparameter.py | 5 +- odxtools/parameters/tableentryparameter.py | 5 +- odxtools/parameters/tablekeyparameter.py | 112 ++++++++++++----- odxtools/parameters/tablestructparameter.py | 83 +++++++------ odxtools/parameters/valueparameter.py | 30 ++--- tests/test_encoding.py | 8 +- tests/test_somersault.py | 4 +- 18 files changed, 436 insertions(+), 286 deletions(-) diff --git a/odxtools/basicstructure.py b/odxtools/basicstructure.py index 4f32583c..fed0d9e5 100644 --- a/odxtools/basicstructure.py +++ b/odxtools/basicstructure.py @@ -74,13 +74,13 @@ def get_static_bit_length(self) -> Optional[int]: return ((length + 7) // 8) * 8 def coded_const_prefix(self, request_prefix: bytes = b'') -> bytes: - prefix = b'' encode_state = EncodeState( - bytearray(prefix), parameter_values={}, triggering_request=request_prefix) + coded_message=bytearray(), parameter_values={}, triggering_request=request_prefix) + for param in self.parameters: - if isinstance(param, (CodedConstParameter, NrcConstParameter, MatchingRequestParameter, - PhysicalConstantParameter)): - encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state)) + if (isinstance(param, MatchingRequestParameter) and param.request_byte_position < len(request_prefix)) or \ + isinstance(param, (CodedConstParameter, NrcConstParameter, PhysicalConstantParameter)): + param.encode_into_pdu(physical_value=None, encode_state=encode_state) else: break return encode_state.coded_message @@ -122,52 +122,69 @@ def convert_physical_to_internal(self, triggering_coded_request: Optional[bytes], is_end_of_pdu: bool = True) -> bytes: + encode_state = EncodeState( + bytearray(), + parameter_values=cast(Dict[str, ParameterValue], param_value), + triggering_request=triggering_coded_request, + is_end_of_pdu=False) + if not isinstance(param_value, dict): - raise EncodeError( + odxraise( f"Expected a dictionary for the values of structure {self.short_name}, " - f"got {type(param_value)}") + f"got {type(param_value).__name__}", EncodeError) + elif encode_state.cursor_bit_position != 0: + odxraise( + f"Structures must be byte aligned, but " + f"{self.short_name} requested to be at bit position " + f"{encode_state.cursor_bit_position}", EncodeError) + encode_state.bit_position = 0 # in strict mode, ensure that no values for unknown parameters are specified. if strict_mode: - param_names = [param.short_name for param in self.parameters] - for param_key in param_value: - if param_key not in param_names: - odxraise(f"Value for unknown parameter '{param_key}' specified") - - encode_state = EncodeState( - bytearray(), - dict(param_value), - triggering_request=triggering_coded_request, - is_end_of_pdu=False) + param_names = {param.short_name for param in self.parameters} + for param_value_name in param_value: + if param_value_name not in param_names: + odxraise(f"Value for unknown parameter '{param_value_name}' specified " + f"for structure {self.short_name}") for param in self.parameters: - if param == self.parameters[-1]: - # The last parameter is at the end of the PDU if the - # structure itself is at the end of the PDU. TODO: - # This assumes that the last parameter specified in - # the ODX is located last in the PDU... + if id(param) == id(self.parameters[-1]): + # The last parameter of the structure is at the end of + # the PDU if the structure itself is at the end of the + # PDU. TODO: This assumes that the last parameter + # specified in the ODX is located last in the PDU... encode_state.is_end_of_pdu = is_end_of_pdu - if isinstance( - param, - (LengthKeyParameter, TableKeyParameter)) and param.short_name in param_value: - # This is a hack to always encode a dummy value for - # length- and table keys. since these can be specified + if isinstance(param, (LengthKeyParameter, TableKeyParameter)): + # At this point, we encode a placeholder value for length- + # and table keys, since these can be specified # implicitly (i.e., by means of parameters that use - # these keys), to avoid getting an "overlapping + # these keys). To avoid getting an "overlapping # parameter" warning, we must encode a value of zero # into the PDU here and add the real value of the - # parameter in a post processing step. - tmp = encode_state.parameter_values.pop(param.short_name) - encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state)) - encode_state.parameter_values[param.short_name] = tmp + # parameter in a post-processing step. + param.encode_placeholder_into_pdu( + physical_value=param_value.get(param.short_name), encode_state=encode_state) + continue - encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state)) + if param.is_required and param.short_name not in param_value: + odxraise(f"No value for required parameter {param.short_name} specified", + EncodeError) - if self.byte_size is not None and len(encode_state.coded_message) < self.byte_size: - # Padding bytes needed - encode_state.coded_message = encode_state.coded_message.ljust(self.byte_size, b"\0") + param.encode_into_pdu( + physical_value=param_value.get(param.short_name), encode_state=encode_state) + + if self.byte_size is not None: + if len(encode_state.coded_message) < self.byte_size: + # Padding bytes needed + encode_state.coded_message = encode_state.coded_message.ljust(self.byte_size, b"\0") + elif len(encode_state.coded_message) > self.byte_size: + odxraise( + f"Encoded structure {self.short_name} is too large: " + f"{len(encode_state.coded_message)} instead of {self.byte_size} " + f"bytes", EncodeError) + return # encode the length- and table keys. This cannot be done above # because we allow these to be defined implicitly (i.e. they @@ -177,22 +194,25 @@ def convert_physical_to_internal(self, # the current parameter is neither a length- nor a table key continue - # Encode the key parameter into the message - encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state)) + # Encode the value of the key parameter into the message + param.encode_value_into_pdu(encode_state=encode_state) # Assert that length is as expected - self._validate_coded_message(encode_state.coded_message) + self._validate_coded_message_size(encode_state.cursor_byte_position - + encode_state.origin_byte_position) - return bytearray(encode_state.coded_message) + return encode_state.coded_message - def _validate_coded_message(self, coded_message: bytes) -> None: + def _validate_coded_message_size(self, coded_byte_len: int) -> None: if self.byte_size is not None: # We definitely broke something if we didn't respect the explicit byte_size - odxassert( - len(coded_message) == self.byte_size, - "Verification of coded message {coded_message.hex()} failed: Incorrect size.") - # No need to check further + if self.byte_size != coded_byte_len: + warnings.warn( + "Verification of coded message failed: Incorrect size.", + OdxWarning, + stacklevel=1) + return bit_length = self.get_static_bit_length() @@ -201,12 +221,12 @@ def _validate_coded_message(self, coded_message: bytes) -> None: # Nothing to check return - if len(coded_message) * 8 != bit_length: + if coded_byte_len * 8 != bit_length: # We may have broke something # but it could be that bit_length was mis calculated and not the actual bytes are wrong # Could happen with overlapping parameters and parameters with gaps warnings.warn( - f"Verification of coded message '{coded_message.hex()}' possibly failed: Size may be incorrect.", + "Verification of coded message possibly failed: Size may be incorrect.", OdxWarning, stacklevel=1) diff --git a/odxtools/dynamiclengthfield.py b/odxtools/dynamiclengthfield.py index cac08603..ff37afe0 100644 --- a/odxtools/dynamiclengthfield.py +++ b/odxtools/dynamiclengthfield.py @@ -60,30 +60,38 @@ def convert_physical_to_bytes( f"Expected a list of values for dynamic length field {self.short_name}, " f"got {type(physical_value)}", EncodeError) + tmp_state = EncodeState( + coded_message=bytearray(), + parameter_values={}, + triggering_request=encode_state.triggering_request, + origin_byte_position=0, + cursor_byte_position=0, + cursor_bit_position=0) + det_num_items = self.determine_number_of_items - field_len = det_num_items.dop.convert_physical_to_bytes( - len(physical_value), encode_state, det_num_items.bit_position or 0) + field_len_bytes = det_num_items.dop.convert_physical_to_bytes( + len(physical_value), tmp_state, det_num_items.bit_position or 0) # hack to emplace the length specifier at the correct location - tmp = encode_state.coded_message - encode_state.coded_message = bytearray() - encode_state.cursor_byte_position = det_num_items.byte_position - encode_state.emplace_atomic_value(field_len, self.short_name + ".num_items") - result = encode_state.coded_message - encode_state.coded_message = tmp + tmp_state.cursor_byte_position = det_num_items.byte_position + tmp_state.cursor_bit_position = det_num_items.bit_position or 0 + tmp_state.emplace_atomic_value(field_len_bytes, self.short_name + ".num_items") # if required, add padding between the length specifier and # the first item - if len(result) < self.offset: - result.extend([0] * (self.offset - len(result))) - elif len(result) > self.offset: + if len(tmp_state.coded_message) < self.offset: + tmp_state.coded_message += b'\00' * (self.offset - len(tmp_state.coded_message)) + tmp_state.cursor_byte_position = self.offset + elif len(field_len_bytes) > self.offset: odxraise(f"The length specifier of field {self.short_name} overlaps " f"with the first item!") + tmp_state.cursor_byte_position = self.offset - for value in physical_value: - result += self.structure.convert_physical_to_bytes(value, encode_state) + for i, value in enumerate(physical_value): + tmp_bytes = self.structure.convert_physical_to_bytes(value, tmp_state) + tmp_state.emplace_atomic_value(tmp_bytes, f"{self.short_name}{i}") - return result + return tmp_state.coded_message def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: diff --git a/odxtools/parameters/codedconstparameter.py b/odxtools/parameters/codedconstparameter.py index a2e8e265..e2ae4c84 100644 --- a/odxtools/parameters/codedconstparameter.py +++ b/odxtools/parameters/codedconstparameter.py @@ -10,9 +10,9 @@ from ..decodestate import DecodeState from ..diagcodedtype import DiagCodedType from ..encodestate import EncodeState -from ..exceptions import DecodeError, odxrequire +from ..exceptions import DecodeError, EncodeError, odxraise, odxrequire from ..odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId -from ..odxtypes import AtomicOdxType, DataType +from ..odxtypes import AtomicOdxType, DataType, ParameterValue from ..utils import dataclass_fields_asdict from .parameter import Parameter, ParameterType @@ -81,30 +81,23 @@ def is_settable(self) -> bool: return False @override - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: - if (self.short_name in encode_state.parameter_values and - encode_state.parameter_values[self.short_name] != self.coded_value): - raise TypeError(f"The parameter '{self.short_name}' is constant {self._coded_value_str}" - " and thus can not be changed.") - - tmp_state = EncodeState( - bytearray(), - encode_state.parameter_values, - triggering_request=encode_state.triggering_request, - is_end_of_pdu=False, - cursor_byte_position=0, - cursor_bit_position=0, - origin_byte_position=0) - encode_state.cursor_bit_position = self.bit_position or 0 - self.diag_coded_type.encode_into_pdu(self.coded_value, encode_state=tmp_state) - - return tmp_state.coded_message + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + if physical_value is not None and physical_value != self.coded_value: + odxraise( + f"Value for constant parameter `{self.short_name}` name can " + f"only be specified as {self.coded_value!r} (is: {physical_value!r})", EncodeError) + + internal_value = self.coded_value + + self.diag_coded_type.encode_into_pdu( + internal_value=internal_value, encode_state=encode_state) @override def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: coded_val = self.diag_coded_type.decode_from_pdu(decode_state) - # Check if the coded value in the message is correct. + # Check if the coded value contained by the message is correct. if self.coded_value != coded_val: warnings.warn( f"Coded constant parameter does not match! " diff --git a/odxtools/parameters/dynamicparameter.py b/odxtools/parameters/dynamicparameter.py index 1323bc95..215a3b6e 100644 --- a/odxtools/parameters/dynamicparameter.py +++ b/odxtools/parameters/dynamicparameter.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import List +from typing import List, Optional from xml.etree import ElementTree from typing_extensions import override @@ -41,9 +41,10 @@ def is_settable(self) -> bool: raise NotImplementedError(".is_settable for a DynamicParameter") @override - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: - raise NotImplementedError("Encoding a DynamicParameter is not implemented yet.") + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + raise NotImplementedError("Encoding DynamicParameter is not implemented yet.") @override def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue: - raise NotImplementedError("Decoding a DynamicParameter is not implemented yet.") + raise NotImplementedError("Decoding DynamicParameter is not implemented yet.") diff --git a/odxtools/parameters/lengthkeyparameter.py b/odxtools/parameters/lengthkeyparameter.py index d04e572b..501d21f7 100644 --- a/odxtools/parameters/lengthkeyparameter.py +++ b/odxtools/parameters/lengthkeyparameter.py @@ -1,13 +1,13 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List +from typing import TYPE_CHECKING, Any, Dict, List, Optional from xml.etree import ElementTree -from typing_extensions import override +from typing_extensions import final, override from ..decodestate import DecodeState from ..encodestate import EncodeState -from ..exceptions import odxraise, odxrequire +from ..exceptions import EncodeError, odxraise, odxrequire from ..odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId from ..odxtypes import ParameterValue from ..utils import dataclass_fields_asdict @@ -77,19 +77,73 @@ def is_settable(self) -> bool: return True @override - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: - physical_value = encode_state.length_keys.get(self.short_name) - if physical_value is None: - physical_value = encode_state.parameter_values.get(self.short_name, 0) + @final + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + # if you get this exception, you ought to use + # `.encode_placeholder_into_pdu()` followed by (after the + # value of the length key has been determined) + # `.encode_value_into_pdu()`. + raise RuntimeError("_encode_positioned_into_pdu() cannot be called for length keys.") + + def encode_placeholder_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + + if physical_value is not None: + if not self.dop.is_valid_physical_value(physical_value): + odxraise(f"Invalid explicitly specified physical value '{physical_value!r}' " + f"for length key '{self.short_name}'.") + + lkv = encode_state.length_keys.get(self.short_name) + if lkv is not None and lkv != physical_value: + odxraise(f"Got conflicting values for length key {self.short_name}: " + f"{lkv} and {physical_value!r}") + + if not isinstance(physical_value, int): + odxraise( + f"Value of length key {self.short_name} is of type {type(physical_value).__name__} " + f"instead of int") + + encode_state.length_keys[self.short_name] = physical_value + + orig_cursor = encode_state.cursor_byte_position + pos = encode_state.cursor_byte_position + if self.byte_position is not None: + pos = encode_state.origin_byte_position + self.byte_position + encode_state.key_pos[self.short_name] = pos + encode_state.cursor_byte_position = pos bit_pos = self.bit_position or 0 - dop = odxrequire(super().dop, - f"A DOP is required for length key parameter {self.short_name}") - return dop.convert_physical_to_bytes(physical_value, encode_state, bit_position=bit_pos) - - @override - def encode_into_pdu(self, encode_state: EncodeState) -> bytes: - return super().encode_into_pdu(encode_state) + bit_size = self.dop.get_static_bit_length() + if bit_size is None: + odxraise("The DOP of length key {self.short_name} must exhibit a fixed size.", + EncodeError) + return + + raw_data = b'\x00' * ((bit_pos + bit_size + 7) // 8) + encode_state.emplace_atomic_value(raw_data, self.short_name) + + encode_state.cursor_byte_position = max(encode_state.cursor_byte_position, orig_cursor) + encode_state.cursor_bit_position = 0 + + def encode_value_into_pdu(self, encode_state: EncodeState) -> None: + + if self.short_name not in encode_state.length_keys: + odxraise( + f"Length key {self.short_name} has not been defined before " + f"it is required.", EncodeError) + return + else: + physical_value = encode_state.length_keys[self.short_name] + + encode_state.cursor_byte_position = encode_state.key_pos[self.short_name] + encode_state.cursor_bit_position = self.bit_position or 0 + + raw_data = self.dop.convert_physical_to_bytes( + physical_value=odxrequire(physical_value), + encode_state=encode_state, + bit_position=encode_state.cursor_bit_position) + encode_state.emplace_atomic_value(raw_data, self.short_name) @override def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue: diff --git a/odxtools/parameters/matchingrequestparameter.py b/odxtools/parameters/matchingrequestparameter.py index 8711c237..a696ca0f 100644 --- a/odxtools/parameters/matchingrequestparameter.py +++ b/odxtools/parameters/matchingrequestparameter.py @@ -7,7 +7,7 @@ from ..decodestate import DecodeState from ..encodestate import EncodeState -from ..exceptions import EncodeError, odxrequire +from ..exceptions import EncodeError, odxraise, odxrequire from ..odxlink import OdxDocFragment from ..odxtypes import DataType, ParameterValue from ..utils import dataclass_fields_asdict @@ -52,19 +52,31 @@ def is_settable(self) -> bool: return False @override - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: - if not encode_state.triggering_request: - raise EncodeError(f"Parameter '{self.short_name}' is of matching request type," - " but no original request has been specified.") - return encode_state.triggering_request[self - .request_byte_position:self.request_byte_position + - self.byte_length] + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + if encode_state.triggering_request is None: + odxraise( + f"Parameter '{self.short_name}' is of matching request type," + f" but no original request has been specified.", EncodeError) + return + + rq_pos = self.request_byte_position + rq_len = self.byte_length + + if len(encode_state.triggering_request) < rq_pos + rq_len: + odxraise( + f"Specified triggering request 0x{encode_state.triggering_request.hex()} " + f"is not long enough to encode matching request parameter " + f"'{self.short_name}': Have {len(encode_state.triggering_request)} " + f"bytes, need at least {rq_pos + rq_len} bytes", EncodeError) + return + + encode_state.emplace_atomic_value(encode_state.triggering_request[rq_pos:rq_pos + rq_len], + self.short_name) @override def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue: - result = decode_state.extract_atomic_value( + return decode_state.extract_atomic_value( bit_length=self.byte_length * 8, base_data_type=DataType.A_UINT32, is_highlow_byte_order=False) - - return result diff --git a/odxtools/parameters/nrcconstparameter.py b/odxtools/parameters/nrcconstparameter.py index 42ad3ec4..487c75f0 100644 --- a/odxtools/parameters/nrcconstparameter.py +++ b/odxtools/parameters/nrcconstparameter.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MIT import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast from xml.etree import ElementTree from typing_extensions import override @@ -10,9 +10,9 @@ from ..decodestate import DecodeState from ..diagcodedtype import DiagCodedType from ..encodestate import EncodeState -from ..exceptions import DecodeError, EncodeError, odxrequire +from ..exceptions import DecodeError, EncodeError, odxraise, odxrequire from ..odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId -from ..odxtypes import AtomicOdxType, DataType +from ..odxtypes import AtomicOdxType, DataType, ParameterValue from ..utils import dataclass_fields_asdict from .parameter import Parameter, ParameterType @@ -83,7 +83,7 @@ def internal_data_type(self) -> DataType: @property @override def is_required(self) -> bool: - return False + return len(self.coded_values) > 1 @property @override @@ -91,30 +91,26 @@ def is_settable(self) -> bool: return True @override - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: - if self.short_name in encode_state.parameter_values: - if encode_state.parameter_values[self.short_name] not in self.coded_values: - raise EncodeError(f"The parameter '{self.short_name}' must have" - f" one of the constant values {self.coded_values}") + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + coded_value: ParameterValue + if physical_value is not None: + if physical_value not in self.coded_values: + odxraise( + f"The value of parameter '{self.short_name}' must " + f" be one of {self.coded_values} (is: {physical_value!r})", EncodeError) + coded_value = self.coded_values[0] else: - coded_value = encode_state.parameter_values[self.short_name] + coded_value = physical_value else: - # If the user does not select one, just select any. - # I think it does not matter ... + # If the user does not select a value, just select + # any. (This branch should only be taken if there is only + # one possible coded value because if there are more, + # specifying a parameter value is mandatory, + # cf. the `.is_required` property.) coded_value = self.coded_values[0] - tmp_state = EncodeState( - bytearray(), - encode_state.parameter_values, - triggering_request=encode_state.triggering_request, - is_end_of_pdu=False, - cursor_byte_position=0, - cursor_bit_position=0, - origin_byte_position=0) - encode_state.cursor_bit_position = self.bit_position or 0 - self.diag_coded_type.encode_into_pdu(coded_value, encode_state=tmp_state) - - return tmp_state.coded_message + self.diag_coded_type.encode_into_pdu(cast(AtomicOdxType, coded_value), encode_state) @override def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType: diff --git a/odxtools/parameters/parameter.py b/odxtools/parameters/parameter.py index 463b8789..7705802c 100644 --- a/odxtools/parameters/parameter.py +++ b/odxtools/parameters/parameter.py @@ -117,12 +117,35 @@ def is_settable(self) -> bool: """ raise NotImplementedError(".is_settable is not implemented by the concrete parameter class") - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: - """Get the coded value of the parameter given the encode state. - Note that this method is called by `encode_into_pdu`. + @final + def encode_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + """Convert a physical value into its encoded form and place it into the PDU + + Also, adapt the `encode_state` so that it points to where the next + parameter is located (if the parameter does not explicitly specify a + position) """ + + orig_cursor = encode_state.cursor_byte_position + if self.byte_position is not None: + encode_state.cursor_byte_position = encode_state.origin_byte_position + self.byte_position + + encode_state.cursor_bit_position = self.bit_position or 0 + + self._encode_positioned_into_pdu(physical_value, encode_state) + + encode_state.cursor_byte_position = max(encode_state.cursor_byte_position, orig_cursor) + encode_state.cursor_bit_position = 0 + + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + """Method which actually encodes the parameter + + Its location is managed by `Parameter`.""" raise NotImplementedError( - ".get_coded_value_as_bytes() is not implemented by the concrete parameter class") + f"Required method '_encode_positioned_into_pdu()' not implemented by " + f"child class {type(self).__name__}") @final def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: @@ -144,42 +167,5 @@ def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterVal Its location is managed by `Parameter`.""" raise NotImplementedError( - "Required method '_decode_positioned_from_pdu()' not implemented by child class") - - def encode_into_pdu(self, encode_state: EncodeState) -> bytes: - """Encode the value of a parameter into a binary blob and return it - - If the byte position of the parameter is not defined, - the byte code is appended to the blob. - - Technical note for subclasses: The default implementation - tries to compute the coded value via - `self.get_coded_value_as_bytes(encoded_state)` and inserts it - into the PDU. Thus it usually suffices to overwrite - `get_coded_value_as_bytes()` instead of `encode_into_pdu()`. - - Parameters: - ---------- - encode_state: EncodeState, i.e. a named tuple with attributes - * coded_message: bytes, the message encoded so far - * parameter_values: List[ParameterValuePairs] - * triggering_coded_request: bytes - - Returns: - ------- - bytes - the message's blob after adding the encoded parameter into it - - """ - msg_blob = encode_state.coded_message - param_blob = self.get_coded_value_as_bytes(encode_state) - - if self.byte_position is not None: - byte_position = self.byte_position - else: - byte_position = len(msg_blob) - - encode_state.cursor_byte_position = byte_position - encode_state.emplace_atomic_value(param_blob, self.short_name) - - return encode_state.coded_message + f"Required method '_decode_positioned_from_pdu()' not implemented by " + f"child class {type(self).__name__}") diff --git a/odxtools/parameters/parameterwithdop.py b/odxtools/parameters/parameterwithdop.py index a222fc84..bfcdd4c8 100644 --- a/odxtools/parameters/parameterwithdop.py +++ b/odxtools/parameters/parameterwithdop.py @@ -91,12 +91,13 @@ def physical_type(self) -> Optional[PhysicalType]: return None @override - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: - dop = odxrequire(self.dop, "Reference to DOP is not resolved") - physical_value = encode_state.parameter_values[self.short_name] - bit_position_int = self.bit_position if self.bit_position is not None else 0 - return dop.convert_physical_to_bytes( - physical_value, encode_state, bit_position=bit_position_int) + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + raw_data = self.dop.convert_physical_to_bytes( + physical_value=odxrequire(physical_value), + encode_state=encode_state, + bit_position=encode_state.cursor_bit_position) + encode_state.emplace_atomic_value(raw_data, self.short_name) @override def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue: diff --git a/odxtools/parameters/physicalconstantparameter.py b/odxtools/parameters/physicalconstantparameter.py index c15191a5..1771c1cb 100644 --- a/odxtools/parameters/physicalconstantparameter.py +++ b/odxtools/parameters/physicalconstantparameter.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List +from typing import TYPE_CHECKING, Any, Dict, List, Optional from xml.etree import ElementTree from typing_extensions import override @@ -8,7 +8,7 @@ from ..dataobjectproperty import DataObjectProperty from ..decodestate import DecodeState from ..encodestate import EncodeState -from ..exceptions import DecodeError, odxraise, odxrequire +from ..exceptions import DecodeError, EncodeError, odxraise, odxrequire from ..odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId from ..odxtypes import ParameterValue from ..utils import dataclass_fields_asdict @@ -74,17 +74,19 @@ def is_settable(self) -> bool: return False @override - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: - dop = odxrequire(self.dop, "Reference to DOP is not resolved") - if (self.short_name in encode_state.parameter_values and - encode_state.parameter_values[self.short_name] != self.physical_constant_value): - raise TypeError( - f"The parameter '{self.short_name}' is constant {self.physical_constant_value!r}" - f" and thus can not be changed.") - - bit_position_int = self.bit_position if self.bit_position is not None else 0 - return dop.convert_physical_to_bytes( - self.physical_constant_value, encode_state, bit_position=bit_position_int) + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + if physical_value is not None and physical_value != self.physical_constant_value: + odxraise( + f"Value for constant parameter `{self.short_name}` name can " + f"only be specified as {self.physical_constant_value!r} (is: {physical_value!r})", + EncodeError) + + raw_data = self.dop.convert_physical_to_bytes( + physical_value=odxrequire(self.physical_constant_value), + encode_state=encode_state, + bit_position=encode_state.cursor_bit_position) + encode_state.emplace_atomic_value(raw_data, self.short_name) @override def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue: @@ -99,4 +101,5 @@ def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterVal f"{self.physical_constant_value!r} but got {phys_val!r} " f"at byte position {decode_state.cursor_byte_position} " f"in coded message {decode_state.coded_message.hex()}.", DecodeError) + return phys_val diff --git a/odxtools/parameters/reservedparameter.py b/odxtools/parameters/reservedparameter.py index c44b3482..d8c87329 100644 --- a/odxtools/parameters/reservedparameter.py +++ b/odxtools/parameters/reservedparameter.py @@ -49,8 +49,11 @@ def get_static_bit_length(self) -> Optional[int]: return self.bit_length @override - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: - return (0).to_bytes(((self.bit_position or 0) + self.bit_length + 7) // 8, "big") + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + raw_data = (0).to_bytes((encode_state.cursor_bit_position + self.bit_length + 7) // 8, + "big") + encode_state.emplace_atomic_value(raw_data, self.short_name) @override def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue: diff --git a/odxtools/parameters/systemparameter.py b/odxtools/parameters/systemparameter.py index 34f0615f..63c9cb74 100644 --- a/odxtools/parameters/systemparameter.py +++ b/odxtools/parameters/systemparameter.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import List +from typing import List, Optional from xml.etree import ElementTree from typing_extensions import override @@ -46,7 +46,8 @@ def is_settable(self) -> bool: raise NotImplementedError("SystemParameter.is_settable is not implemented yet.") @override - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: raise NotImplementedError("Encoding a SystemParameter is not implemented yet.") @override diff --git a/odxtools/parameters/tableentryparameter.py b/odxtools/parameters/tableentryparameter.py index 358118ef..d777cfe3 100644 --- a/odxtools/parameters/tableentryparameter.py +++ b/odxtools/parameters/tableentryparameter.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: MIT from dataclasses import dataclass -from typing import TYPE_CHECKING, List +from typing import TYPE_CHECKING, List, Optional from xml.etree import ElementTree from typing_extensions import override @@ -59,7 +59,8 @@ def is_settable(self) -> bool: raise NotImplementedError("TableEntryParameter.is_settable is not implemented yet.") @override - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: raise NotImplementedError("Encoding a TableEntryParameter is not implemented yet.") @property diff --git a/odxtools/parameters/tablekeyparameter.py b/odxtools/parameters/tablekeyparameter.py index e0f2e148..ffee0bf0 100644 --- a/odxtools/parameters/tablekeyparameter.py +++ b/odxtools/parameters/tablekeyparameter.py @@ -3,7 +3,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional from xml.etree import ElementTree -from typing_extensions import override +from typing_extensions import final, override from ..decodestate import DecodeState from ..encodestate import EncodeState @@ -133,46 +133,98 @@ def is_settable(self) -> bool: return True @override - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: - tr_short_name = encode_state.parameter_values.get(self.short_name) - - if tr_short_name is None: - # the table key has not been defined explicitly yet, but - # it is most likely implicitly defined by the associated - # TABLE-STRUCT parameters. Use all-zeros as a standin for - # the real data... + @final + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + # if you get this exception, you ought to use + # `.encode_placeholder_into_pdu()` followed by (after the + # value of the table key has been determined) + # `.encode_value_into_pdu()`. + raise RuntimeError("_encode_positioned_into_pdu() cannot be called for table keys.") + + def encode_placeholder_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + + if physical_value is not None: key_dop = self.table.key_dop if key_dop is None: - raise EncodeError(f"Table '{self.table.short_name}' does not define " - f"a KEY-DOP, but is used in TABLE-KEY parameter " - f"'{self.short_name}'") + odxraise( + f"Table '{self.table.short_name}' does not define " + f"a KEY-DOP, but is used by TABLE-KEY parameter " + f"'{self.short_name}'", EncodeError) + return + + if not isinstance(physical_value, str): + odxraise(f"Invalid type for for table key '{self.short_name}' specified. " + f"(expect name of table row.)") + + tkv = encode_state.table_keys.get(self.short_name) + if tkv is not None and tkv != physical_value: + odxraise(f"Got conflicting values for table key {self.short_name}: " + f"{tkv} and {physical_value!r}") + + encode_state.table_keys[self.short_name] = physical_value + + orig_cursor = encode_state.cursor_byte_position + pos = encode_state.cursor_byte_position + if self.byte_position is not None: + pos = encode_state.origin_byte_position + self.byte_position + encode_state.key_pos[self.short_name] = pos + encode_state.cursor_byte_position = pos - byte_len = (odxrequire(key_dop.get_static_bit_length()) + 7) // 8 - if self.bit_position is not None and self.bit_position > 0: - byte_len += 1 + key_dop = self.table.key_dop + if key_dop is None: + odxraise(f"No KEY-DOP specified for table {self.table.short_name}") + return + + bit_pos = self.bit_position or 0 + bit_size = key_dop.get_static_bit_length() + if bit_size is None: + odxraise("The DOP of table key {self.short_name} must exhibit a fixed size.", + EncodeError) + return + + raw_data = b'\x00' * ((bit_pos + bit_size + 7) // 8) + encode_state.emplace_atomic_value(raw_data, self.short_name) - return bytes([0] * byte_len) + encode_state.cursor_byte_position = max(orig_cursor, encode_state.cursor_byte_position) + encode_state.cursor_bit_position = 0 - # the table key is known. We need to encode the associated DOP - # into the PDU. + def encode_value_into_pdu(self, encode_state: EncodeState) -> None: + + key_dop = self.table.key_dop + if key_dop is None: + odxraise( + f"Table '{self.table.short_name}' does not define " + f"a KEY-DOP, but is used by TABLE-KEY parameter " + f"'{self.short_name}'", EncodeError) + return + + if self.short_name not in encode_state.table_keys: + odxraise(f"Table key {self.short_name} has not been defined before " + f"it is required.", EncodeError) + return + else: + tr_short_name = encode_state.table_keys[self.short_name] + + # We need to encode the table key using the associated DOP into the PDU. tr_candidates = [x for x in self.table.table_rows if x.short_name == tr_short_name] if len(tr_candidates) == 0: - raise EncodeError(f"No table row with short name '{tr_short_name}' found") + odxraise(f"No table row with short name '{tr_short_name}' found", EncodeError) + return elif len(tr_candidates) > 1: - raise EncodeError(f"Multiple rows exhibiting short name '{tr_short_name}'") + odxraise(f"Multiple rows exhibiting short name '{tr_short_name}'", EncodeError) tr = tr_candidates[0] - key_dop = self.table.key_dop - if key_dop is None: - raise EncodeError(f"Table '{self.table.short_name}' does not define " - f"a KEY-DOP, but is used in TABLE-KEY parameter " - f"'{self.short_name}'") - bit_position = 0 if self.bit_position is None else self.bit_position - return key_dop.convert_physical_to_bytes(tr.key, encode_state, bit_position=bit_position) + encode_state.cursor_byte_position = encode_state.key_pos[self.short_name] + encode_state.cursor_bit_position = self.bit_position or 0 - @override - def encode_into_pdu(self, encode_state: EncodeState) -> bytes: - return super().encode_into_pdu(encode_state) + raw_key = key_dop.convert_physical_to_bytes( + physical_value=odxrequire(tr.key), + encode_state=encode_state, + bit_position=encode_state.cursor_bit_position) + + encode_state.emplace_atomic_value(raw_key, self.short_name) @override def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue: diff --git a/odxtools/parameters/tablestructparameter.py b/odxtools/parameters/tablestructparameter.py index 363ceef6..ace89e6b 100644 --- a/odxtools/parameters/tablestructparameter.py +++ b/odxtools/parameters/tablestructparameter.py @@ -101,71 +101,84 @@ def is_settable(self) -> bool: return True @override - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: - physical_value = encode_state.parameter_values.get(self.short_name) + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: - if not isinstance(physical_value, tuple) or \ + if not isinstance(physical_value, (tuple, list)) or \ len(physical_value) != 2 or \ not isinstance(physical_value[0], str): - raise EncodeError(f"The physical value of TableStructParameter 'self.short_name' " - f"must be a tuple with the short name of the selected table " - f"row as the first element and the physical value for the " - f"row's structure or DOP as the second.") + odxraise( + f"The physical value of TableStructParameter 'self.short_name' " + f"must be a tuple containing the short name of the selected table " + f"row as the first element and the physical value for the " + f"row's structure or DOP as the second.", EncodeError) tr_short_name = physical_value[0] # make sure that the same table row is selected for all # TABLE-STRUCT parameters that are using the same key tk_short_name = self.table_key.short_name - tk_value = encode_state.parameter_values.get(tk_short_name) + tk_value = encode_state.table_keys.get(tk_short_name) if tk_value is None: # no value for the key has been set yet. Set it to the # value which we are using right now - encode_state.parameter_values[tk_short_name] = tr_short_name + encode_state.table_keys[tk_short_name] = tr_short_name elif tk_value != tr_short_name: - raise EncodeError(f"Cannot determine a unique value for table key '{tk_short_name}': " - f"Requested are '{tk_value}' and '{tr_short_name}'") + odxraise( + f"Cannot determine a unique value for table key '{tk_short_name}': " + f"Requested are '{tk_value}' and '{tr_short_name}'", EncodeError) + return # deal with the static case (i.e., the table row is selected # by the table key object itself) - if self.table_key.table_row is not None and \ - self.table_key.table_row.short_name != tr_short_name: - raise EncodeError(f"The selected table row for the {self.short_name} " - f"parameter must be '{self.table_key.table_row.short_name}' " - f"(is: '{tr_short_name}')") + if self.table_key.table_row is not None: + if tr_short_name is not None and self.table_key.table_row.short_name != tr_short_name: + odxraise( + f"The selected table row for the {self.short_name} " + f"parameter must be '{self.table_key.table_row.short_name}' " + f"instead of '{tr_short_name}'", EncodeError) + return + + tr_short_name = self.table_key.table_row.short_name # encode the user specified value using the structure (or DOP) # of the selected table row table = self.table_key.table candidate_trs = [tr for tr in table.table_rows if tr.short_name == tr_short_name] - if len(candidate_trs) != 1: - raise EncodeError(f"Could not uniquely resolve a table row named " - f"'{tr_short_name}' in table '{table.short_name}' ") + if len(candidate_trs) == 0: + odxraise( + f"Could not find a table row named " + f"'{tr_short_name}' in table '{table.short_name}'", EncodeError) + return + elif len(candidate_trs) > 1: + odxraise( + f"Found multiple table rows named " + f"'{tr_short_name}' in table '{table.short_name}'", EncodeError) + tr = candidate_trs[0] tr_value = physical_value[1] - bit_position = self.bit_position or 0 if tr.structure is not None: # the selected table row references a structure - inner_encode_state = EncodeState( - coded_message=bytearray(b''), - parameter_values=tr_value, - triggering_request=encode_state.triggering_request) - - return tr.structure.convert_physical_to_bytes( - tr_value, inner_encode_state, bit_position=bit_position) + raw_data = tr.structure.convert_physical_to_bytes( + physical_value=tr_value, + encode_state=encode_state, + bit_position=encode_state.cursor_bit_position) + encode_state.emplace_atomic_value(raw_data, self.short_name) + return # if the table row does not reference a structure, it must # point to a DOP! if tr.dop is None: - odxraise() - - return tr.dop.convert_physical_to_bytes( - tr_value, encode_state=encode_state, bit_position=bit_position) - - @override - def encode_into_pdu(self, encode_state: EncodeState) -> bytes: - return super().encode_into_pdu(encode_state) + odxraise(f"Neither a structure nor a DOP has been defined for table row" + f"'{tr.short_name}'") + return + + raw_data = tr.dop.convert_physical_to_bytes( + physical_value=tr_value, + encode_state=encode_state, + bit_position=encode_state.cursor_bit_position) + encode_state.emplace_atomic_value(raw_data, self.short_name) @override def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue: diff --git a/odxtools/parameters/valueparameter.py b/odxtools/parameters/valueparameter.py index 3f439e0f..c4c53ae6 100644 --- a/odxtools/parameters/valueparameter.py +++ b/odxtools/parameters/valueparameter.py @@ -7,9 +7,9 @@ from ..dataobjectproperty import DataObjectProperty from ..encodestate import EncodeState -from ..exceptions import odxraise, odxrequire +from ..exceptions import EncodeError, odxraise, odxrequire from ..odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId -from ..odxtypes import AtomicOdxType +from ..odxtypes import AtomicOdxType, ParameterValue from ..utils import dataclass_fields_asdict from .parameter import ParameterType from .parameterwithdop import ParameterWithDOP @@ -77,16 +77,18 @@ def is_settable(self) -> bool: return True @override - def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: - physical_value = encode_state.parameter_values.get(self.short_name, - self.physical_default_value) + def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue], + encode_state: EncodeState) -> None: + + if physical_value is None: + physical_value = self._physical_default_value if physical_value is None: - raise TypeError(f"A value for parameter '{self.short_name}' must be specified" - f" as the parameter does not exhibit a default.") - dop = odxrequire( - self.dop, - f"Param {self.short_name} does not have a DOP. Maybe resolving references failed?") - - bit_position_int = self.bit_position if self.bit_position is not None else 0 - return dop.convert_physical_to_bytes( - physical_value, encode_state=encode_state, bit_position=bit_position_int) + odxraise( + f"A value for parameter '{self.short_name}' must be specified" + f" because the parameter does not exhibit a default.", EncodeError) + + raw_data = self.dop.convert_physical_to_bytes( + physical_value=odxrequire(physical_value), + encode_state=encode_state, + bit_position=encode_state.cursor_bit_position) + encode_state.emplace_atomic_value(raw_data, self.short_name) diff --git a/tests/test_encoding.py b/tests/test_encoding.py index 2b414f3f..c6836a5a 100644 --- a/tests/test_encoding.py +++ b/tests/test_encoding.py @@ -173,7 +173,7 @@ def test_encode_linear(self) -> None: param1._resolve_snrefs(cast(DiagLayer, None)) # Missing mandatory parameter. - with self.assertRaises(TypeError): + with self.assertRaises(EncodeError): req.encode() self.assertEqual( @@ -223,7 +223,9 @@ def test_encode_nrc_const(self) -> None: parameters=NamedItemList([param1, param2]), byte_size=None, ) - self.assertEqual(resp.encode(), bytearray([0x12, 0x34])) + + with self.assertRaises(EncodeError): + resp.encode() # "No value for required parameter param2 specified" self.assertEqual(resp.encode(param2=0xAB), bytearray([0x12, 0xAB])) self.assertRaises(EncodeError, resp.encode, param2=0xEF) @@ -287,7 +289,7 @@ def test_encode_overlapping(self) -> None: parameters=NamedItemList([param1, param2, param3]), byte_size=None, ) - self.assertEqual(req.encode(), bytearray([0x33, 0x75, 0x56])) + self.assertEqual(req.encode(), bytearray([0x12, 0x34, 0x56])) self.assertEqual(req.get_static_bit_length(), 24) def _create_request(self, parameters: List[Parameter]) -> Request: diff --git a/tests/test_somersault.py b/tests/test_somersault.py index cdfec185..085fcb11 100644 --- a/tests/test_somersault.py +++ b/tests/test_somersault.py @@ -165,7 +165,9 @@ def test_encode_specify_unknown_param(self) -> None: with self.assertRaises(OdxError) as eo: request.encode(forward_soberness_check=0x12, num_flips=5, grass_level="what grass?") - self.assertEqual(str(eo.exception), "Value for unknown parameter 'grass_level' specified") + self.assertEqual( + str(eo.exception), + "Value for unknown parameter 'grass_level' specified for structure do_forward_flips") def test_decode_request(self) -> None: messages = odxdb.ecus.somersault_assiduous.decode(bytes([0x03, 0x45])) From e8b4612ad5f36c89d83d95fa24a5ec19e9e6f57f Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Mon, 22 Apr 2024 19:56:07 +0200 Subject: [PATCH 2/2] BasicStructure: remove unnecessary size check while encoding This is already handled by `._validate_coded_message_size()`. thanks to [at]kayoub5 for the catch! Signed-off-by: Andreas Lauser Signed-off-by: Gerrit Ecke --- odxtools/basicstructure.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/odxtools/basicstructure.py b/odxtools/basicstructure.py index fed0d9e5..d80da6e4 100644 --- a/odxtools/basicstructure.py +++ b/odxtools/basicstructure.py @@ -179,12 +179,6 @@ def convert_physical_to_internal(self, if len(encode_state.coded_message) < self.byte_size: # Padding bytes needed encode_state.coded_message = encode_state.coded_message.ljust(self.byte_size, b"\0") - elif len(encode_state.coded_message) > self.byte_size: - odxraise( - f"Encoded structure {self.short_name} is too large: " - f"{len(encode_state.coded_message)} instead of {self.byte_size} " - f"bytes", EncodeError) - return # encode the length- and table keys. This cannot be done above # because we allow these to be defined implicitly (i.e. they