Skip to content

Commit

Permalink
remove the infrastructure for printing the message layout
Browse files Browse the repository at this point in the history
This code was hard to maintain, located in the wrong place, did not
work for quite a few edge cases, and finally a simple table of the
parameters is far more useful from a user's point of view IMO.

Signed-off-by: Andreas Lauser <[email protected]>
Signed-off-by: Gerrit Ecke <[email protected]>
  • Loading branch information
andlaus committed Jan 19, 2024
1 parent 8b75587 commit fd6a242
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 217 deletions.
176 changes: 0 additions & 176 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from .parameters.parameterwithdop import ParameterWithDOP
from .parameters.physicalconstantparameter import PhysicalConstantParameter
from .parameters.tablekeyparameter import TableKeyParameter
from .parameters.valueparameter import ValueParameter
from .utils import dataclass_fields_asdict

if TYPE_CHECKING:
Expand Down Expand Up @@ -301,178 +300,3 @@ def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None:

for p in self.parameters:
p._resolve_snrefs(diag_layer)

def _message_format_lines(self, allow_unknown_lengths: bool = False) -> List[str]:
# sort parameters
sorted_params: list = list(self.parameters) # copy list

def param_sort_key(param: Parameter) -> Tuple[int, int, int]:
byte_position_int = param.byte_position if param.byte_position is not None else 0
bit_position_int = param.bit_position if param.bit_position is not None else 0
param_bit_length_int = param.get_static_bit_length() or 0
# take bit-length into account for overlapping parameters. 1000000 is an arbitrary number chosen
# to hopefully never be smaller than an actual bit-length
return (byte_position_int, 1000000 - param_bit_length_int, 8 - bit_position_int)

sorted_params.sort(key=param_sort_key)

# replace structure parameters by their sub parameters
params: List[Parameter] = []
for p in sorted_params:
if isinstance(p, ValueParameter) and isinstance(p._dop, BasicStructure):
params += p._dop.parameters
else:
params.append(p)

param_idx = 0
byte_idx = 0
# needs to be one larger than the maximum digit length of a byte number
indent_for_byte_numbering = 5 * " "
formatted_lines = [
indent_for_byte_numbering + "".join(f" {7-bit_idx} " for bit_idx in range(8))
]

stop_bit = 0 # absolute bit position where the next parameter starts
previous_stop_bit = 0 # previous stop-bit position, after it has been changed

divide_string = indent_for_byte_numbering + 8 * "+-----" + "+"
skip_divider = False

error = False
next_line = ""
while param_idx <= len(params) and not error: # For each byte
if 8 * byte_idx == stop_bit and param_idx == len(params):
# If we have formatted the last parameter, we're done.
break

if not skip_divider:
formatted_lines.append(f"{divide_string}")
else:
skip_divider = False

if stop_bit // 8 - byte_idx > 5:
curr_param = params[param_idx - 1].short_name
formatted_lines.append(
indent_for_byte_numbering +
f" ... {stop_bit // 8 - byte_idx} bytes belonging to {curr_param} ... ")
byte_idx += stop_bit // 8 - byte_idx
continue

next_line = (
f"{(len(indent_for_byte_numbering) - 1 - len(str(byte_idx))) * ' '}{byte_idx} ")

for bit_idx in range(8):
odxassert(8 * byte_idx + bit_idx <= stop_bit)

if param_idx == len(params):
# the last param ends mid byte, there are still padding bits left
error = True
break

if 8 * byte_idx + bit_idx == stop_bit:
# END-OF-PDU fields do not exhibit a fixed bit
# length, so they need special treatment here
dct = None
if hasattr(params[param_idx], "_dop"):
dop = params[param_idx]._dop # type: ignore[attr-defined]
if hasattr(dop, "diag_coded_type"):
dct = dop.diag_coded_type

bit_length = params[param_idx].get_static_bit_length()

if dct is not None and dct.dct_type == "MIN-MAX-LENGTH-TYPE":
name = params[param_idx].short_name + " ("
if dct.termination == "END-OF-PDU":
name += "End of PDU, "
name += f"{dct.min_length}..{dct.max_length} bytes"
name += ")"
next_line += "| " + name

param_idx += 1

# adding 8 is a bit hacky here, but hey, it
# works ...
previous_stop_bit = stop_bit
stop_bit += 8

break

elif not bit_length and not allow_unknown_lengths:
# The bit length is not set for the current
# parameter, i.e. it was either not specified
# or the parameter is of variable length and
# has a type which is not handled above. In
# this case, stop trying.
error = True
break
else:
bit_length = 0 if bit_length is None else bit_length
previous_stop_bit = stop_bit
stop_bit += bit_length or (allow_unknown_lengths and 8)
name = params[param_idx].short_name + f" ({bit_length or 'Unknown'} bits)"
next_line += "| " + name

param_idx += 1

