Skip to content

Commit

Permalink
Merge pull request #265 from andlaus/refactor_decoding2
Browse files Browse the repository at this point in the history
Refactor decoding, part 2
  • Loading branch information
andlaus authored Feb 13, 2024
2 parents 240718a + f40cbe3 commit 911df59
Show file tree
Hide file tree
Showing 25 changed files with 200 additions and 166 deletions.
19 changes: 10 additions & 9 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,20 +234,21 @@ def convert_bytes_to_physical(self,

# move the origin since positions specified by sub-parameters of
# structures are relative to the beginning of the structure object.
orig_origin = decode_state.origin_position
decode_state.origin_position = decode_state.cursor_position
orig_origin = decode_state.origin_byte_position
decode_state.origin_byte_position = decode_state.cursor_byte_position

result = {}
for param in self.parameters:
value, cursor_position = param.decode_from_pdu(decode_state)
value, cursor_byte_position = param.decode_from_pdu(decode_state)

result[param.short_name] = value
decode_state.cursor_position = max(decode_state.cursor_position, cursor_position)
decode_state.cursor_byte_position = max(decode_state.cursor_byte_position,
cursor_byte_position)

# decoding of the structure finished. move back the origin.
decode_state.origin_position = orig_origin
decode_state.origin_byte_position = orig_origin

return result, decode_state.cursor_position
return result, decode_state.cursor_byte_position

def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue) -> bytes:
"""
Expand All @@ -267,14 +268,14 @@ def encode(self, coded_request: Optional[bytes] = None, **params: ParameterValue
def decode(self, message: bytes) -> ParameterValueDict:
# dummy decode state to be passed to convert_bytes_to_physical
decode_state = DecodeState(coded_message=message)
param_values, cursor_position = self.convert_bytes_to_physical(decode_state)
param_values, cursor_byte_position = self.convert_bytes_to_physical(decode_state)
if not isinstance(param_values, dict):
odxraise(f"Decoding a structure must result in a dictionary of parameter "
f"values (is {type(param_values)})")
if len(message) != cursor_position:
if len(message) != cursor_byte_position:
warnings.warn(
f"The message {message.hex()} is longer than could be parsed."
f" Expected {cursor_position} but got {len(message)}.",
f" Expected {cursor_byte_position} but got {len(message)}.",
DecodeError,
stacklevel=1,
)
Expand Down
2 changes: 1 addition & 1 deletion odxtools/cli/browse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
from typing import List, Optional, Union

import PyInquirer.prompt as PI_prompt
from InquirerPy import prompt as PI_prompt
from tabulate import tabulate # TODO: switch to rich tables

from ..database import Database
Expand Down
13 changes: 11 additions & 2 deletions odxtools/cli/dummy_sub_parser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: MIT
import argparse
import sys
import traceback
from io import StringIO


class DummyTool:
Expand All @@ -19,10 +21,17 @@ def __init__(self, tool_name: str, error: Exception):
self._error = error

def add_subparser(self, subparser_list: "argparse._SubParsersAction") -> None:
desc = StringIO()

print(f"Tool '{self._odxtools_tool_name_}' is unavailable: {self._error}", file=desc)
print(file=desc)
print(f"Traceback:", file=desc)
traceback.print_tb(self._error.__traceback__, file=desc)

subparser_list.add_parser(
self._odxtools_tool_name_,
description=f"Tool '{self._odxtools_tool_name_}' is unavailable: {self._error}",
help="Dummy tool",
description=desc.getvalue(),
help="Tool unavailable",
formatter_class=argparse.RawTextHelpFormatter,
)

Expand Down
7 changes: 4 additions & 3 deletions odxtools/dataobjectproperty.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ def convert_bytes_to_physical(self,
"""
odxassert(0 <= bit_position and bit_position < 8)

