Skip to content

Commit

Permalink
move the method for extracting atomic values to DecodeState
Browse files Browse the repository at this point in the history
thanks to [at]kayoub5 for the proposal.

Signed-off-by: Andreas Lauser <[email protected]>
Signed-off-by: Florian Jost <[email protected]>
  • Loading branch information
andlaus committed Feb 16, 2024
1 parent 59324c3 commit 7259501
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 140 deletions.
84 changes: 84 additions & 0 deletions odxtools/decodestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,31 @@
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Dict

import odxtools.exceptions as exceptions

from .exceptions import DecodeError
from .odxtypes import AtomicOdxType, DataType

try:
import bitstruct.c as bitstruct
except ImportError:
import bitstruct

if TYPE_CHECKING:
from .tablerow import TableRow

# format specifiers for the data type using the bitstruct module
ODX_TYPE_TO_FORMAT_LETTER = {
DataType.A_INT32: "s",
DataType.A_UINT32: "u",
DataType.A_FLOAT32: "f",
DataType.A_FLOAT64: "f",
DataType.A_BYTEFIELD: "r",
DataType.A_UNICODE2STRING: "r", # UTF-16 strings must be converted explicitly
DataType.A_ASCIISTRING: "r",
DataType.A_UTF8STRING: "r",
}


@dataclass
class DecodeState:
Expand Down Expand Up @@ -35,3 +57,65 @@ class DecodeState:

#: values of the table key parameters decoded so far
table_keys: Dict[str, "TableRow"] = field(default_factory=dict)

def extract_atomic_value(
self,
bit_length: int,
base_data_type: DataType,
is_highlow_byte_order: bool,
) -> AtomicOdxType:
"""Extract an internal value from a blob of raw bytes.
:return: Tuple with the internal value of the object and the
byte position of the first undecoded byte after the
extracted object.
"""
# If the bit length is zero, return "empty" values of each type
if bit_length == 0:
return base_data_type.as_python_type()()

byte_length = (bit_length + self.cursor_bit_position + 7) // 8
if self.cursor_byte_position + byte_length > len(self.coded_message):
raise DecodeError(f"Expected a longer message.")
extracted_bytes = self.coded_message[self.cursor_byte_position:self.cursor_byte_position +
byte_length]

# Apply byteorder for numerical objects. Note that doing this
# here might lead to garbage data being included in the result
# if the data to be extracted is not byte aligned and crosses
# byte boundaries, but it is what the specification says.
if not is_highlow_byte_order and base_data_type in [
DataType.A_INT32,
DataType.A_UINT32,
DataType.A_FLOAT32,
DataType.A_FLOAT64,
]:
extracted_bytes = extracted_bytes[::-1]

padding = (8 - (bit_length + self.cursor_bit_position) % 8) % 8
internal_value, = bitstruct.unpack_from(
f"{ODX_TYPE_TO_FORMAT_LETTER[base_data_type]}{bit_length}",
extracted_bytes,
offset=padding)

text_errors = 'strict' if exceptions.strict_mode else 'replace'
if base_data_type == DataType.A_ASCIISTRING:
# The spec says ASCII, meaning only byte values 0-127.
# But in practice, vendors use iso-8859-1, aka latin-1
# reason being iso-8859-1 never fails since it has a valid
# character mapping for every possible byte sequence.
text_encoding = 'iso-8859-1'
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)
elif base_data_type == DataType.A_UTF8STRING:
text_encoding = "utf-8"
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)
elif base_data_type == DataType.A_UNICODE2STRING:
# For UTF-16, we need to manually decode the extracted
# bytes to a string
text_encoding = "utf-16-be" if is_highlow_byte_order else "utf-16-le"
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)

self.cursor_byte_position += byte_length
self.cursor_bit_position = 0

return internal_value
89 changes: 7 additions & 82 deletions odxtools/diagcodedtype.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,22 @@
# SPDX-License-Identifier: MIT
import abc
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union

from .decodestate import ODX_TYPE_TO_FORMAT_LETTER, DecodeState
from .encodestate import EncodeState
from .exceptions import EncodeError, odxassert, odxraise
from .odxlink import OdxLinkDatabase, OdxLinkId
from .odxtypes import AtomicOdxType, DataType

try:
import bitstruct.c as bitstruct
except ImportError:
import bitstruct

from . import exceptions
from .decodestate import DecodeState
from .encodestate import EncodeState
from .exceptions import DecodeError, EncodeError, odxassert, odxraise
from .odxlink import OdxLinkDatabase, OdxLinkId
from .odxtypes import AtomicOdxType, DataType

if TYPE_CHECKING:
from .diaglayer import DiagLayer