if byte_idx == stop_bit // 8:
char_pos = bit_idx * 6 + 2 + len(name)
width_of_line = (stop_bit % 8) * 6
if char_pos < width_of_line:
next_line += " " * (width_of_line - char_pos) + "|"
# start next line (belongs to same byte)
formatted_lines.append(next_line)
# fill next line with white spaces up to the
# bit where next parameter starts
next_line = indent_for_byte_numbering + (bit_idx + 1) * 6 * " "
else:
char_pos = 2 + bit_idx * 6 + len(name)
width_of_line = 8 * 6
if char_pos < width_of_line:
next_line += " " * (width_of_line - char_pos) + "|"
break
else:
if bit_idx == 0:
next_line += "|" + 5 * " "
elif bit_idx == 7:
next_line += 6 * " " + "|"
else:
next_line += 6 * " "

formatted_lines.append(next_line)
next_line = ""

if 0 < param_idx and param_idx < len(params) - 1 and hasattr(
params[param_idx],
'byte_position') and params[param_idx].byte_position == byte_idx:
# don't do anything on the first & last parameters, otherwise, if the current (= next)
# parameter has a byte_position, check if it's the same were we are on currently
# if it is, we need to repeat the operations on the new parameter
# without ending the current box
stop_bit = previous_stop_bit
skip_divider = True
continue

byte_idx += 1

if not error:
formatted_lines.append(divide_string)
return formatted_lines
else:
return []

def print_message_format(self,
indent: int = 5,
allow_unknown_lengths: bool = False,
plumbing_output: bool = True) -> None:
"""
Print a description of the message format to `stdout`.
"""

message_as_lines = self._message_format_lines(allow_unknown_lengths=allow_unknown_lengths)
if message_as_lines is not None:
print(f"{indent * ' '}" + f"\n{indent * ' '}".join(message_as_lines))
else:
print("Sorry, couldn't pretty print message layout. :(")
if plumbing_output:
for p in self.parameters:
print(indent * " " + str(p).replace("\n", f"\n{indent * ' '}"))
2 changes: 0 additions & 2 deletions odxtools/cli/_print_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ def print_service_parameters(service: DiagService,
print_fn()
print_fn(table_str)
print_fn()
print_fn(f"\n Message format of the request:")
service.request.print_message_format(indent=0, allow_unknown_lengths=allow_unknown_bit_lengths)
else:
print_fn(f" No Request!")

Expand Down
12 changes: 10 additions & 2 deletions odxtools/cli/browse.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import sys
from typing import List, Optional, Union
from tabulate import tabulate # TODO: switch to rich tables

import PyInquirer.prompt as PI_prompt

Expand All @@ -19,6 +20,7 @@
from ..request import Request
from ..response import Response
from . import _parser_utils
from ._print_utils import extract_parameter_tabulation_data

# name of the tool
_odxtools_tool_name_ = "browse"
Expand Down Expand Up @@ -339,9 +341,15 @@ def browse(odxdb: Database) -> None:
continue

sub_service = answer.get("message_type")
sub_service.print_message_format()

encode_message_interactively(sub_service, ask_user_confirmation=True)
if sub_service.request is not None:
table = extract_parameter_tabulation_data(sub_service.request.parameters)
table_str = tabulate(table, headers='keys', tablefmt='presto')
print(table_str)

encode_message_interactively(sub_service, ask_user_confirmation=True)
else:
print(f"Service '{sub_service.short_name}' does not feature a request!")


def add_subparser(subparsers: "argparse._SubParsersAction") -> None:
Expand Down
37 changes: 0 additions & 37 deletions tests/test_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,43 +302,6 @@ def _create_request(self, parameters: List[Parameter]) -> Request:
byte_size=None,
)

def test_issue_70(self) -> None:
# see https://github.com/mercedes-benz/odxtools/issues/70
# make sure overlapping params don't cause this function to go crazy
unit_kwargs = {
"base_data_type": DataType.A_UINT32,
"base_type_encoding": None,
"is_highlow_byte_order_raw": None,
"bit_mask": None,
"is_condensed_raw": None,
}
uint2 = StandardLengthType(bit_length=2, **unit_kwargs) # type: ignore[arg-type]
uint1 = StandardLengthType(bit_length=1, **unit_kwargs) # type: ignore[arg-type]
param_kwargs: Dict[str, Any] = {
"long_name": None,
"description": None,
"byte_position": None,
"semantic": None,
"sdgs": [],
"coded_value": 0,
}
params: List[Parameter] = [
CodedConstParameter(
short_name="p1", diag_coded_type=uint2, bit_position=0, **param_kwargs),
CodedConstParameter(
short_name="p2", diag_coded_type=uint2, bit_position=2, **param_kwargs),
CodedConstParameter(
short_name="p3", diag_coded_type=uint2, bit_position=3, **param_kwargs),
CodedConstParameter(
short_name="p4", diag_coded_type=uint1, bit_position=5, **param_kwargs),
CodedConstParameter(
short_name="p5", diag_coded_type=uint1, bit_position=6, **param_kwargs),
CodedConstParameter(
short_name="p6", diag_coded_type=uint1, bit_position=7, **param_kwargs),
]
req = self._create_request(params)
self.assertEqual(req._message_format_lines(), [])

def test_bit_mask(self) -> None:
unit_kwargs: Dict[str, Any] = {
"base_data_type": DataType.A_UINT32,
Expand Down

0 comments on commit fd6a242

Please sign in to comment.