Skip to content

Commit

Permalink
Merge pull request #293 from andlaus/refactor_encoding3
Browse files Browse the repository at this point in the history
Refactor encoding, part 3 (parameters)
  • Loading branch information
andlaus authored Apr 23, 2024
2 parents 0f12e21 + e8b4612 commit 034aa41
Show file tree
Hide file tree
Showing 18 changed files with 430 additions and 286 deletions.
108 changes: 61 additions & 47 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ def get_static_bit_length(self) -> Optional[int]:
return ((length + 7) // 8) * 8

def coded_const_prefix(self, request_prefix: bytes = b'') -> bytes:
prefix = b''
encode_state = EncodeState(
bytearray(prefix), parameter_values={}, triggering_request=request_prefix)
coded_message=bytearray(), parameter_values={}, triggering_request=request_prefix)

for param in self.parameters:
if isinstance(param, (CodedConstParameter, NrcConstParameter, MatchingRequestParameter,
PhysicalConstantParameter)):
encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state))
if (isinstance(param, MatchingRequestParameter) and param.request_byte_position < len(request_prefix)) or \
isinstance(param, (CodedConstParameter, NrcConstParameter, PhysicalConstantParameter)):
param.encode_into_pdu(physical_value=None, encode_state=encode_state)
else:
break
return encode_state.coded_message
Expand Down Expand Up @@ -122,52 +122,63 @@ def convert_physical_to_internal(self,
triggering_coded_request: Optional[bytes],
is_end_of_pdu: bool = True) -> bytes:

encode_state = EncodeState(
bytearray(),
parameter_values=cast(Dict[str, ParameterValue], param_value),
triggering_request=triggering_coded_request,
is_end_of_pdu=False)

if not isinstance(param_value, dict):
raise EncodeError(
odxraise(
f"Expected a dictionary for the values of structure {self.short_name}, "
f"got {type(param_value)}")
f"got {type(param_value).__name__}", EncodeError)
elif encode_state.cursor_bit_position != 0:
odxraise(
f"Structures must be byte aligned, but "
f"{self.short_name} requested to be at bit position "
f"{encode_state.cursor_bit_position}", EncodeError)
encode_state.bit_position = 0

# in strict mode, ensure that no values for unknown parameters are specified.
if strict_mode:
param_names = [param.short_name for param in self.parameters]
for param_key in param_value:
if param_key not in param_names:
odxraise(f"Value for unknown parameter '{param_key}' specified")

encode_state = EncodeState(
bytearray(),
dict(param_value),
triggering_request=triggering_coded_request,
is_end_of_pdu=False)
param_names = {param.short_name for param in self.parameters}
for param_value_name in param_value:
if param_value_name not in param_names:
odxraise(f"Value for unknown parameter '{param_value_name}' specified "
f"for structure {self.short_name}")

for param in self.parameters:
if param == self.parameters[-1]:
# The last parameter is at the end of the PDU if the
# structure itself is at the end of the PDU. TODO:
# This assumes that the last parameter specified in
# the ODX is located last in the PDU...
if id(param) == id(self.parameters[-1]):
# The last parameter of the structure is at the end of
# the PDU if the structure itself is at the end of the
# PDU. TODO: This assumes that the last parameter
# specified in the ODX is located last in the PDU...
encode_state.is_end_of_pdu = is_end_of_pdu

if isinstance(
param,
(LengthKeyParameter, TableKeyParameter)) and param.short_name in param_value:
# This is a hack to always encode a dummy value for
# length- and table keys. since these can be specified
if isinstance(param, (LengthKeyParameter, TableKeyParameter)):
# At this point, we encode a placeholder value for length-
# and table keys, since these can be specified
# implicitly (i.e., by means of parameters that use
# these keys), to avoid getting an "overlapping
# these keys). To avoid getting an "overlapping
# parameter" warning, we must encode a value of zero
# into the PDU here and add the real value of the
# parameter in a post processing step.
tmp = encode_state.parameter_values.pop(param.short_name)
encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state))
encode_state.parameter_values[param.short_name] = tmp
# parameter in a post-processing step.
param.encode_placeholder_into_pdu(
physical_value=param_value.get(param.short_name), encode_state=encode_state)

continue

encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state))
if param.is_required and param.short_name not in param_value:
odxraise(f"No value for required parameter {param.short_name} specified",
EncodeError)

if self.byte_size is not None and len(encode_state.coded_message) < self.byte_size:
# Padding bytes needed
encode_state.coded_message = encode_state.coded_message.ljust(self.byte_size, b"\0")
param.encode_into_pdu(
physical_value=param_value.get(param.short_name), encode_state=encode_state)

if self.byte_size is not None:
if len(encode_state.coded_message) < self.byte_size:
# Padding bytes needed
encode_state.coded_message = encode_state.coded_message.ljust(self.byte_size, b"\0")

# encode the length- and table keys. This cannot be done above
# because we allow these to be defined implicitly (i.e. they
Expand All @@ -177,22 +188,25 @@ def convert_physical_to_internal(self,
# the current parameter is neither a length- nor a table key
continue

# Encode the key parameter into the message
encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state))
# Encode the value of the key parameter into the message
param.encode_value_into_pdu(encode_state=encode_state)

