Skip to content

Commit

Permalink
refactor encoding: Data object properties
Browse files Browse the repository at this point in the history
Signed-off-by: Andreas Lauser <[email protected]>
Signed-off-by: Florian Jost <[email protected]>
  • Loading branch information
andlaus committed Apr 23, 2024
1 parent e8b4612 commit c867376
Show file tree
Hide file tree
Showing 17 changed files with 311 additions and 329 deletions.
142 changes: 69 additions & 73 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
from xml.etree import ElementTree

from typing_extensions import override

from .complexdop import ComplexDop
from .dataobjectproperty import DataObjectProperty
from .decodestate import DecodeState
Expand Down Expand Up @@ -55,23 +57,21 @@ def get_static_bit_length(self) -> Optional[int]:
return 8 * self.byte_size

cursor = 0
length = 0
byte_length = 0
for param in self.parameters:
param_bit_length = param.get_static_bit_length()
if param_bit_length is None:
# We were not able to calculate a static bit length
return None
elif param.byte_position is not None:
bit_pos = param.bit_position or 0
byte_pos = param.byte_position or 0
cursor = byte_pos * 8 + bit_pos
cursor = param.byte_position

cursor += param_bit_length
length = max(length, cursor)
cursor += ((param.bit_position or 0) + param_bit_length + 7) // 8
byte_length = max(byte_length, cursor)

