Skip to content

Commit

Permalink
Merge pull request #295 from andlaus/refactor_encoding5
Browse files Browse the repository at this point in the history
Refactor encoding, part 5 (the finale)
  • Loading branch information
andlaus authored Apr 24, 2024
2 parents 6037c72 + 8296f91 commit dcc46f9
Show file tree
Hide file tree
Showing 16 changed files with 44 additions and 73 deletions.
25 changes: 2 additions & 23 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ def get_static_bit_length(self) -> Optional[int]:
return byte_length * 8

def coded_const_prefix(self, request_prefix: bytes = b'') -> bytes:
encode_state = EncodeState(
coded_message=bytearray(), parameter_values={}, triggering_request=request_prefix)
encode_state = EncodeState(coded_message=bytearray(), triggering_request=request_prefix)

for param in self.parameters:
if (isinstance(param, MatchingRequestParameter) and param.request_byte_position < len(request_prefix)) or \
Expand Down Expand Up @@ -209,7 +208,7 @@ def encode_into_pdu(self, physical_value: Optional[ParameterValue],
# position directly after the structure and let
# EncodeState add the padding as needed.
encode_state.cursor_byte_position = encode_state.origin_byte_position + self.byte_size
encode_state.emplace_atomic_value(b'', "<PADDING>")
encode_state.emplace_bytes(b'', "<PADDING>")

# encode the length- and table keys. This cannot be done above
# because we allow these to be defined implicitly (i.e. they
Expand Down Expand Up @@ -247,26 +246,6 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:

return result

def encode(self, coded_request: Optional[bytes] = None, **kwargs: ParameterValue) -> bytes:
"""
Composes an UDS message as bytes for this service.
Parameters:
----------
coded_request: bytes
coded request (only needed when encoding a response)
kwargs: dict
Parameters of the RPC as mapping from SHORT-NAME of the parameter to the value
"""
encode_state = EncodeState(
coded_message=bytearray(),
parameter_values=kwargs,
triggering_request=coded_request,
is_end_of_pdu=True)

self.encode_into_pdu(physical_value=kwargs, encode_state=encode_state)

return encode_state.coded_message

def decode(self, message: bytes) -> ParameterValueDict:
decode_state = DecodeState(coded_message=message)
param_values = self.decode_from_pdu(decode_state)
Expand Down
2 changes: 1 addition & 1 deletion odxtools/diagservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def encode_request(self, **kwargs: ParameterValue) -> bytes:
set(kwargs.keys()).issubset(rq_all_param_names),
f"Unknown parameters specified for encoding: {kwargs.keys()}, "
f"known parameters are: {rq_all_param_names}")
return self.request.encode(coded_request=None, **kwargs)
return self.request.encode(**kwargs)

def encode_positive_response(self,
coded_request: bytes,
Expand Down
2 changes: 1 addition & 1 deletion odxtools/dynamiclengthfield.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def encode_into_pdu(self, physical_value: ParameterValue, encode_state: EncodeSt

# ensure the correct message size if the field is empty
if len(physical_value) == 0:
encode_state.emplace_atomic_value(b"", "<padding>")
encode_state.emplace_bytes(b"", "<padding>")

# move cursor and origin positions
encode_state.origin_byte_position = orig_origin
Expand Down
8 changes: 3 additions & 5 deletions odxtools/encodestate.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: MIT
import warnings
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from typing import Dict, Optional

from .exceptions import OdxWarning

Expand All @@ -14,9 +14,6 @@ class EncodeState:
#: payload that has been constructed so far
coded_message: bytearray

#: a mapping from short name to value for each parameter
parameter_values: Dict[str, Any]

#: The absolute position in bytes from the beginning of the PDU to
#: which relative positions refer to, e.g., the beginning of the
#: structure.
Expand Down Expand Up @@ -50,7 +47,7 @@ class EncodeState:
#: (needed for MinMaxLengthType, EndOfPduField, etc.)
is_end_of_pdu: bool = True

def emplace_atomic_value(self, new_data: bytes, param_name: str) -> None:
def emplace_bytes(self, new_data: bytes, param_name: Optional[str] = None) -> None:
pos = self.cursor_byte_position

# Make blob longer if necessary
Expand All @@ -63,6 +60,7 @@ def emplace_atomic_value(self, new_data: bytes, param_name: str) -> None:
# the value to be inserted is bitwise "disjoint" from the
# value which is already in the PDU...
if self.coded_message[pos + i] & new_data[i] != 0:
param_name = "<UNKNOWN>" if param_name is None else param_name
warnings.warn(
f"Object '{param_name}' overlaps with another parameter (bits are already set)",
OdxWarning,
Expand Down
2 changes: 1 addition & 1 deletion odxtools/leadinglengthinfotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta
is_highlow_byte_order=self.is_highlow_byte_order,
)

encode_state.emplace_atomic_value(length_bytes + value_bytes, "<LEADING-LENGTH-INFO-TYPE>")
encode_state.emplace_bytes(length_bytes + value_bytes, "<LEADING-LENGTH-INFO-TYPE>")

@override
def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
Expand Down
2 changes: 1 addition & 1 deletion odxtools/minmaxlengthtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta
f"(Is: {len(value_bytes)} bytes.)", EncodeError)
return

encode_state.emplace_atomic_value(value_bytes, "<MIN-MAX-LENGTH-TYPE>")
encode_state.emplace_bytes(value_bytes, "<MIN-MAX-LENGTH-TYPE>")

def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
odxassert(decode_state.cursor_bit_position == 0,
Expand Down
4 changes: 2 additions & 2 deletions odxtools/parameters/matchingrequestparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue],
f"bytes, need at least {rq_pos + rq_len} bytes", EncodeError)
return