# Assert that length is as expected
self._validate_coded_message(encode_state.coded_message)
self._validate_coded_message_size(encode_state.cursor_byte_position -
encode_state.origin_byte_position)

return bytearray(encode_state.coded_message)
return encode_state.coded_message

def _validate_coded_message(self, coded_message: bytes) -> None:
def _validate_coded_message_size(self, coded_byte_len: int) -> None:

if self.byte_size is not None:
# We definitely broke something if we didn't respect the explicit byte_size
odxassert(
len(coded_message) == self.byte_size,
"Verification of coded message {coded_message.hex()} failed: Incorrect size.")
# No need to check further
if self.byte_size != coded_byte_len:
warnings.warn(
"Verification of coded message failed: Incorrect size.",
OdxWarning,
stacklevel=1)

return

bit_length = self.get_static_bit_length()
Expand All @@ -201,12 +215,12 @@ def _validate_coded_message(self, coded_message: bytes) -> None:
# Nothing to check
return

if len(coded_message) * 8 != bit_length:
if coded_byte_len * 8 != bit_length:
# We may have broke something
# but it could be that bit_length was mis calculated and not the actual bytes are wrong
# Could happen with overlapping parameters and parameters with gaps
warnings.warn(
f"Verification of coded message '{coded_message.hex()}' possibly failed: Size may be incorrect.",
"Verification of coded message possibly failed: Size may be incorrect.",
OdxWarning,
stacklevel=1)

Expand Down
36 changes: 22 additions & 14 deletions odxtools/dynamiclengthfield.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,38 @@ def convert_physical_to_bytes(
f"Expected a list of values for dynamic length field {self.short_name}, "
f"got {type(physical_value)}", EncodeError)

tmp_state = EncodeState(
coded_message=bytearray(),
parameter_values={},
triggering_request=encode_state.triggering_request,
origin_byte_position=0,
cursor_byte_position=0,
cursor_bit_position=0)

det_num_items = self.determine_number_of_items
field_len = det_num_items.dop.convert_physical_to_bytes(
len(physical_value), encode_state, det_num_items.bit_position or 0)
field_len_bytes = det_num_items.dop.convert_physical_to_bytes(
len(physical_value), tmp_state, det_num_items.bit_position or 0)

# hack to emplace the length specifier at the correct location
tmp = encode_state.coded_message
encode_state.coded_message = bytearray()
encode_state.cursor_byte_position = det_num_items.byte_position
encode_state.emplace_atomic_value(field_len, self.short_name + ".num_items")
result = encode_state.coded_message
encode_state.coded_message = tmp
tmp_state.cursor_byte_position = det_num_items.byte_position
tmp_state.cursor_bit_position = det_num_items.bit_position or 0
tmp_state.emplace_atomic_value(field_len_bytes, self.short_name + ".num_items")

# if required, add padding between the length specifier and
# the first item
if len(result) < self.offset:
result.extend([0] * (self.offset - len(result)))
elif len(result) > self.offset:
if len(tmp_state.coded_message) < self.offset:
tmp_state.coded_message += b'\00' * (self.offset - len(tmp_state.coded_message))
tmp_state.cursor_byte_position = self.offset
elif len(field_len_bytes) > self.offset:
odxraise(f"The length specifier of field {self.short_name} overlaps "
f"with the first item!")
tmp_state.cursor_byte_position = self.offset

for value in physical_value:
result += self.structure.convert_physical_to_bytes(value, encode_state)
for i, value in enumerate(physical_value):
tmp_bytes = self.structure.convert_physical_to_bytes(value, tmp_state)
tmp_state.emplace_atomic_value(tmp_bytes, f"{self.short_name}{i}")

return result
return tmp_state.coded_message

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:

Expand Down
35 changes: 14 additions & 21 deletions odxtools/parameters/codedconstparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from ..decodestate import DecodeState
from ..diagcodedtype import DiagCodedType
from ..encodestate import EncodeState
from ..exceptions import DecodeError, odxrequire
from ..exceptions import DecodeError, EncodeError, odxraise, odxrequire
from ..odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId
from ..odxtypes import AtomicOdxType, DataType
from ..odxtypes import AtomicOdxType, DataType, ParameterValue
from ..utils import dataclass_fields_asdict
from .parameter import Parameter, ParameterType

Expand Down Expand Up @@ -81,30 +81,23 @@ def is_settable(self) -> bool:
return False

@override
def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
if (self.short_name in encode_state.parameter_values and
encode_state.parameter_values[self.short_name] != self.coded_value):
raise TypeError(f"The parameter '{self.short_name}' is constant {self._coded_value_str}"
" and thus can not be changed.")

tmp_state = EncodeState(
bytearray(),
encode_state.parameter_values,
triggering_request=encode_state.triggering_request,
is_end_of_pdu=False,
cursor_byte_position=0,
cursor_bit_position=0,
origin_byte_position=0)
encode_state.cursor_bit_position = self.bit_position or 0
self.diag_coded_type.encode_into_pdu(self.coded_value, encode_state=tmp_state)