# Round up to account for padding bits (all structures are
# byte aligned)
return ((length + 7) // 8) * 8
return byte_length * 8

def coded_const_prefix(self, request_prefix: bytes = b'') -> bytes:
encode_state = EncodeState(
Expand Down Expand Up @@ -117,32 +117,58 @@ def print_free_parameters_info(self) -> None:

print(parameter_info(self.free_parameters), end="")

def convert_physical_to_internal(self,
param_value: ParameterValue,
triggering_coded_request: Optional[bytes],
is_end_of_pdu: bool = True) -> bytes:
def _validate_coded_message_size(self, coded_byte_len: int) -> None:

encode_state = EncodeState(
bytearray(),
parameter_values=cast(Dict[str, ParameterValue], param_value),
triggering_request=triggering_coded_request,
is_end_of_pdu=False)
if self.byte_size is not None:
# We definitely broke something if we didn't respect the explicit byte_size
if self.byte_size != coded_byte_len:
warnings.warn(
"Verification of coded message failed: Incorrect size.",
OdxWarning,
stacklevel=1)

return

bit_length = self.get_static_bit_length()

if not isinstance(param_value, dict):
if bit_length is None:
# Nothing to check
return

if coded_byte_len * 8 != bit_length:
# We may have broke something
# but it could be that bit_length was mis calculated and not the actual bytes are wrong
# Could happen with overlapping parameters and parameters with gaps
warnings.warn(
"Verification of coded message possibly failed: Size may be incorrect.",
OdxWarning,
stacklevel=1)

@override
def encode_into_pdu(self, physical_value: Optional[ParameterValue],
encode_state: EncodeState) -> None:
if not isinstance(physical_value, dict):
odxraise(
f"Expected a dictionary for the values of structure {self.short_name}, "
f"got {type(param_value).__name__}", EncodeError)
f"got {type(physical_value).__name__}", EncodeError)
elif encode_state.cursor_bit_position != 0:
odxraise(
f"Structures must be byte aligned, but "
f"{self.short_name} requested to be at bit position "
f"{encode_state.cursor_bit_position}", EncodeError)
encode_state.bit_position = 0

orig_cursor = encode_state.cursor_byte_position
orig_origin = encode_state.origin_byte_position
encode_state.origin_byte_position = encode_state.cursor_byte_position

orig_is_end_of_pdu = encode_state.is_end_of_pdu
encode_state.is_end_of_pdu = False

# in strict mode, ensure that no values for unknown parameters are specified.
if strict_mode:
param_names = {param.short_name for param in self.parameters}
for param_value_name in param_value:
for param_value_name in physical_value:
if param_value_name not in param_names:
odxraise(f"Value for unknown parameter '{param_value_name}' specified "
f"for structure {self.short_name}")
Expand All @@ -153,7 +179,7 @@ def convert_physical_to_internal(self,
# the PDU if the structure itself is at the end of the
# PDU. TODO: This assumes that the last parameter
# specified in the ODX is located last in the PDU...
encode_state.is_end_of_pdu = is_end_of_pdu
encode_state.is_end_of_pdu = orig_is_end_of_pdu

if isinstance(param, (LengthKeyParameter, TableKeyParameter)):
# At this point, we encode a placeholder value for length-
Expand All @@ -164,21 +190,26 @@ def convert_physical_to_internal(self,
# into the PDU here and add the real value of the
# parameter in a post-processing step.
param.encode_placeholder_into_pdu(
physical_value=param_value.get(param.short_name), encode_state=encode_state)
physical_value=physical_value.get(param.short_name), encode_state=encode_state)

continue

if param.is_required and param.short_name not in param_value:
if param.is_required and param.short_name not in physical_value:
odxraise(f"No value for required parameter {param.short_name} specified",
EncodeError)

param.encode_into_pdu(
physical_value=param_value.get(param.short_name), encode_state=encode_state)
physical_value=physical_value.get(param.short_name), encode_state=encode_state)

encode_state.is_end_of_pdu = False
if self.byte_size is not None:
if len(encode_state.coded_message) < self.byte_size:
# Padding bytes needed
encode_state.coded_message = encode_state.coded_message.ljust(self.byte_size, b"\0")
actual_len = encode_state.cursor_byte_position - encode_state.origin_byte_position
if actual_len < self.byte_size:
# Padding bytes needed. We add an empty object at the
# position directly after the structure and let
# EncodeState add the padding as needed.
encode_state.cursor_byte_position = encode_state.origin_byte_position + self.byte_size
encode_state.emplace_atomic_value(b'', "<PADDING>")

# encode the length- and table keys. This cannot be done above
# because we allow these to be defined implicitly (i.e. they
Expand All @@ -195,52 +226,10 @@ def convert_physical_to_internal(self,
self._validate_coded_message_size(encode_state.cursor_byte_position -
encode_state.origin_byte_position)

return encode_state.coded_message

def _validate_coded_message_size(self, coded_byte_len: int) -> None:

if self.byte_size is not None:
# We definitely broke something if we didn't respect the explicit byte_size
if self.byte_size != coded_byte_len:
warnings.warn(
"Verification of coded message failed: Incorrect size.",
OdxWarning,
stacklevel=1)

return

bit_length = self.get_static_bit_length()

if bit_length is None:
# Nothing to check
return

if coded_byte_len * 8 != bit_length:
# We may have broke something
# but it could be that bit_length was mis calculated and not the actual bytes are wrong
# Could happen with overlapping parameters and parameters with gaps
warnings.warn(
"Verification of coded message possibly failed: Size may be incorrect.",
OdxWarning,
stacklevel=1)

def convert_physical_to_bytes(self,
physical_value: ParameterValue,
encode_state: EncodeState,
bit_position: int = 0) -> bytes:
if not isinstance(physical_value, dict):
raise EncodeError(
f"Expected a dictionary for the values of structure {self.short_name}, "
f"got {type(physical_value)}")
if bit_position != 0:
raise EncodeError("Structures must be aligned, i.e. bit_position=0, but "
f"{self.short_name} was passed the bit position {bit_position}")
return self.convert_physical_to_internal(
physical_value,
triggering_coded_request=encode_state.triggering_request,
is_end_of_pdu=encode_state.is_end_of_pdu,
)
encode_state.origin_byte_position = orig_origin
encode_state.cursor_byte_position = max(orig_cursor, encode_state.cursor_byte_position)

@override
def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
# move the origin since positions specified by sub-parameters of
# structures are relative to the beginning of the structure object.
Expand Down Expand Up @@ -268,8 +257,15 @@ def encode(self, coded_request: Optional[bytes] = None, **kwargs: ParameterValue
kwargs: dict
Parameters of the RPC as mapping from SHORT-NAME of the parameter to the value
"""
return self.convert_physical_to_internal(
kwargs, triggering_coded_request=coded_request, is_end_of_pdu=True)
encode_state = EncodeState(
coded_message=bytearray(),
parameter_values=kwargs,
triggering_request=coded_request,
is_end_of_pdu=True)

self.encode_into_pdu(physical_value=kwargs, encode_state=encode_state)

return encode_state.coded_message

def decode(self, message: bytes) -> ParameterValueDict:
decode_state = DecodeState(coded_message=message)
Expand Down
22 changes: 2 additions & 20 deletions odxtools/dataobjectproperty.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ def convert_physical_to_internal(self, physical_value: Any) -> Any:

return self.compu_method.convert_physical_to_internal(physical_value)

def convert_physical_to_bytes(self,
physical_value: Any,
encode_state: EncodeState,
bit_position: int = 0) -> bytes:
def encode_into_pdu(self, physical_value: ParameterValue, encode_state: EncodeState) -> None:
"""
Convert a physical representation of a parameter to a string bytes that can be send over the wire
"""
Expand All @@ -133,22 +130,7 @@ def convert_physical_to_bytes(self,
f" is not a valid.")

internal_val = self.convert_physical_to_internal(physical_value)

tmp_state = EncodeState(
bytearray(),
encode_state.parameter_values,
triggering_request=encode_state.triggering_request,
is_end_of_pdu=encode_state.is_end_of_pdu,
cursor_byte_position=0,
cursor_bit_position=bit_position,
origin_byte_position=0)

self.diag_coded_type.encode_into_pdu(internal_val, tmp_state)

encode_state.length_keys.update(tmp_state.length_keys)
encode_state.table_keys.update(tmp_state.table_keys)

return tmp_state.coded_message
self.diag_coded_type.encode_into_pdu(internal_val, encode_state)

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
"""
Expand Down
13 changes: 5 additions & 8 deletions odxtools/dopbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
class DopBase(IdentifiableElement):
"""Base class for all DOPs.
Any class that a parameter can reference via a DOP-REF should inherit from this class.
Any class that a parameter can reference via a DOP-REF should
inherit from this class.
"""

admin_data: Optional[AdminData]
Expand Down Expand Up @@ -60,15 +61,11 @@ def get_static_bit_length(self) -> Optional[int]:
return None

def is_valid_physical_value(self, physical_value: ParameterValue) -> bool:
"""Determine if a phyical value can be handled by the DOP
"""
"""Determine if a phyical value can be handled by the DOP"""
raise NotImplementedError

def convert_physical_to_bytes(self,
physical_value: ParameterValue,
encode_state: EncodeState,
bit_position: int = 0) -> bytes:
"""Convert the physical value into bytes."""
def encode_into_pdu(self, physical_value: ParameterValue, encode_state: EncodeState) -> None:
"""Convert the physical value to bytes and emplace them into a PDU."""
raise NotImplementedError

def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
Expand Down
52 changes: 21 additions & 31 deletions odxtools/dtcdop.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast
from xml.etree import ElementTree

from typing_extensions import override

from .compumethods.compumethod import CompuMethod
from .compumethods.createanycompumethod import create_any_compu_method_from_et
from .createanydiagcodedtype import create_any_diag_coded_type_from_et
Expand All @@ -12,7 +14,7 @@
from .diagnostictroublecode import DiagnosticTroubleCode
from .dopbase import DopBase
from .encodestate import EncodeState
from .exceptions import DecodeError, EncodeError, odxassert, odxrequire
from .exceptions import DecodeError, EncodeError, odxassert, odxraise, odxrequire
from .nameditemlist import NamedItemList
from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId, OdxLinkRef
from .odxtypes import ParameterValue, odxstr_to_bool
Expand Down Expand Up @@ -87,17 +89,20 @@ def is_visible(self) -> bool:
def linked_dtc_dops(self) -> NamedItemList["DtcDop"]:
return self._linked_dtc_dops

@override
def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:

int_trouble_code = self.diag_coded_type.decode_from_pdu(decode_state)

if self.compu_method.is_valid_internal_value(int_trouble_code):
trouble_code = self.compu_method.convert_internal_to_physical(int_trouble_code)
else:
if not self.compu_method.is_valid_internal_value(int_trouble_code):
# TODO: How to prevent this?
raise DecodeError(
odxraise(
f"DTC-DOP {self.short_name} could not convert the coded value "
f" {repr(int_trouble_code)} to physical type {self.physical_type.base_data_type}.")
f" {repr(int_trouble_code)} to physical type {self.physical_type.base_data_type}.",
DecodeError)
return

trouble_code = self.compu_method.convert_internal_to_physical(int_trouble_code)

assert isinstance(trouble_code, int)

Expand All @@ -110,10 +115,12 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
return dtcs[0]

# the DTC was not specified. This probably means that the
# diagnostic description file is incomplete. We do not bail
# out but we cannot provide an interpretation for it out of the
# box...
dtc = DiagnosticTroubleCode(
# diagnostic description file is incomplete.
odxraise(
f"Encountered DTC 0x{trouble_code:06x} which has not been defined "
f"by the database", DecodeError)

return DiagnosticTroubleCode(
trouble_code=trouble_code,
odx_id=cast(OdxLinkId, None),
short_name=f'DTC_{trouble_code:06x}',
Expand All @@ -126,12 +133,9 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
sdgs=[],
)

return dtc

def convert_physical_to_bytes(self,
physical_value: ParameterValue,
encode_state: EncodeState,
bit_position: int = 0) -> bytes:
@override
def encode_into_pdu(self, physical_value: Optional[ParameterValue],
encode_state: EncodeState) -> None:
if isinstance(physical_value, DiagnosticTroubleCode):
trouble_code = physical_value.trouble_code
elif isinstance(physical_value, int):
Expand All @@ -147,21 +151,7 @@ def convert_physical_to_bytes(self,
f" DiagnosticTroubleCode but got {physical_value!r}.")

internal_trouble_code = self.compu_method.convert_physical_to_internal(trouble_code)
tmp_state = EncodeState(
bytearray(),
encode_state.parameter_values,
triggering_request=encode_state.triggering_request,
is_end_of_pdu=False,
cursor_byte_position=0,
cursor_bit_position=0,
origin_byte_position=0)
encode_state.cursor_bit_position = encode_state.cursor_bit_position
self.diag_coded_type.encode_into_pdu(internal_trouble_code, encode_state=tmp_state)

encode_state.length_keys.update(tmp_state.length_keys)
encode_state.table_keys.update(tmp_state.table_keys)

return tmp_state.coded_message
self.diag_coded_type.encode_into_pdu(internal_trouble_code, encode_state)

def _build_odxlinks(self) -> Dict[OdxLinkId, Any]:
odxlinks = super()._build_odxlinks()
Expand Down
Loading

0 comments on commit c867376

Please sign in to comment.