encode_state.emplace_atomic_value(encode_state.triggering_request[rq_pos:rq_pos + rq_len],
self.short_name)
encode_state.emplace_bytes(encode_state.triggering_request[rq_pos:rq_pos + rq_len],
self.short_name)

@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
Expand Down
2 changes: 1 addition & 1 deletion odxtools/parameters/reservedparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue],
encode_state: EncodeState) -> None:
raw_data = (0).to_bytes((encode_state.cursor_bit_position + self.bit_length + 7) // 8,
"big")
encode_state.emplace_atomic_value(raw_data, self.short_name)
encode_state.emplace_bytes(raw_data, self.short_name)

@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
Expand Down
2 changes: 1 addition & 1 deletion odxtools/parameters/tablekeyparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def encode_placeholder_into_pdu(self, physical_value: Optional[ParameterValue],
EncodeError)
return

encode_state.emplace_atomic_value(bytes([0] * (size // 8)), self.short_name)
encode_state.emplace_bytes(bytes([0] * (size // 8)), self.short_name)

encode_state.cursor_byte_position = max(orig_pos, encode_state.cursor_byte_position)
encode_state.cursor_bit_position = 0
Expand Down
2 changes: 1 addition & 1 deletion odxtools/paramlengthinfotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta
is_highlow_byte_order=self.is_highlow_byte_order,
)

encode_state.emplace_atomic_value(raw_data, "<PARAM-LENGTH-INFO-TYPE>")
encode_state.emplace_bytes(raw_data, "<PARAM-LENGTH-INFO-TYPE>")

def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
# First, we need to find a length key with matching ID.
Expand Down
10 changes: 10 additions & 0 deletions odxtools/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
from xml.etree import ElementTree

from .basicstructure import BasicStructure
from .encodestate import EncodeState
from .odxlink import OdxDocFragment
from .odxtypes import ParameterValue
from .utils import dataclass_fields_asdict


Expand All @@ -20,3 +22,11 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) ->
kwargs = dataclass_fields_asdict(BasicStructure.from_et(et_element, doc_frags))

return Request(**kwargs)

def encode(self, **kwargs: ParameterValue) -> bytes:
encode_state = EncodeState(
coded_message=bytearray(), triggering_request=None, is_end_of_pdu=True)

self.encode_into_pdu(physical_value=kwargs, encode_state=encode_state)

return encode_state.coded_message
19 changes: 6 additions & 13 deletions odxtools/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from xml.etree import ElementTree

from .basicstructure import BasicStructure
from .encodestate import EncodeState
from .exceptions import odxraise
from .odxlink import OdxDocFragment
from .odxtypes import ParameterValue
from .parameters.matchingrequestparameter import MatchingRequestParameter
from .utils import dataclass_fields_asdict


Expand Down Expand Up @@ -38,17 +38,10 @@ def from_et(et_element: ElementTree.Element, doc_frags: List[OdxDocFragment]) ->

return Response(response_type=response_type, **kwargs)

def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue) -> bytes:
if coded_request is not None:
# Extract MATCHING-REQUEST-PARAMs from the coded
# request. TODO: this should be done by
# MatchingRequestParam itself!
for param in self.parameters:
if isinstance(param, MatchingRequestParameter):
byte_pos = param.request_byte_position
byte_length = param.byte_length
def encode(self, coded_request: Optional[bytes] = None, **kwargs: ParameterValue) -> bytes:
encode_state = EncodeState(
coded_message=bytearray(), triggering_request=coded_request, is_end_of_pdu=True)

val = coded_request[byte_pos:byte_pos + byte_length]
params[param.short_name] = val
self.encode_into_pdu(physical_value=kwargs, encode_state=encode_state)

return super().encode(coded_request=coded_request, **params)
return encode_state.coded_message
2 changes: 1 addition & 1 deletion odxtools/standardlengthtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def encode_into_pdu(self, internal_value: AtomicOdxType, encode_state: EncodeSta
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
)
encode_state.emplace_atomic_value(raw_data, "<STANDARD-LENGTH-TYPE>")
encode_state.emplace_bytes(raw_data, "<STANDARD-LENGTH-TYPE>")

@override
def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
Expand Down
2 changes: 1 addition & 1 deletion odxtools/staticfield.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def encode_into_pdu(self, physical_value: ParameterValue, encode_state: EncodeSt
encode_state.cursor_byte_position = pos_before + self.item_byte_size
elif pos_after - pos_before < self.item_byte_size:
# add some padding bytes
encode_state.emplace_atomic_value(
encode_state.emplace_bytes(
b'\x00' * (self.item_byte_size - (pos_after - pos_before)), "<PADDING>")

@override
Expand Down
2 changes: 1 addition & 1 deletion tests/test_decoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -2042,7 +2042,7 @@ def test_physical_constant_parameter(self) -> None:
actual_param_dict = request.decode(expected_coded_message)
self.assertEqual(dict(actual_param_dict), expected_param_dict)

actual_coded_message = request.encode(None, **expected_param_dict)
actual_coded_message = request.encode(**expected_param_dict)
self.assertEqual(actual_coded_message, expected_coded_message)

self.assertRaises(DecodeError, request.decode, bytes([0x12, 0x34]))
Expand Down
31 changes: 11 additions & 20 deletions tests/test_diag_coded_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def test_encode_leading_length_info_type_bytefield(self) -> None:
base_type_encoding=None,
is_highlow_byte_order_raw=None,
)
state = EncodeState(bytearray(), parameter_values={}, cursor_bit_position=3)
state = EncodeState(bytearray(), cursor_bit_position=3)
dct.encode_into_pdu(bytes([0x3]), state)
self.assertEqual(state.coded_message.hex(), "0803")

Expand All @@ -88,8 +88,7 @@ def test_decode_leading_length_info_type_bytefield2(self) -> None:
is_highlow_byte_order_raw=None,
)

state = EncodeState(
bytearray.fromhex("0000ff00"), parameter_values={}, cursor_bit_position=3)
state = EncodeState(bytearray.fromhex("0000ff00"), cursor_bit_position=3)
dct.encode_into_pdu(bytes([0xcc]), state)
self.assertEqual(state.coded_message.hex(), "08ccff00")
self.assertEqual(state.cursor_byte_position, 2)
Expand Down Expand Up @@ -133,7 +132,7 @@ def test_encode_leading_length_info_type_unicode2string(self) -> None:
base_type_encoding=None,
is_highlow_byte_order_raw=None,
)
state = EncodeState(coded_message=bytearray(), parameter_values={})
state = EncodeState(coded_message=bytearray())
dct.encode_into_pdu("a9", state)
self.assertEqual(state.coded_message, bytes([0x4, 0x00, 0x61, 0x00, 0x39]))

Expand All @@ -145,7 +144,6 @@ def test_encode_leading_length_info_type_unicode2string(self) -> None:
)
state = EncodeState(
coded_message=bytearray(),
parameter_values={},
cursor_byte_position=0,
cursor_bit_position=0,
origin_byte_position=0)
Expand Down Expand Up @@ -412,8 +410,7 @@ def test_encode_param_info_length_type_uint(self) -> None:
odxlinks = OdxLinkDatabase()
odxlinks.update({length_key_id: length_key})
dct._resolve_odxlinks(odxlinks)
state = EncodeState(
coded_message=bytearray([0xcc]), parameter_values={}, cursor_byte_position=2)
state = EncodeState(coded_message=bytearray([0xcc]), cursor_byte_position=2)
dct.encode_into_pdu(0x12345, state)
self.assertEqual(state.coded_message.hex(), "cc00012345")
self.assertEqual(state.length_keys.get("length_key"), 24)
Expand Down Expand Up @@ -690,8 +687,7 @@ def test_encode_min_max_length_type_hex_ff(self) -> None:
termination="HEX-FF",
is_highlow_byte_order_raw=None,
)
state = EncodeState(
coded_message=bytearray([0x00]), parameter_values={}, is_end_of_pdu=False)
state = EncodeState(coded_message=bytearray([0x00]), is_end_of_pdu=False)
dct.encode_into_pdu(bytes([0x34, 0x56]), state)
self.assertEqual(state.coded_message, bytes([0x34, 0x56, 0xFF]))

