Skip to content

Commit

Permalink
Merge pull request #267 from andlaus/refactor_decoding4
Browse files Browse the repository at this point in the history
Refactor decoding, part 4
  • Loading branch information
andlaus authored Feb 16, 2024
2 parents c8221bb + 7259501 commit e613b32
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 158 deletions.
15 changes: 9 additions & 6 deletions odxtools/cli/browse.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import argparse
import logging
import sys
from typing import List, Optional, Union
from typing import List, Optional, Union, cast

from InquirerPy import prompt as PI_prompt
from tabulate import tabulate # TODO: switch to rich tables
Expand Down Expand Up @@ -99,7 +99,7 @@ def prompt_single_parameter_value(parameter: Parameter) -> Optional[AtomicOdxTyp
return None
elif parameter.physical_type.base_data_type is not None:
return _convert_string_to_odx_type(
answer.get(parameter.short_name), parameter.physical_type.base_data_type)
cast(str, answer.get(parameter.short_name)), parameter.physical_type.base_data_type)
else:
logging.warning(
f"Parameter {parameter.short_name} does not have a physical data type. Param details: {parameter}"
Expand Down Expand Up @@ -148,7 +148,7 @@ def encode_message_interactively(sub_service: Union[Request, Response],
lambda input: _convert_string_to_bytes(input),
}]
answer = PI_prompt(answered_request_prompt)
answered_request = answer.get("request")
answered_request = cast(bytes, answer.get("request"))
print(f"Input interpretation as list: {list(answered_request)}")

# Request values for parameters
Expand Down Expand Up @@ -270,7 +270,10 @@ def browse(odxdb: Database) -> None:
if answer.get("variant") == "[exit]":
return

variant = odxdb.diag_layers[answer.get("variant")]
variant_name = answer.get("variant")
assert isinstance(variant_name, str)
variant = odxdb.diag_layers[variant_name]
print(f"{type(answer.get('variant'))=}")
assert isinstance(variant, DiagLayer)

if (rx_id := variant.get_receive_id()) is not None:
Expand All @@ -287,7 +290,6 @@ def browse(odxdb: Database) -> None:
f"{variant.variant_type.value} '{variant.short_name}' (Receive ID: {recv_id}, Send ID: {send_id})"
)