return tmp_state.coded_message
def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue],
encode_state: EncodeState) -> None:
if physical_value is not None and physical_value != self.coded_value:
odxraise(
f"Value for constant parameter `{self.short_name}` name can "
f"only be specified as {self.coded_value!r} (is: {physical_value!r})", EncodeError)

internal_value = self.coded_value

self.diag_coded_type.encode_into_pdu(
internal_value=internal_value, encode_state=encode_state)

@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
coded_val = self.diag_coded_type.decode_from_pdu(decode_state)

# Check if the coded value in the message is correct.
# Check if the coded value contained by the message is correct.
if self.coded_value != coded_val:
warnings.warn(
f"Coded constant parameter does not match! "
Expand Down
9 changes: 5 additions & 4 deletions odxtools/parameters/dynamicparameter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import List
from typing import List, Optional
from xml.etree import ElementTree

from typing_extensions import override
Expand Down Expand Up @@ -41,9 +41,10 @@ def is_settable(self) -> bool:
raise NotImplementedError(".is_settable for a DynamicParameter")

@override
def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
raise NotImplementedError("Encoding a DynamicParameter is not implemented yet.")
def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue],
encode_state: EncodeState) -> None:
raise NotImplementedError("Encoding DynamicParameter is not implemented yet.")

@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
raise NotImplementedError("Decoding a DynamicParameter is not implemented yet.")
raise NotImplementedError("Decoding DynamicParameter is not implemented yet.")
82 changes: 68 additions & 14 deletions odxtools/parameters/lengthkeyparameter.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from xml.etree import ElementTree

from typing_extensions import override
from typing_extensions import final, override

from ..decodestate import DecodeState
from ..encodestate import EncodeState
from ..exceptions import odxraise, odxrequire
from ..exceptions import EncodeError, odxraise, odxrequire
from ..odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId
from ..odxtypes import ParameterValue
from ..utils import dataclass_fields_asdict
Expand Down Expand Up @@ -77,19 +77,73 @@ def is_settable(self) -> bool:
return True

@override
def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
physical_value = encode_state.length_keys.get(self.short_name)
if physical_value is None:
physical_value = encode_state.parameter_values.get(self.short_name, 0)
@final
def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue],
encode_state: EncodeState) -> None:
# if you get this exception, you ought to use
# `.encode_placeholder_into_pdu()` followed by (after the
# value of the length key has been determined)
# `.encode_value_into_pdu()`.
raise RuntimeError("_encode_positioned_into_pdu() cannot be called for length keys.")

def encode_placeholder_into_pdu(self, physical_value: Optional[ParameterValue],
encode_state: EncodeState) -> None:

if physical_value is not None:
if not self.dop.is_valid_physical_value(physical_value):
odxraise(f"Invalid explicitly specified physical value '{physical_value!r}' "
f"for length key '{self.short_name}'.")

lkv = encode_state.length_keys.get(self.short_name)
if lkv is not None and lkv != physical_value:
odxraise(f"Got conflicting values for length key {self.short_name}: "
f"{lkv} and {physical_value!r}")

if not isinstance(physical_value, int):
odxraise(
f"Value of length key {self.short_name} is of type {type(physical_value).__name__} "
f"instead of int")

encode_state.length_keys[self.short_name] = physical_value

orig_cursor = encode_state.cursor_byte_position
pos = encode_state.cursor_byte_position
if self.byte_position is not None:
pos = encode_state.origin_byte_position + self.byte_position
encode_state.key_pos[self.short_name] = pos
encode_state.cursor_byte_position = pos

bit_pos = self.bit_position or 0
dop = odxrequire(super().dop,
f"A DOP is required for length key parameter {self.short_name}")
return dop.convert_physical_to_bytes(physical_value, encode_state, bit_position=bit_pos)

@override
def encode_into_pdu(self, encode_state: EncodeState) -> bytes:
return super().encode_into_pdu(encode_state)
bit_size = self.dop.get_static_bit_length()
if bit_size is None:
odxraise("The DOP of length key {self.short_name} must exhibit a fixed size.",
EncodeError)
return

raw_data = b'\x00' * ((bit_pos + bit_size + 7) // 8)
encode_state.emplace_atomic_value(raw_data, self.short_name)

encode_state.cursor_byte_position = max(encode_state.cursor_byte_position, orig_cursor)
encode_state.cursor_bit_position = 0

def encode_value_into_pdu(self, encode_state: EncodeState) -> None:

if self.short_name not in encode_state.length_keys:
odxraise(
f"Length key {self.short_name} has not been defined before "
f"it is required.", EncodeError)
return
else:
physical_value = encode_state.length_keys[self.short_name]

encode_state.cursor_byte_position = encode_state.key_pos[self.short_name]
encode_state.cursor_bit_position = self.bit_position or 0

raw_data = self.dop.convert_physical_to_bytes(
physical_value=odxrequire(physical_value),
encode_state=encode_state,
bit_position=encode_state.cursor_bit_position)
encode_state.emplace_atomic_value(raw_data, self.short_name)

@override
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
Expand Down
Loading

0 comments on commit 034aa41

Please sign in to comment.