Skip to content

Commit

Permalink
DynamicLengthField: implement support for en- and decoding
Browse files Browse the repository at this point in the history
this also includes minor changes to the encoding infrastructure to
bring it closer to how decoding now works and to make it more
convenient.

Signed-off-by: Andreas Lauser <[email protected]>
Signed-off-by: Gerrit Ecke <[email protected]>
  • Loading branch information
andlaus committed Feb 19, 2024
1 parent e613b32 commit f3c151a
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 51 deletions.
13 changes: 7 additions & 6 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ def get_static_bit_length(self) -> Optional[int]:

def coded_const_prefix(self, request_prefix: bytes = b'') -> bytes:
prefix = b''
encode_state = EncodeState(prefix, parameter_values={}, triggering_request=request_prefix)
encode_state = EncodeState(
bytearray(prefix), parameter_values={}, triggering_request=request_prefix)
for param in self.parameters:
if isinstance(param, (CodedConstParameter, NrcConstParameter, MatchingRequestParameter,
PhysicalConstantParameter)):
encode_state.coded_message = param.encode_into_pdu(encode_state)
encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state))
else:
break
return encode_state.coded_message
Expand Down Expand Up @@ -132,7 +133,7 @@ def convert_physical_to_internal(self,
odxraise(f"Value for unknown parameter '{param_key}' specified")

encode_state = EncodeState(
b'',
bytearray(),
dict(param_value),
triggering_request=triggering_coded_request,
is_end_of_pdu=False,
Expand All @@ -157,11 +158,11 @@ def convert_physical_to_internal(self,
# into the PDU here and add the real value of the
# parameter in a post processing step.
tmp = encode_state.parameter_values.pop(param.short_name)
encode_state.coded_message = param.encode_into_pdu(encode_state)
encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state))
encode_state.parameter_values[param.short_name] = tmp
continue

encode_state.coded_message = param.encode_into_pdu(encode_state)
encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state))

if self.byte_size is not None and len(encode_state.coded_message) < self.byte_size:
# Padding bytes needed
Expand All @@ -176,7 +177,7 @@ def convert_physical_to_internal(self,
continue

# Encode the key parameter into the message
encode_state.coded_message = param.encode_into_pdu(encode_state)
encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state))

# Assert that length is as expected
self._validate_coded_message(encode_state.coded_message)
Expand Down
3 changes: 2 additions & 1 deletion odxtools/diagservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ def encode_request(self, **params: ParameterValue) -> bytes:

missing_params = {x.short_name
for x in self.request.required_parameters}.difference(params.keys())
odxassert(not missing_params, f"The parameters {missing_params} are required but missing!")
odxassert(
len(missing_params) == 0, f"The parameters {missing_params} are required but missing!")

# make sure that no unknown parameters are specified
rq_all_param_names = {x.short_name for x in self.request.parameters}
Expand Down
68 changes: 64 additions & 4 deletions odxtools/dynamiclengthfield.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .decodestate import DecodeState
from .determinenumberofitems import DetermineNumberOfItems
from .encodestate import EncodeState
from .exceptions import odxrequire
from .exceptions import DecodeError, EncodeError, odxassert, odxraise, odxrequire
from .field import Field
from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId
from .odxtypes import ParameterValue
Expand Down Expand Up @@ -49,11 +49,71 @@ def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None:

def convert_physical_to_bytes(
self,
physical_value: ParameterValue,
physical_values: ParameterValue,
encode_state: EncodeState,
bit_position: int = 0,
) -> bytes:
raise NotImplementedError()

odxassert(bit_position == 0, "No bit position can be specified for dynamic length fields!")
if not isinstance(physical_values, list):
odxraise(
f"Expected a list of values for dynamic length field {self.short_name}, "
f"got {type(physical_values)}", EncodeError)

det_num_items = self.determine_number_of_items
num_item = det_num_items.dop.convert_physical_to_bytes(
len(physical_values), encode_state, det_num_items.bit_position or 0)

# hack to emplace the length specifier at the correct location
tmp = encode_state.coded_message
encode_state.coded_message = bytearray()
encode_state.emplace_atomic_value(num_item, det_num_items.byte_position,
self.short_name + ".num_items")
result = encode_state.coded_message
encode_state.coded_message = tmp

# if required, add padding between the length specifier and
# the first item
if len(result) < self.offset:
result.extend([0] * (self.offset - len(result)))
elif len(result) > self.offset:
odxraise(f"The length specifier of field {self.short_name} overlaps "
f"with the first item!")

for value in physical_values:
result += self.structure.convert_physical_to_bytes(value, encode_state)

return result

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
raise NotImplementedError()

odxassert(decode_state.cursor_bit_position == 0,
"No bit position can be specified for dynamic length fields!")

orig_origin = decode_state.origin_byte_position
orig_cursor = decode_state.cursor_byte_position

det_num_items = self.determine_number_of_items
decode_state.origin_byte_position = decode_state.cursor_byte_position
decode_state.cursor_byte_position = decode_state.origin_byte_position + det_num_items.byte_position
decode_state.cursor_bit_position = det_num_items.bit_position or 0