service_sn = 0
while True:
services: List[DiagService] = [
s for s in variant.services if isinstance(s, DiagService)
Expand All @@ -307,6 +309,7 @@ def browse(odxdb: Database) -> None:
break

service_sn = answer.get("service")
assert isinstance(service_sn, str)

service = variant.services[service_sn]
assert isinstance(service, DiagService)
Expand Down Expand Up @@ -341,8 +344,8 @@ def browse(odxdb: Database) -> None:
continue

codec = answer.get("message_type")

if codec is not None:
assert isinstance(codec, (Request, Response))
table = extract_parameter_tabulation_data(codec.parameters)
table_str = tabulate(table, headers='keys', tablefmt='presto')
print(table_str)
Expand Down
84 changes: 84 additions & 0 deletions odxtools/decodestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,31 @@
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Dict

import odxtools.exceptions as exceptions

from .exceptions import DecodeError
from .odxtypes import AtomicOdxType, DataType

try:
import bitstruct.c as bitstruct
except ImportError:
import bitstruct

if TYPE_CHECKING:
from .tablerow import TableRow

# format specifiers for the data type using the bitstruct module
ODX_TYPE_TO_FORMAT_LETTER = {
DataType.A_INT32: "s",
DataType.A_UINT32: "u",
DataType.A_FLOAT32: "f",
DataType.A_FLOAT64: "f",
DataType.A_BYTEFIELD: "r",
DataType.A_UNICODE2STRING: "r", # UTF-16 strings must be converted explicitly
DataType.A_ASCIISTRING: "r",
DataType.A_UTF8STRING: "r",
}


@dataclass
class DecodeState:
Expand Down Expand Up @@ -35,3 +57,65 @@ class DecodeState:

#: values of the table key parameters decoded so far
table_keys: Dict[str, "TableRow"] = field(default_factory=dict)

def extract_atomic_value(
self,
bit_length: int,
base_data_type: DataType,
is_highlow_byte_order: bool,
) -> AtomicOdxType:
"""Extract an internal value from a blob of raw bytes.
:return: Tuple with the internal value of the object and the
byte position of the first undecoded byte after the
extracted object.
"""
# If the bit length is zero, return "empty" values of each type
if bit_length == 0:
return base_data_type.as_python_type()()

byte_length = (bit_length + self.cursor_bit_position + 7) // 8
if self.cursor_byte_position + byte_length > len(self.coded_message):
raise DecodeError(f"Expected a longer message.")
extracted_bytes = self.coded_message[self.cursor_byte_position:self.cursor_byte_position +
byte_length]

# Apply byteorder for numerical objects. Note that doing this
# here might lead to garbage data being included in the result
# if the data to be extracted is not byte aligned and crosses
# byte boundaries, but it is what the specification says.
if not is_highlow_byte_order and base_data_type in [
DataType.A_INT32,
DataType.A_UINT32,
DataType.A_FLOAT32,
DataType.A_FLOAT64,
]:
extracted_bytes = extracted_bytes[::-1]

padding = (8 - (bit_length + self.cursor_bit_position) % 8) % 8
internal_value, = bitstruct.unpack_from(
f"{ODX_TYPE_TO_FORMAT_LETTER[base_data_type]}{bit_length}",
extracted_bytes,
offset=padding)

text_errors = 'strict' if exceptions.strict_mode else 'replace'
if base_data_type == DataType.A_ASCIISTRING:
# The spec says ASCII, meaning only byte values 0-127.
# But in practice, vendors use iso-8859-1, aka latin-1
# reason being iso-8859-1 never fails since it has a valid
# character mapping for every possible byte sequence.
text_encoding = 'iso-8859-1'
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)
elif base_data_type == DataType.A_UTF8STRING:
text_encoding = "utf-8"
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)
elif base_data_type == DataType.A_UNICODE2STRING:
# For UTF-16, we need to manually decode the extracted
# bytes to a string
text_encoding = "utf-16-be" if is_highlow_byte_order else "utf-16-le"
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)

self.cursor_byte_position += byte_length
self.cursor_bit_position = 0

return internal_value
89 changes: 7 additions & 82 deletions odxtools/diagcodedtype.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,22 @@
# SPDX-License-Identifier: MIT
import abc
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union

from .decodestate import ODX_TYPE_TO_FORMAT_LETTER, DecodeState
from .encodestate import EncodeState
from .exceptions import EncodeError, odxassert, odxraise
from .odxlink import OdxLinkDatabase, OdxLinkId
from .odxtypes import AtomicOdxType, DataType

try:
import bitstruct.c as bitstruct
except ImportError:
import bitstruct

from . import exceptions
from .decodestate import DecodeState
from .encodestate import EncodeState
from .exceptions import DecodeError, EncodeError, odxassert, odxraise
from .odxlink import OdxLinkDatabase, OdxLinkId
from .odxtypes import AtomicOdxType, DataType

if TYPE_CHECKING:
from .diaglayer import DiagLayer

# format specifiers for the data type using the bitstruct module
ODX_TYPE_TO_FORMAT_LETTER = {
DataType.A_INT32: "s",
DataType.A_UINT32: "u",
DataType.A_FLOAT32: "f",
DataType.A_FLOAT64: "f",
DataType.A_BYTEFIELD: "r",
DataType.A_UNICODE2STRING: "r", # UTF-16 strings must be converted explicitly
DataType.A_ASCIISTRING: "r",
DataType.A_UTF8STRING: "r",
}