Expand All @@ -704,8 +700,7 @@ def test_encode_min_max_length_type_zero(self) -> None:
termination="ZERO",
is_highlow_byte_order_raw=None,
)
state = EncodeState(
coded_message=bytearray([0x00]), parameter_values={}, is_end_of_pdu=False)
state = EncodeState(coded_message=bytearray([0x00]), is_end_of_pdu=False)
dct.encode_into_pdu("Hi", state)
self.assertEqual(state.coded_message, bytes([0x48, 0x69, 0x0]))

Expand All @@ -720,18 +715,16 @@ def test_encode_min_max_length_type_end_of_pdu(self) -> None:
termination=termination,
is_highlow_byte_order_raw=None,
)
state = EncodeState(coded_message=bytearray(), parameter_values={}, is_end_of_pdu=True)
state = EncodeState(coded_message=bytearray(), is_end_of_pdu=True)
dct.encode_into_pdu(bytes([0x34, 0x56, 0x78, 0x9A]), state)
self.assertEqual(state.coded_message.hex(), "3456789a")

if termination == "END-OF-PDU":
state = EncodeState(
coded_message=bytearray(), parameter_values={}, is_end_of_pdu=False)
state = EncodeState(coded_message=bytearray(), is_end_of_pdu=False)
self.assertRaises(OdxError, dct.encode_into_pdu, bytes([0x34, 0x56, 0x78, 0x9A]),
state)
else:
state = EncodeState(
coded_message=bytearray(), parameter_values={}, is_end_of_pdu=False)
state = EncodeState(coded_message=bytearray(), is_end_of_pdu=False)
dct.encode_into_pdu(bytes([0x34, 0x56, 0x78, 0x9A]), state)
self.assertTrue(state.coded_message.hex().startswith("3456789a"))

Expand All @@ -746,8 +739,7 @@ def test_encode_min_max_length_type_min_length(self) -> None:
termination=termination,
is_highlow_byte_order_raw=None,
)
state = EncodeState(
coded_message=bytearray([0x00]), parameter_values={}, is_end_of_pdu=True)
state = EncodeState(coded_message=bytearray([0x00]), is_end_of_pdu=True)
dct.encode_into_pdu(bytes([0x34, 0x56]), state)
self.assertTrue(state.coded_message.hex().startswith("3456"))
self.assertRaises(
Expand All @@ -768,8 +760,7 @@ def test_encode_min_max_length_type_max_length(self) -> None:
termination=termination,
is_highlow_byte_order_raw=None,
)
state = EncodeState(
coded_message=bytearray([0x00]), parameter_values={}, is_end_of_pdu=True)
state = EncodeState(coded_message=bytearray([0x00]), is_end_of_pdu=True)
dct.encode_into_pdu(bytes([0x34, 0x56, 0x78]), state)
self.assertEqual(state.coded_message, bytes([0x34, 0x56, 0x78]))
self.assertRaises(
Expand Down

0 comments on commit dcc46f9

Please sign in to comment.