internal, cursor_position = self.diag_coded_type.convert_bytes_to_internal(
decode_state, bit_position=bit_position)
decode_state.cursor_bit_position = bit_position
internal = self.diag_coded_type.decode_from_pdu(decode_state)

if self.compu_method.is_valid_internal_value(internal):
return self.compu_method.convert_internal_to_physical(internal), cursor_position
return self.compu_method.convert_internal_to_physical(
internal), decode_state.cursor_byte_position
else:
# TODO: How to prevent this?
raise DecodeError(
Expand Down
10 changes: 8 additions & 2 deletions odxtools/decodestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ class DecodeState:
#:
#: i.e., the absolute byte position to which all relative positions
#: refer to, e.g. the position of the first byte of a structure.
origin_position: int = 0
origin_byte_position: int = 0

#: Absolute position of the next undecoded byte to be considered
#:
#: (if not explicitly specified by the object to be decoded.)
cursor_position: int = 0
cursor_byte_position: int = 0

#: the bit position [0, 7] where the object to be extracted begins
#:
#: If bit position is undefined (`None`), the object to be extracted
#: starts at bit 0.
cursor_bit_position: int = 0

#: values of the length key parameters decoded so far
length_keys: Dict[str, int] = field(default_factory=dict)
Expand Down
10 changes: 4 additions & 6 deletions odxtools/diagcodedtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ def _extract_internal_value(
byte_length = (bit_length + bit_position + 7) // 8
if byte_position + byte_length > len(coded_message):
raise DecodeError(f"Expected a longer message.")
cursor_position = byte_position + byte_length
extracted_bytes = coded_message[byte_position:cursor_position]
cursor_byte_position = byte_position + byte_length
extracted_bytes = coded_message[byte_position:cursor_byte_position]

# Apply byteorder for numerical objects. Note that doing this
# here might lead to garbage data being included in the result
Expand Down Expand Up @@ -129,7 +129,7 @@ def _extract_internal_value(
text_encoding = "utf-16-be" if is_highlow_byte_order else "utf-16-le"
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)

return internal_value, cursor_position
return internal_value, cursor_byte_position

@staticmethod
def _encode_internal_value(
Expand Down Expand Up @@ -254,9 +254,7 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state:
pass

@abc.abstractmethod
def convert_bytes_to_internal(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[AtomicOdxType, int]:
def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
"""Decode the parameter value from the coded message.
Parameters
Expand Down
8 changes: 4 additions & 4 deletions odxtools/dtcdop.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ def convert_bytes_to_physical(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[ParameterValue, int]:

int_trouble_code, cursor_position = self.diag_coded_type.convert_bytes_to_internal(
decode_state, bit_position=bit_position)
decode_state.cursor_bit_position = bit_position
int_trouble_code = self.diag_coded_type.decode_from_pdu(decode_state)

if self.compu_method.is_valid_internal_value(int_trouble_code):
trouble_code = self.compu_method.convert_internal_to_physical(int_trouble_code)
Expand All @@ -110,7 +110,7 @@ def convert_bytes_to_physical(self,

if len(dtcs) == 1:
# we found exactly one described DTC
return dtcs[0], cursor_position
return dtcs[0], decode_state.cursor_byte_position

# the DTC was not specified. This probably means that the
# diagnostic description file is incomplete. We do not bail
Expand All @@ -129,7 +129,7 @@ def convert_bytes_to_physical(self,
sdgs=[],
)

return dtc, cursor_position
return dtc, decode_state.cursor_byte_position

def convert_physical_to_bytes(self, physical_value: ParameterValue, encode_state: EncodeState,
bit_position: int) -> bytes:
Expand Down
10 changes: 5 additions & 5 deletions odxtools/endofpdufield.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,18 @@ def convert_physical_to_bytes(
def convert_bytes_to_physical(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[ParameterValue, int]:
cursor_position = decode_state.cursor_position
cursor_byte_position = decode_state.cursor_byte_position

value = []
while cursor_position < len(decode_state.coded_message):
while cursor_byte_position < len(decode_state.coded_message):
# ATTENTION: the ODX specification is very misleading
# here: it says that the item is repeated until the end of
# the PDU, but it means that DOP of the items that are
# repeated are identical, not their values
new_value, cursor_position = self.structure.convert_bytes_to_physical(
new_value, cursor_byte_position = self.structure.convert_bytes_to_physical(
decode_state, bit_position=0)
# Update next byte_position
decode_state.cursor_position = cursor_position
decode_state.cursor_byte_position = cursor_byte_position
value.append(new_value)

return value, cursor_position
return value, cursor_byte_position
16 changes: 8 additions & 8 deletions odxtools/leadinglengthinfotype.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Any, Optional, Tuple
from typing import Any, Optional

from .decodestate import DecodeState
from .diagcodedtype import DctType, DiagCodedType
Expand Down Expand Up @@ -65,28 +65,27 @@ def convert_internal_to_bytes(self, internal_value: Any, encode_state: EncodeSta

return length_bytes + value_bytes

def convert_bytes_to_internal(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[AtomicOdxType, int]:
def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
coded_message = decode_state.coded_message

# Extract length of the parameter value
byte_length, byte_position = self._extract_internal_value(
coded_message=coded_message,
byte_position=decode_state.cursor_position,
bit_position=bit_position,
byte_position=decode_state.cursor_byte_position,
bit_position=decode_state.cursor_bit_position,
bit_length=self.bit_length,
base_data_type=DataType.A_UINT32, # length is an integer
is_highlow_byte_order=self.is_highlow_byte_order,
)
decode_state.cursor_bit_position = 0

if not isinstance(byte_length, int):
odxraise()

# Extract actual value
# TODO: The returned value is None if the byte_length is 0. Maybe change it
# to some default value like an empty bytearray() or 0?
value, cursor_position = self._extract_internal_value(
value, cursor_byte_position = self._extract_internal_value(
coded_message=coded_message,
byte_position=byte_position,
bit_position=0,
Expand All @@ -95,4 +94,5 @@ def convert_bytes_to_internal(self,
is_highlow_byte_order=self.is_highlow_byte_order,
)

return value, cursor_position
decode_state.cursor_byte_position = cursor_byte_position
return value
25 changes: 15 additions & 10 deletions odxtools/minmaxlengthtype.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: MIT
from dataclasses import dataclass
from typing import Optional, Tuple
from typing import Optional

from .decodestate import DecodeState
from .diagcodedtype import DctType, DiagCodedType
Expand Down Expand Up @@ -100,14 +100,12 @@ def convert_internal_to_bytes(self, internal_value: AtomicOdxType, encode_state:

return value_bytes

def convert_bytes_to_internal(self,
decode_state: DecodeState,
bit_position: int = 0) -> Tuple[AtomicOdxType, int]:
if decode_state.cursor_position + self.min_length > len(decode_state.coded_message):
def decode_from_pdu(self, decode_state: DecodeState, bit_position: int = 0) -> AtomicOdxType:
if decode_state.cursor_byte_position + self.min_length > len(decode_state.coded_message):
raise DecodeError("The PDU ended before minimum length was reached.")

coded_message = decode_state.coded_message
cursor_pos = decode_state.cursor_position
cursor_pos = decode_state.cursor_byte_position
termination_seq = self.__termination_sequence()

max_terminator_pos = len(coded_message)
Expand Down Expand Up @@ -148,7 +146,7 @@ def convert_bytes_to_internal(self,
value, byte_pos = self._extract_internal_value(
decode_state.coded_message,
byte_position=cursor_pos,
bit_position=bit_position,
bit_position=0,
bit_length=8 * byte_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
Expand All @@ -158,7 +156,10 @@ def convert_bytes_to_internal(self,
byte_pos += len(termination_seq)

# next byte starts after the actual data and the termination sequence
return value, byte_pos
decode_state.cursor_byte_position = byte_pos
decode_state.cursor_bit_position = 0

return value
else:
# If termination == "END-OF-PDU", the parameter ends after max_length
# or at the end of the PDU.
Expand All @@ -167,9 +168,13 @@ def convert_bytes_to_internal(self,
value, byte_pos = self._extract_internal_value(
decode_state.coded_message,
byte_position=cursor_pos,
bit_position=bit_position,
bit_position=0,
bit_length=8 * byte_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
)
return value, byte_pos

decode_state.cursor_byte_position = byte_pos
decode_state.cursor_bit_position = 0

return value
6 changes: 3 additions & 3 deletions odxtools/multiplexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ def convert_bytes_to_physical(self,

case_decode_state = copy(decode_state)
if self.byte_position is not None:
case_decode_state.origin_position = decode_state.origin_position + self.byte_position
case_decode_state.origin_byte_position = decode_state.origin_byte_position + self.byte_position
else:
case_decode_state.origin_position = decode_state.cursor_position
case_decode_state.origin_byte_position = decode_state.cursor_byte_position

case_found = False
case_next_byte = 0
Expand All @@ -144,7 +144,7 @@ def convert_bytes_to_physical(self,
f"Failed to find a matching case in {self.short_name} for value {key_value!r}")

mux_value = {case.short_name: cast(ParameterValue, case_value)}
mux_next_byte = decode_state.cursor_position + max(
mux_next_byte = decode_state.cursor_byte_position + max(
key_next_byte + self.switch_key.byte_position, case_next_byte + self.byte_position)
return mux_value, mux_next_byte

Expand Down
16 changes: 8 additions & 8 deletions odxtools/parameters/codedconstparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,28 +64,28 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:

def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[AtomicOdxType, int]:
# Extract coded values
orig_cursor_pos = decode_state.cursor_position
orig_cursor_pos = decode_state.cursor_byte_position
if self.byte_position is not None:
decode_state.cursor_position = decode_state.origin_position + self.byte_position
decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position

coded_val, cursor_position = self.diag_coded_type.convert_bytes_to_internal(
decode_state, bit_position=self.bit_position or 0)

decode_state.cursor_position = orig_cursor_pos
decode_state.cursor_bit_position = self.bit_position or 0
coded_val = self.diag_coded_type.decode_from_pdu(decode_state)

# Check if the coded value in the message is correct.
if self.coded_value != coded_val:
warnings.warn(
f"Coded constant parameter does not match! "
f"The parameter {self.short_name} expected coded "
f"value {str(self._coded_value_str)} but got {str(coded_val)} "
f"at byte position {decode_state.cursor_position} "
f"at byte position {decode_state.cursor_byte_position} "
f"in coded message {decode_state.coded_message.hex()}.",
DecodeError,
stacklevel=1,
)

return coded_val, cursor_position
decode_state.cursor_byte_position = max(orig_cursor_pos, decode_state.cursor_byte_position)

return coded_val, decode_state.cursor_byte_position

@property
def _coded_value_str(self) -> str:
Expand Down
4 changes: 2 additions & 2 deletions odxtools/parameters/lengthkeyparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ def encode_into_pdu(self, encode_state: EncodeState) -> bytes:
return super().encode_into_pdu(encode_state)

def decode_from_pdu(self, decode_state: DecodeState) -> Tuple[ParameterValue, int]:
phys_val, cursor_position = super().decode_from_pdu(decode_state)
phys_val, cursor_byte_position = super().decode_from_pdu(decode_state)

if not isinstance(phys_val, int):
odxraise(f"The pysical type of length keys must be an integer, "
f"(is {type(phys_val).__name__})")
decode_state.length_keys[self.short_name] = phys_val

return phys_val, cursor_position
return phys_val, cursor_byte_position
Loading

0 comments on commit 911df59

Please sign in to comment.