# Allowed diag-coded types
DctType = Literal[
"LEADING-LENGTH-INFO-TYPE",
Expand Down Expand Up @@ -69,68 +56,6 @@ def dct_type(self) -> DctType:
def is_highlow_byte_order(self) -> bool:
return self.is_highlow_byte_order_raw in [None, True]

@staticmethod
def _extract_internal_value(
coded_message: bytes,
byte_position: int,
bit_position: int,
bit_length: int,
base_data_type: DataType,
is_highlow_byte_order: bool,
) -> Tuple[AtomicOdxType, int]:
"""Extract an internal value from a blob of raw bytes.
:return: Tuple with the internal value of the object and the
byte position of the first undecoded byte after the
extracted object.
"""
# If the bit length is zero, return "empty" values of each type
if bit_length == 0:
return base_data_type.as_python_type()(), byte_position

byte_length = (bit_length + bit_position + 7) // 8
if byte_position + byte_length > len(coded_message):
raise DecodeError(f"Expected a longer message.")
cursor_byte_position = byte_position + byte_length
extracted_bytes = coded_message[byte_position:cursor_byte_position]

# Apply byteorder for numerical objects. Note that doing this
# here might lead to garbage data being included in the result
# if the data to be extracted is not byte aligned and crosses
# byte boundaries, but it is what the specification says.
if not is_highlow_byte_order and base_data_type in [
DataType.A_INT32,
DataType.A_UINT32,
DataType.A_FLOAT32,
DataType.A_FLOAT64,
]:
extracted_bytes = extracted_bytes[::-1]

padding = (8 - (bit_length + bit_position) % 8) % 8
internal_value, = bitstruct.unpack_from(
f"{ODX_TYPE_TO_FORMAT_LETTER[base_data_type]}{bit_length}",
extracted_bytes,
offset=padding)

text_errors = 'strict' if exceptions.strict_mode else 'replace'
if base_data_type == DataType.A_ASCIISTRING:
# The spec says ASCII, meaning only byte values 0-127.
# But in practice, vendors use iso-8859-1, aka latin-1
# reason being iso-8859-1 never fails since it has a valid
# character mapping for every possible byte sequence.
text_encoding = 'iso-8859-1'
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)
elif base_data_type == DataType.A_UTF8STRING:
text_encoding = "utf-8"
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)
elif base_data_type == DataType.A_UNICODE2STRING:
# For UTF-16, we need to manually decode the extracted
# bytes to a string
text_encoding = "utf-16-be" if is_highlow_byte_order else "utf-16-le"
internal_value = internal_value.decode(encoding=text_encoding, errors=text_errors)

return internal_value, cursor_byte_position

@staticmethod
def _encode_internal_value(
internal_value: AtomicOdxType,
Expand Down
13 changes: 2 additions & 11 deletions odxtools/leadinglengthinfotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,33 +66,24 @@ def convert_internal_to_bytes(self, internal_value: Any, encode_state: EncodeSta
return length_bytes + value_bytes

def decode_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
coded_message = decode_state.coded_message

# Extract length of the parameter value
byte_length, byte_position = self._extract_internal_value(
coded_message=coded_message,
byte_position=decode_state.cursor_byte_position,
bit_position=decode_state.cursor_bit_position,
byte_length = decode_state.extract_atomic_value(
bit_length=self.bit_length,
base_data_type=DataType.A_UINT32, # length is an integer
is_highlow_byte_order=self.is_highlow_byte_order,
)
decode_state.cursor_bit_position = 0

if not isinstance(byte_length, int):
odxraise()

# Extract actual value
# TODO: The returned value is None if the byte_length is 0. Maybe change it
# to some default value like an empty bytearray() or 0?
value, cursor_byte_position = self._extract_internal_value(
coded_message=coded_message,
byte_position=byte_position,
bit_position=0,
value = decode_state.extract_atomic_value(
bit_length=8 * byte_length,
base_data_type=self.base_data_type,
is_highlow_byte_order=self.is_highlow_byte_order,
)

decode_state.cursor_byte_position = cursor_byte_position
return value
Loading

0 comments on commit e613b32

Please sign in to comment.