# format specifiers for the data type using the bitstruct module
ODX_TYPE_TO_FORMAT_LETTER = {
DataType.A_INT32: "s",
DataType.A_UINT32: "u",
DataType.A_FLOAT32: "f",
DataType.A_FLOAT64: "f",
DataType.A_BYTEFIELD: "r",
DataType.A_UNICODE2STRING: "r", # UTF-16 strings must be converted explicitly
DataType.A_ASCIISTRING: "r",
DataType.A_UTF8STRING: "r",
}

# Allowed diag-coded types
DctType = Literal[
"LEADING-LENGTH-INFO-TYPE",
Expand Down Expand Up @@ -69,68 +56,6 @@ def dct_type(self) -> DctType:
def is_highlow_byte_order(self) -> bool:
return self.is_highlow_byte_order_raw in [None, True]

@staticmethod
def _extract_atomic_value(
coded_message: bytes,
byte_position: int,
bit_position: int,
bit_length: int,
base_data_type: DataType,
is_highlow_byte_order: bool,
) -> Tuple[AtomicOdxType, int]:
"""Extract an internal value from a blob of raw bytes.
:return: Tuple with the internal value of the object and the
byte position of the first undecoded byte after the
extracted object.
"""
# If the bit length is zero, return "empty" values of each type
if bit_length == 0:
return base_data_type.as_python_type()(), byte_position

byte_length = (bit_length + bit_position + 7) // 8
if byte_position + byte_length > len(coded_message):
raise DecodeError(f"Expected a longer message.")
cursor_byte_position = byte_position + byte_length
extracted_bytes = coded_message[byte_position:cursor_byte_position]

# Apply byteorder for numerical objects. Note that doing this
# here might lead to garbage data being included in the result
# if the data to be extracted is not byte aligned and crosses
# byte boundaries, but it is what the specification says.
if not is_highlow_byte_order and base_data_type in [
DataType.A_INT32,
DataType.A_UINT32,
DataType.A_FLOAT32,
DataType.A_FLOAT64,
]:
extracted_bytes = extracted_bytes[::-1]

padding = (8 - (bit_length + bit_position) % 8) % 8
internal_value, = bitstruct.unpack_from(
f"{ODX_TYPE_TO_FORMAT_LETTER[base_data_type]}{bit_length}",
extracted_bytes,
offset=padding)

text_errors = 'strict' if exceptions.strict_mode else 'replace'
if base_data_type == DataType.A_ASCIISTRING:
# The spec says ASCII, meaning only byte values 0-127.
# But in practice, vendors use iso-8859-1, aka latin-1
# reason being iso-8859-1 never fails since it has a valid
# character mapping for every possible byte sequence.
text_encoding = 'iso-8859-1'
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)
elif base_data_type == DataType.A_UTF8STRING:
text_encoding = "utf-8"
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)
elif base_data_type == DataType.A_UNICODE2STRING:
# For UTF-16, we need to manually decode the extracted
# bytes to a string
text_encoding = "utf-16-be" if is_highlow_byte_order else "utf-16-le"
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)

return internal_value, cursor_byte_position