n = det_num_items.dop.decode_from_pdu(decode_state)

if not isinstance(n, int):
odxraise(f"Number of items specified by a dynamic length field {self.short_name} "
f"must be an integer (is: {type(n).__name__})")
elif n < 0:
odxraise(
f"Number of items specified by a dynamic length field {self.short_name} "
f"must be positive (is: {n})", DecodeError)
else:
decode_state.cursor_byte_position = decode_state.origin_byte_position + self.offset
result: List[ParameterValue] = []
for _ in range(n):
result.append(self.structure.decode_from_pdu(decode_state))

decode_state.origin_byte_position = orig_origin
decode_state.cursor_byte_position = max(orig_cursor, decode_state.cursor_byte_position)

return result
29 changes: 27 additions & 2 deletions odxtools/encodestate.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# SPDX-License-Identifier: MIT
import warnings
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Dict, Optional

from .exceptions import OdxWarning

if TYPE_CHECKING:
from .tablerow import TableRow

Expand All @@ -11,8 +14,8 @@ class EncodeState:
"""Utility class to holding the state variables needed for encoding a message.
"""

#: payload that is constructed so far
coded_message: bytes
#: payload that has been constructed so far
coded_message: bytearray

#: a mapping from short name to value for each parameter
parameter_values: Dict[str, Any]
Expand All @@ -31,3 +34,25 @@ class EncodeState:
#: Flag whether we are currently the last parameter of the PDU
#: (needed for MinMaxLengthType)
is_end_of_pdu: bool = False

def emplace_atomic_value(self,
new_data: bytes,
pos: Optional[int] = None,
param_name: str = "unknown") -> None:
if pos is None:
pos = len(self.coded_message)

# Make blob longer if necessary
min_length = pos + len(new_data)
if len(self.coded_message) < min_length:
self.coded_message.extend([0] * (min_length - len(self.coded_message)))

for byte_idx_val, byte_idx_rpc in enumerate(range(pos, pos + len(new_data))):
# insert byte value
if self.coded_message[byte_idx_rpc] & new_data[byte_idx_val] != 0:
warnings.warn(
f"Parameter '{param_name}' overlaps with another parameter (bytes are already set)",
OdxWarning,
stacklevel=1,
)
self.coded_message[byte_idx_rpc] |= new_data[byte_idx_val]
3 changes: 2 additions & 1 deletion odxtools/endofpdufield.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ def convert_physical_to_bytes(
encode_state: EncodeState,
bit_position: int = 0,
) -> bytes:

odxassert(
bit_position == 0, "End of PDU field must be byte aligned. "
"Is there an error in reading the .odx?", EncodeError)
if not isinstance(physical_values, list):
odxraise(
f"Expected a list of values for structure {self.short_name}, "
f"Expected a list of values for end-of-pdu field {self.short_name}, "
f"got {type(physical_values)}", EncodeError)

coded_message = b''
Expand Down
27 changes: 2 additions & 25 deletions odxtools/parameters/parameter.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
# SPDX-License-Identifier: MIT
import abc
import warnings
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional

from ..decodestate import DecodeState
from ..element import NamedElement
from ..encodestate import EncodeState
from ..exceptions import OdxWarning
from ..odxlink import OdxLinkDatabase, OdxLinkId
from ..odxtypes import ParameterValue
from ..specialdatagroup import SpecialDataGroup
Expand Down Expand Up @@ -155,27 +153,6 @@ def encode_into_pdu(self, encode_state: EncodeState) -> bytes:
else:
byte_position = len(msg_blob)

return self._encode_into_blob(msg_blob, param_blob, byte_position)
encode_state.emplace_atomic_value(param_blob, byte_position, self.short_name)

def _encode_into_blob(self, blob: bytes, new_data: bytes, pos: Optional[int] = None) -> bytes:
if pos is None:
pos = len(blob)

# Make blob longer if necessary
min_length = pos + len(new_data)

result_blob = bytearray(blob)
if len(blob) < min_length:
result_blob.extend([0] * (min_length - len(blob)))

for byte_idx_val, byte_idx_rpc in enumerate(range(pos, pos + len(new_data))):
# insert byte value
if result_blob[byte_idx_rpc] & new_data[byte_idx_val] != 0:
warnings.warn(
f"Parameter {self.short_name} overlaps with another parameter (bytes are already set)",
OdxWarning,
stacklevel=1,
)
result_blob[byte_idx_rpc] |= new_data[byte_idx_val]

return result_blob
return encode_state.coded_message
2 changes: 1 addition & 1 deletion odxtools/parameters/tablestructparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
if tr.structure is not None:
# the selected table row references a structure
inner_encode_state = EncodeState(
coded_message=b'',
coded_message=bytearray(b''),
parameter_values=tr_value,
triggering_request=encode_state.triggering_request)

Expand Down
Loading

0 comments on commit f3c151a

Please sign in to comment.