@staticmethod
def _encode_internal_value(
internal_value: AtomicOdxType,
Expand Down
13 changes: 2 additions & 11 deletions odxtools/leadinglengthinfotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,33 +66,24 @@ def convert_internal_to_bytes(self, internal_value: Any, encode_state: EncodeSta
return length_bytes + value_bytes

def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
coded_message = decode_state.coded_message

# Extract length of the parameter value
byte_length, byte_position = self._extract_atomic_value(
coded_message=coded_message,
byte_position=decode_state.cursor_byte_position,
bit_position=decode_state.cursor_bit_position,
byte_length = decode_state.extract_atomic_value(
bit_length=self.bit_length,
base_data_type=DataType.A_UINT32, # length is an integer
is_highlow_byte_order=self.is_highlow_byte_order,
)
decode_state.cursor_bit_position = 0

if not isinstance(byte_length, int):
odxraise()

# Extract actual value
# TODO: The returned value is None if the byte_length is 0. Maybe change it
# to some default value like an empty bytearray() or 0?
value, cursor_byte_position = self._extract_atomic_value(
coded_message=coded_message,
byte_position=byte_position,
bit_position=0,
value = decode_state.extract_atomic_value(
bit_length=8 * byte_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
)

decode_state.cursor_byte_position = cursor_byte_position
return value
48 changes: 21 additions & 27 deletions odxtools/minmaxlengthtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,36 +107,40 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
raise DecodeError("The PDU ended before minimum length was reached.")

coded_message = decode_state.coded_message
cursor_pos = decode_state.cursor_byte_position
orig_cursor_pos = decode_state.cursor_byte_position
termination_seq = self.__termination_sequence()

max_terminator_pos = len(coded_message)
if self.max_length is not None:
max_terminator_pos = min(max_terminator_pos, cursor_pos + self.max_length)
max_terminator_pos = min(max_terminator_pos, orig_cursor_pos + self.max_length)

if self.termination != "END-OF-PDU":
# The parameter either ends after the maximum length, at
# the end of the PDU or if a termination sequence is
# found.

terminator_pos = cursor_pos + self.min_length
# Find the location of the termination sequence. The
# problem here is that the alignment of the termination
# sequence must be correct for it to be a termination
# sequence. (e.g., an odd-aligned double-zero for UTF-16
# strings is *not* a termination sequence!)
terminator_pos = orig_cursor_pos + self.min_length
while True:
# Search the termination sequence
terminator_pos = coded_message.find(termination_seq, terminator_pos,
max_terminator_pos)
terminator_pos = decode_state.coded_message.find(termination_seq, terminator_pos,
max_terminator_pos)
if terminator_pos < 0:
# termination sequence was not found, i.e., we
# are terminated by either the end of the PDU or
# our maximum size. (whatever is the smaller
# value.)
byte_length = max_terminator_pos - cursor_pos
byte_length = max_terminator_pos - orig_cursor_pos
break
elif (terminator_pos - cursor_pos) % len(termination_seq) == 0:
elif (terminator_pos - orig_cursor_pos) % len(termination_seq) == 0:
# we found the termination sequence at a position
# and it is correctly aligned (two-byte
# termination sequences must be word aligned
# relative to the beginning of the parameter)!
byte_length = terminator_pos - cursor_pos
byte_length = terminator_pos - orig_cursor_pos
break
else:
# we found the termination sequence, but its
Expand All @@ -145,38 +149,28 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
terminator_pos += 1

# Extract the value
value, byte_pos = self._extract_atomic_value(
decode_state.coded_message,
byte_position=cursor_pos,
bit_position=0,
value = decode_state.extract_atomic_value(
bit_length=8 * byte_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
)

if byte_pos != len(coded_message) and byte_pos - cursor_pos != self.max_length:
byte_pos += len(termination_seq)

# next byte starts after the actual data and the termination sequence
decode_state.cursor_byte_position = byte_pos
decode_state.cursor_bit_position = 0
if decode_state.cursor_byte_position != len(
decode_state.coded_message
) and decode_state.cursor_byte_position - orig_cursor_pos != self.max_length:
# next object starts after the actual data and the termination sequence
decode_state.cursor_byte_position += len(termination_seq)

return value
else:
# If termination == "END-OF-PDU", the parameter ends after max_length
# or at the end of the PDU.
byte_length = max_terminator_pos - cursor_pos
byte_length = max_terminator_pos - orig_cursor_pos

value, byte_pos = self._extract_atomic_value(
decode_state.coded_message,
byte_position=cursor_pos,
bit_position=0,
value = decode_state.extract_atomic_value(
bit_length=8 * byte_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
)

decode_state.cursor_byte_position = byte_pos
decode_state.cursor_bit_position = 0

return value
7 changes: 1 addition & 6 deletions odxtools/parameters/matchingrequestparameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Optional

from ..decodestate import DecodeState
from ..diagcodedtype import DiagCodedType
from ..encodestate import EncodeState
from ..exceptions import EncodeError
from ..odxtypes import DataType, ParameterValue
Expand Down Expand Up @@ -43,15 +42,11 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
if self.byte_position is not None:
decode_state.cursor_byte_position = decode_state.origin_byte_position + self.byte_position

result, decode_state.cursor_byte_position = DiagCodedType._extract_atomic_value(
coded_message=decode_state.coded_message,
byte_position=decode_state.cursor_byte_position,
bit_position=self.bit_position or 0,
result = decode_state.extract_atomic_value(
bit_length=self.byte_length * 8,
base_data_type=DataType.A_UINT32,
is_highlow_byte_order=False)

decode_state.cursor_byte_position = max(decode_state.cursor_byte_position, orig_cursor)
decode_state.cursor_bit_position = 0

return result
8 changes: 1 addition & 7 deletions odxtools/paramlengthinfotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,10 @@ def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
bit_length = 0

# Extract the internal value and return.
value, cursor_byte_position = self._extract_atomic_value(
decode_state.coded_message,
decode_state.cursor_byte_position,
decode_state.cursor_bit_position,
value = decode_state.extract_atomic_value(
bit_length,
self.base_data_type,
self.is_highlow_byte_order,
)

decode_state.cursor_bit_position = 0
decode_state.cursor_byte_position = cursor_byte_position

return value
Loading

0 comments on commit 7259501

Please sign in to comment.