diff --git a/odxtools/basicstructure.py b/odxtools/basicstructure.py index 1371587b..3acfc503 100644 --- a/odxtools/basicstructure.py +++ b/odxtools/basicstructure.py @@ -21,7 +21,6 @@ from .parameters.parameterwithdop import ParameterWithDOP from .parameters.physicalconstantparameter import PhysicalConstantParameter from .parameters.tablekeyparameter import TableKeyParameter -from .parameters.valueparameter import ValueParameter from .utils import dataclass_fields_asdict if TYPE_CHECKING: @@ -301,178 +300,3 @@ def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None: for p in self.parameters: p._resolve_snrefs(diag_layer) - - def _message_format_lines(self, allow_unknown_lengths: bool = False) -> List[str]: - # sort parameters - sorted_params: list = list(self.parameters) # copy list - - def param_sort_key(param: Parameter) -> Tuple[int, int, int]: - byte_position_int = param.byte_position if param.byte_position is not None else 0 - bit_position_int = param.bit_position if param.bit_position is not None else 0 - param_bit_length_int = param.get_static_bit_length() or 0 - # take bit-length into account for overlapping parameters. 1000000 is an arbitrary number chosen - # to hopefully never be smaller than an actual bit-length - return (byte_position_int, 1000000 - param_bit_length_int, 8 - bit_position_int) - - sorted_params.sort(key=param_sort_key) - - # replace structure parameters by their sub parameters - params: List[Parameter] = [] - for p in sorted_params: - if isinstance(p, ValueParameter) and isinstance(p._dop, BasicStructure): - params += p._dop.parameters - else: - params.append(p) - - param_idx = 0 - byte_idx = 0 - # needs to be one larger than the maximum digit length of a byte number - indent_for_byte_numbering = 5 * " " - formatted_lines = [ - indent_for_byte_numbering + "".join(f" {7-bit_idx} " for bit_idx in range(8)) - ] - - stop_bit = 0 # absolute bit position where the next parameter starts - previous_stop_bit = 0 # previous stop-bit position, after it has been changed - - divide_string = indent_for_byte_numbering + 8 * "+-----" + "+" - skip_divider = False - - error = False - next_line = "" - while param_idx <= len(params) and not error: # For each byte - if 8 * byte_idx == stop_bit and param_idx == len(params): - # If we have formatted the last parameter, we're done. - break - - if not skip_divider: - formatted_lines.append(f"{divide_string}") - else: - skip_divider = False - - if stop_bit // 8 - byte_idx > 5: - curr_param = params[param_idx - 1].short_name - formatted_lines.append( - indent_for_byte_numbering + - f" ... {stop_bit // 8 - byte_idx} bytes belonging to {curr_param} ... ") - byte_idx += stop_bit // 8 - byte_idx - continue - - next_line = ( - f"{(len(indent_for_byte_numbering) - 1 - len(str(byte_idx))) * ' '}{byte_idx} ") - - for bit_idx in range(8): - odxassert(8 * byte_idx + bit_idx <= stop_bit) - - if param_idx == len(params): - # the last param ends mid byte, there are still padding bits left - error = True - break - - if 8 * byte_idx + bit_idx == stop_bit: - # END-OF-PDU fields do not exhibit a fixed bit - # length, so they need special treatment here - dct = None - if hasattr(params[param_idx], "_dop"): - dop = params[param_idx]._dop # type: ignore[attr-defined] - if hasattr(dop, "diag_coded_type"): - dct = dop.diag_coded_type - - bit_length = params[param_idx].get_static_bit_length() - - if dct is not None and dct.dct_type == "MIN-MAX-LENGTH-TYPE": - name = params[param_idx].short_name + " (" - if dct.termination == "END-OF-PDU": - name += "End of PDU, " - name += f"{dct.min_length}..{dct.max_length} bytes" - name += ")" - next_line += "| " + name - - param_idx += 1 - - # adding 8 is a bit hacky here, but hey, it - # works ... - previous_stop_bit = stop_bit - stop_bit += 8 - - break - - elif not bit_length and not allow_unknown_lengths: - # The bit length is not set for the current - # parameter, i.e. it was either not specified - # or the parameter is of variable length and - # has a type which is not handled above. In - # this case, stop trying. - error = True - break - else: - bit_length = 0 if bit_length is None else bit_length - previous_stop_bit = stop_bit - stop_bit += bit_length or (allow_unknown_lengths and 8) - name = params[param_idx].short_name + f" ({bit_length or 'Unknown'} bits)" - next_line += "| " + name - - param_idx += 1 - - if byte_idx == stop_bit // 8: - char_pos = bit_idx * 6 + 2 + len(name) - width_of_line = (stop_bit % 8) * 6 - if char_pos < width_of_line: - next_line += " " * (width_of_line - char_pos) + "|" - # start next line (belongs to same byte) - formatted_lines.append(next_line) - # fill next line with white spaces up to the - # bit where next parameter starts - next_line = indent_for_byte_numbering + (bit_idx + 1) * 6 * " " - else: - char_pos = 2 + bit_idx * 6 + len(name) - width_of_line = 8 * 6 - if char_pos < width_of_line: - next_line += " " * (width_of_line - char_pos) + "|" - break - else: - if bit_idx == 0: - next_line += "|" + 5 * " " - elif bit_idx == 7: - next_line += 6 * " " + "|" - else: - next_line += 6 * " " - - formatted_lines.append(next_line) - next_line = "" - - if 0 < param_idx and param_idx < len(params) - 1 and hasattr( - params[param_idx], - 'byte_position') and params[param_idx].byte_position == byte_idx: - # don't do anything on the first & last parameters, otherwise, if the current (= next) - # parameter has a byte_position, check if it's the same were we are on currently - # if it is, we need to repeat the operations on the new parameter - # without ending the current box - stop_bit = previous_stop_bit - skip_divider = True - continue - - byte_idx += 1 - - if not error: - formatted_lines.append(divide_string) - return formatted_lines - else: - return [] - - def print_message_format(self, - indent: int = 5, - allow_unknown_lengths: bool = False, - plumbing_output: bool = True) -> None: - """ - Print a description of the message format to `stdout`. - """ - - message_as_lines = self._message_format_lines(allow_unknown_lengths=allow_unknown_lengths) - if message_as_lines is not None: - print(f"{indent * ' '}" + f"\n{indent * ' '}".join(message_as_lines)) - else: - print("Sorry, couldn't pretty print message layout. :(") - if plumbing_output: - for p in self.parameters: - print(indent * " " + str(p).replace("\n", f"\n{indent * ' '}")) diff --git a/odxtools/cli/_print_utils.py b/odxtools/cli/_print_utils.py index 5af299c0..c29523a0 100644 --- a/odxtools/cli/_print_utils.py +++ b/odxtools/cli/_print_utils.py @@ -1,9 +1,9 @@ # SPDX-License-Identifier: MIT import re -from typing import Any, Dict, List, Optional, Union +import textwrap +from typing import Any, Callable, Dict, List, Optional, Union import markdownify -from rich import console, print from tabulate import tabulate # TODO: switch to rich tables from ..diaglayer import DiagLayer @@ -16,19 +16,17 @@ from ..parameters.valueparameter import ValueParameter from ..singleecujob import SingleEcuJob -terminal = console.Console() - -def format_desc(desc: str, ident: int = 0) -> str: +def format_desc(desc: str, indent: int = 0) -> str: # Collapse whitespaces desc = re.sub(r"\s+", " ", desc) # Covert XHTML to Markdown desc = markdownify.markdownify(desc) # Collapse blank lines desc = re.sub(r"(\n\s*)+\n+", "\n", desc).strip() + # add indentation + desc = textwrap.indent(desc, " " * indent) - if "\n" in desc: - desc = "\n" + ident * " " + ("\n" + ident * " ").join(desc.split("\n")) return desc @@ -38,105 +36,83 @@ def print_diagnostic_service(service: DiagService, print_state_transitions: bool = False, print_audiences: bool = False, allow_unknown_bit_lengths: bool = False, - plumbing_output: bool = False) -> None: - print(f" [cyan]{service.short_name}[/cyan] ") + print_fn: Callable = print) -> None: + + print_fn(f" Service '{service.short_name}':") if service.description: - desc = format_desc(service.description, ident=3) - print(f" Service description: " + desc) + desc = format_desc(service.description, indent=3) + print_fn(f" Description: " + desc) if print_pre_condition_states and len(service.pre_condition_states) > 0: pre_condition_states_short_names = [ pre_condition_state.short_name for pre_condition_state in service.pre_condition_states ] - print(f" Pre-Condition-States: {', '.join(pre_condition_states_short_names)}") + print_fn(f" Pre-Condition States: {', '.join(pre_condition_states_short_names)}") if print_state_transitions and len(service.state_transitions) > 0: state_transitions = [ f"{state_transition.source_snref} -> {state_transition.target_snref}" for state_transition in service.state_transitions ] - print(f" State-Transitions: {', '.join(state_transitions)}") + print_fn(f" State Transitions: {', '.join(state_transitions)}") if print_audiences and service.audience: enabled_audiences_short_names = [ enabled_audience.short_name for enabled_audience in service.audience.enabled_audiences ] - print(f" Enabled-Audiences: {', '.join(enabled_audiences_short_names)}") + print_fn(f" Enabled Audiences: {', '.join(enabled_audiences_short_names)}") if print_params: print_service_parameters( - service, - allow_unknown_bit_lengths=allow_unknown_bit_lengths, - plumbing_output=plumbing_output) + service, allow_unknown_bit_lengths=allow_unknown_bit_lengths, print_fn=print_fn) def print_service_parameters(service: DiagService, allow_unknown_bit_lengths: bool = False, - plumbing_output: bool = False) -> None: - # prints parameter details of request, posivite response and negative response of diagnostic service - - assert service.request is not None - assert service.positive_responses is not None - assert service.negative_responses is not None + print_fn: Callable = print) -> None: + # prints parameter details of request, positive response and negative response of diagnostic service # Request - print(f"\n [yellow]Request Properties[/yellow]:") - print(f" Request Name: {service.request.short_name}") - - if service.request and not service.request.required_parameters: - ba = f" Byte-Array: {service()!r}" - hs = f" Hex-String: 0x{str(service().hex().upper())}" - terminal.print(ba, overflow="ellipsis", soft_wrap=True) - terminal.print(hs, overflow="ellipsis", soft_wrap=True) + if service.request: + print_fn(f" Request '{service.request.short_name}':") + const_prefix = service.request.coded_const_prefix() + print_fn( + f" Identifying Prefix: 0x{const_prefix.hex().upper()} ({bytes(const_prefix)!r})") + print_fn(f" Parameters:") + table = extract_parameter_tabulation_data(list(service.request.parameters)) + table_str = textwrap.indent(tabulate(table, headers='keys', tablefmt='presto'), " ") + print_fn() + print_fn(table_str) + print_fn() else: - print(f" Byte-Array: ---\n Hex-String: ---") - - print(f" Service Parameters: {service.request.parameters}\n") - table = extract_parameter_tabulation_data(list(service.request.parameters)) - print(tabulate(table, headers='keys', tablefmt='presto')) - print(f"\n Message format of the request:") - service.request.print_message_format( - indent=0, allow_unknown_lengths=allow_unknown_bit_lengths, plumbing_output=plumbing_output) - - # Positive Response - print(f"\n [yellow]Positive Response Properties[/yellow]:") - print(f" Number of Positive Responses: {len(service.positive_responses)}") - print(f" Positive Responses: {service.positive_responses}") - if len(service.positive_responses) == 1: - resp = service.positive_responses[0] - print(f" Service Parameters: {resp.parameters}\n") + print_fn(f" No Request!") + + # Positive Responses + if not service.positive_responses: + print_fn(f" No positive responses") + + for resp in service.positive_responses: + print_fn(f" Positive Response '{resp.short_name}':") + print_fn(f" Parameters:\n") table = extract_parameter_tabulation_data(list(resp.parameters)) - print(tabulate(table, headers='keys', tablefmt='presto')) - print(f"\n Message format of the positive response:") - resp.print_message_format( - indent=0, - allow_unknown_lengths=allow_unknown_bit_lengths, - plumbing_output=plumbing_output) + table_str = textwrap.indent(tabulate(table, headers='keys', tablefmt='presto'), " ") + print_fn(table_str) + print_fn() # Negative Response - print(f"\n [yellow]Negative Response Properties[/yellow]:") - print(f" Number of Negative Responses: {len(service.negative_responses)}") - print(f" Negative Responses: {service.negative_responses}") - if len(service.negative_responses) == 1: - resp = service.negative_responses[0] - print(f" Service Parameters: {resp.parameters}\n") - table = extract_parameter_tabulation_data(list(resp.parameters)) - print(tabulate(table, headers='keys', tablefmt='presto')) - print(f"\n Message format of a negative response:") - resp.print_message_format( - indent=0, - allow_unknown_lengths=allow_unknown_bit_lengths, - plumbing_output=plumbing_output) + if not service.negative_responses: + print_fn(f" No negative responses") - print("\n") + for resp in service.negative_responses: + print_fn(f" Negative Response '{resp.short_name}':") + print_fn(f" Parameters:\n") + table = extract_parameter_tabulation_data(list(resp.parameters)) + table_str = textwrap.indent(tabulate(table, headers='keys', tablefmt='presto'), " ") + print_fn(table_str) + print_fn() - if (service.positive_responses and - len(service.positive_responses) > 1) or (service.negative_responses and - len(service.negative_responses) > 1): - # Does this ever happen? - raise NotImplementedError( - f"The diagnostic service {service.odx_id} offers more than one response!") + print_fn("\n") def extract_service_tabulation_data(services: List[DiagService]) -> Dict[str, Any]: @@ -151,9 +127,10 @@ def extract_service_tabulation_data(services: List[DiagService]) -> Dict[str, An name.append(service.short_name) semantic.append(service.semantic) - if service.request and not service.request.required_parameters: - request.append(f"0x{str(s.hex().upper())[:32]}...") if len( - s := service()) > 32 else request.append(f"0x{str(s.hex().upper())}") + if service.request: + prefix = service.request.coded_const_prefix() + request.append(f"0x{str(prefix.hex().upper())[:32]}...") if len( + prefix) > 32 else request.append(f"0x{str(prefix.upper())}") else: request.append(None) @@ -200,9 +177,18 @@ def extract_parameter_tabulation_data(parameters: List[Parameter]) -> Dict[str, value_type.append('coded values') dop.append(None) elif isinstance(param, (PhysicalConstantParameter, SystemParameter, ValueParameter)): - dop.append(param.dop.short_name) - if (tmp := getattr(param, "physical_type", None)) is not None: - data_type.append(tmp.base_data_type.name) + # this is a hack to make this routine work for parameters + # which reference DOPs of a type that a is not yet + # internalized. (all parameter objects of the tested types + # are supposed to have a DOP.) + param_dop = getattr(param, "_dop", None) + + if param_dop is not None: + dop.append(param_dop.short_name) + + if param_dop is not None and (phys_type := getattr(param, "physical_type", + None)) is not None: + data_type.append(phys_type.base_data_type.name) else: data_type.append(None) if isinstance(param, PhysicalConstantParameter): @@ -236,12 +222,12 @@ def extract_parameter_tabulation_data(parameters: List[Parameter]) -> Dict[str, 'Parameter Type': param_type, 'Data Type': data_type, 'Value': value, - 'Value Description': value_type, + 'Value Type': value_type, 'Linked DOP': dop } -def print_dl_metrics(variants: List[DiagLayer]) -> None: +def print_dl_metrics(variants: List[DiagLayer], print_fn: Callable = print) -> None: name = [] type = [] @@ -265,4 +251,4 @@ def print_dl_metrics(variants: List[DiagLayer]) -> None: 'Number of DOPs': num_dops, 'Number of communication parameters': num_comparams } - print(tabulate(table, headers='keys', tablefmt='presto')) + print_fn(tabulate(table, headers='keys', tablefmt='presto')) diff --git a/odxtools/cli/browse.py b/odxtools/cli/browse.py index 4fa056da..cf302ec4 100644 --- a/odxtools/cli/browse.py +++ b/odxtools/cli/browse.py @@ -3,6 +3,7 @@ import logging import sys from typing import List, Optional, Union +from tabulate import tabulate # TODO: switch to rich tables import PyInquirer.prompt as PI_prompt @@ -19,6 +20,7 @@ from ..request import Request from ..response import Response from . import _parser_utils +from ._print_utils import extract_parameter_tabulation_data # name of the tool _odxtools_tool_name_ = "browse" @@ -92,7 +94,7 @@ def prompt_single_parameter_value(parameter: Parameter) -> Optional[AtomicOdxTyp choices = [scale.compu_const for scale in scales if scale is not None] param_prompt[0]["choices"] = choices - answer = PI_prompt.prompt(param_prompt) + answer = PI_prompt(param_prompt) if answer.get(parameter.short_name) == "" and not parameter.is_required: return None elif parameter.physical_type.base_data_type is not None: @@ -130,7 +132,7 @@ def encode_message_interactively(sub_service: Union[Request, Response], "message": f"Do you want to encode a message? [y/n]", "choices": ["yes", "no"], }] - answer = PI_prompt.prompt(encode_message_prompt) + answer = PI_prompt(encode_message_prompt) if answer.get("yes_no_prompt") == "no": return @@ -145,7 +147,7 @@ def encode_message_interactively(sub_service: Union[Request, Response], "filter": lambda input: _convert_string_to_bytes(input), }] - answer = PI_prompt.prompt(answered_request_prompt) + answer = PI_prompt(answered_request_prompt) answered_request = answer.get("request") print(f"Input interpretation as list: {list(answered_request)}") @@ -264,7 +266,7 @@ def browse(odxdb: Database) -> None: "message": "Select a Variant.", "choices": list(dl_names) + ["[exit]"], }] - answer = PI_prompt.prompt(selection) + answer = PI_prompt(selection) if answer.get("variant") == "[exit]": return @@ -300,7 +302,7 @@ def browse(odxdb: Database) -> None: f"The variant {variant.short_name} offers the following services. Select one!", "choices": [s.short_name for s in services] + ["[back]"], }] - answer = PI_prompt.prompt(selection) + answer = PI_prompt(selection) if answer.get("service") == "[back]": break @@ -334,14 +336,18 @@ def browse(odxdb: Database) -> None: "short": f"Negative response: {nr.short_name}", } for nr in service.negative_responses] + ["[back]"], # type: ignore }] - answer = PI_prompt.prompt(selection) + answer = PI_prompt(selection) if answer.get("message_type") == "[back]": continue - sub_service = answer.get("message_type") - sub_service.print_message_format() + codec = answer.get("message_type") - encode_message_interactively(sub_service, ask_user_confirmation=True) + if codec is not None: + table = extract_parameter_tabulation_data(codec.parameters) + table_str = tabulate(table, headers='keys', tablefmt='presto') + print(table_str) + + encode_message_interactively(codec, ask_user_confirmation=True) def add_subparser(subparsers: "argparse._SubParsersAction") -> None: diff --git a/odxtools/cli/compare.py b/odxtools/cli/compare.py index 7b8147e7..23e4664d 100644 --- a/odxtools/cli/compare.py +++ b/odxtools/cli/compare.py @@ -106,8 +106,7 @@ def print_dl_changes(self, service_dict: SpecsServiceDict) -> None: print(tabulate(detailed_info, headers='keys', tablefmt='presto')) if self.param_detailed: # print all parameter details of diagnostic service - print_service_parameters( - service, allow_unknown_bit_lengths=True, plumbing_output=self.obj_detailed) + print_service_parameters(service, allow_unknown_bit_lengths=True) def print_database_changes(self, changes_variants: SpecsChangesVariants) -> None: # prints result of database comparison (input variable: dictionary: changes_variants) @@ -590,13 +589,6 @@ def add_subparser(subparsers: "argparse._SubParsersAction") -> None: help="Don't show all service parameter details", ) - parser.add_argument( - "-po", - "--plumbing-output", - action="store_true", - required=False, - help="Print full objects instead of selected and formatted attributes", - ) # TODO # Idea: provide folder with multiple pdx files as argument # -> load all pdx files in folder, sort them alphabetically, compare databases pairwaise @@ -608,7 +600,6 @@ def run(args: argparse.Namespace) -> None: task = Comparison() task.param_detailed = args.no_details - task.obj_detailed = args.plumbing_output db_names = [args.pdx_file if isinstance(args.pdx_file, str) else str(args.pdx_file[0])] diff --git a/odxtools/cli/find.py b/odxtools/cli/find.py index a568a12d..37b5c40b 100644 --- a/odxtools/cli/find.py +++ b/odxtools/cli/find.py @@ -26,8 +26,7 @@ def print_summary(odxdb: Database, service_names: List[str], ecu_variants: Optional[List[str]] = None, allow_unknown_bit_lengths: bool = False, - print_params: bool = False, - plumbing_output: bool = False) -> None: + print_params: bool = False) -> None: ecu_names = ecu_variants if ecu_variants else [ecu.short_name for ecu in odxdb.ecus] service_db: Dict[str, DiagService] = {} service_ecus: Dict[str, List[str]] = {} @@ -60,8 +59,7 @@ def print_summary(odxdb: Database, allow_unknown_bit_lengths=allow_unknown_bit_lengths, print_pre_condition_states=True, print_state_transitions=True, - print_audiences=True, - plumbing_output=plumbing_output) + print_audiences=True) elif isinstance(service, SingleEcuJob): print(f"SingleEcuJob: {service.odx_id}") else: @@ -121,14 +119,6 @@ def add_subparser(subparsers: "argparse._SubParsersAction") -> None: help="Relax output formatting rules (allow unknown bitlengths for ascii representation)", ) - parser.add_argument( - "-po", - "--plumbing-output", - action="store_true", - required=False, - help="Print full objects instead of selected and formatted attributes", - ) - def run(args: argparse.Namespace) -> None: odxdb = _parser_utils.load_file(args) @@ -139,5 +129,4 @@ def run(args: argparse.Namespace) -> None: ecu_variants=None if variants == "all" else variants, service_names=args.service_names, print_params=not args.no_details, - allow_unknown_bit_lengths=args.relaxed_output, - plumbing_output=args.plumbing_output) + allow_unknown_bit_lengths=args.relaxed_output) diff --git a/odxtools/cli/list.py b/odxtools/cli/list.py index e4c6c182..a15eeec4 100644 --- a/odxtools/cli/list.py +++ b/odxtools/cli/list.py @@ -2,7 +2,7 @@ import argparse from typing import Callable, List, Optional -from rich import print +import rich from ..database import Database from ..diagcomm import DiagComm @@ -27,8 +27,7 @@ def print_summary(odxdb: Database, print_audiences: bool = False, allow_unknown_bit_lengths: bool = False, variants: Optional[List[str]] = None, - service_filter: Callable[[DiagComm], bool] = lambda x: True, - plumbing_output: bool = False) -> None: + service_filter: Callable[[DiagComm], bool] = lambda x: True) -> None: diag_layer_names = [dl.short_name for dl in odxdb.diag_layers] diag_layers: List[DiagLayer] = [] @@ -41,18 +40,18 @@ def print_summary(odxdb: Database, diag_layers.append([x for x in odxdb.diag_layers if x.short_name == name][0]) else: - print(f"The variant '{name}' could not be found!") + rich.print(f"The variant '{name}' could not be found!") return if diag_layers: - print("\n") - print(f"Overview of diagnostic layers: ") - print_dl_metrics(diag_layers) + rich.print("\n") + rich.print(f"Overview of diagnostic layers: ") + print_dl_metrics(diag_layers, print_fn=rich.print) for dl in diag_layers: - print("\n") - print(f"[green]Diagnostic layer:[/green] '[bold white]{dl.short_name}[/bold white]'") - print(f" [blue]Variant Type[/blue]: {dl.variant_type.value}") + rich.print("\n") + rich.print(f"Diagnostic layer: '{dl.short_name}'") + rich.print(f" Variant Type: {dl.variant_type.value}") assert isinstance(dl, DiagLayer) all_services: List[DiagComm] = sorted(dl.services, key=lambda x: x.short_name) @@ -62,33 +61,26 @@ def print_summary(odxdb: Database, for proto in dl.protocols: if (can_rx_id := dl.get_can_receive_id(proto.short_name)) is not None: - print( - f" [blue]CAN receive ID[/blue] for protocol '{proto.short_name}': 0x{can_rx_id:x}" - ) + rich.print(f" CAN receive ID for protocol '{proto.short_name}': 0x{can_rx_id:x}") if (can_tx_id := dl.get_can_send_id(proto.short_name)) is not None: - print( - f" [blue]CAN send ID[/blue] for protocol '{proto.short_name}': 0x{can_tx_id:x}" - ) + rich.print(f" CAN send ID for protocol '{proto.short_name}': 0x{can_tx_id:x}") if dl.description: - desc = format_desc(dl.description, ident=2) - print(f" [blue]Description[/blue]: " + desc) + desc = format_desc(dl.description, indent=2) + rich.print(f" Description: " + desc) if print_global_negative_responses and dl.global_negative_responses: - print("\n") - print(f"The [blue]global negative responses[/blue] of '{dl.short_name}' are: ") + rich.print("\n") + rich.print(f"The global negative responses of '{dl.short_name}' are: ") for gnr in dl.global_negative_responses: - if plumbing_output: - print(f" {gnr}") - else: - print(f" {gnr.short_name}") + rich.print(f" {gnr.short_name}") if print_services and len(all_services) > 0: services = [s for s in all_services if service_filter(s)] if len(services) > 0: - print("\n") - print(f"The [blue]services[/blue] of '{dl.short_name}' are: ") + rich.print("\n") + rich.print(f"The services of '{dl.short_name}' are: ") for service in services: if isinstance(service, DiagService): print_diagnostic_service( @@ -98,29 +90,26 @@ def print_summary(odxdb: Database, print_state_transitions=print_state_transitions, print_audiences=print_audiences, allow_unknown_bit_lengths=allow_unknown_bit_lengths, - plumbing_output=plumbing_output) + print_fn=rich.print) elif isinstance(service, SingleEcuJob): - print(f" [blue]Single ECU job[/blue]: {service.odx_id}") + rich.print(f" Single ECU job: {service.odx_id}") else: - print(f" Unidentifiable service: {service}") + rich.print(f" Unidentifiable service: {service}") if print_dops and len(data_object_properties) > 0: - print("\n") - print(f"The [blue]DOPs[/blue] of the {dl.variant_type.value} '{dl.short_name}' are: ") + rich.print("\n") + rich.print(f"The DOPs of the {dl.variant_type.value} '{dl.short_name}' are: ") for dop in sorted( data_object_properties, key=lambda x: (type(x).__name__, x.short_name)): - if plumbing_output: - print(" " + str(dop).replace("\n", "\n ")) - else: - print(" " + str(dop.short_name).replace("\n", "\n ")) + rich.print(" " + str(dop.short_name).replace("\n", "\n ")) if print_comparams and len(comparams) > 0: - print("\n") - print( - f"The [blue]communication parameters[/blue] of the {dl.variant_type.value} '{dl.short_name}' are: " + rich.print("\n") + rich.print( + f"The communication parameters of the {dl.variant_type.value} '{dl.short_name}' are: " ) for com_param in comparams: - print(f" {com_param.short_name}: {com_param.value}") + rich.print(f" {com_param.short_name}: {com_param.value}") def add_subparser(subparsers: "argparse._SubParsersAction") -> None: @@ -205,17 +194,21 @@ def add_subparser(subparsers: "argparse._SubParsersAction") -> None: ) parser.add_argument( - "-po", - "--plumbing-output", + "--dump-database", action="store_true", required=False, - help="Print full objects instead of selected and formatted attributes", + help="Ignore all other parameters and print a comprehensive dump the full database " + "instead of providing a pretty-printed summary", ) def run(args: argparse.Namespace) -> None: odxdb = _parser_utils.load_file(args) + if args.dump_database: + print(repr(odxdb)) + return + def service_filter(s: DiagComm) -> bool: if args.services and len(args.services) > 0: return s.short_name in args.services @@ -234,5 +227,4 @@ def service_filter(s: DiagComm) -> bool: print_pre_condition_states=args.all, print_state_transitions=args.all, print_audiences=args.all, - allow_unknown_bit_lengths=args.all, - plumbing_output=args.plumbing_output) + allow_unknown_bit_lengths=args.all) diff --git a/odxtools/nameditemlist.py b/odxtools/nameditemlist.py index 37a60148..861f0467 100644 --- a/odxtools/nameditemlist.py +++ b/odxtools/nameditemlist.py @@ -172,7 +172,7 @@ def __str__(self) -> str: return f"[{', '.join( [self._get_item_key(x) for x in self])}]" def __repr__(self) -> str: - return self.__str__() + return f"{type(self).__name__}([{', '.join([repr(x) for x in self])}])" class NamedItemList(ItemAttributeList[T]): diff --git a/tests/test_cli.py b/tests/test_cli.py index 4f9c7f48..a9a4b786 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -27,7 +27,7 @@ def run_list_tool(path_to_pdx_file: str = "./examples/somersault.pdx", print_params: bool = False, print_dops: bool = False, print_all: bool = False, - plumbing_output: bool = False) -> None: + dump_database: bool = False) -> None: list_args = Namespace( pdx_file=path_to_pdx_file, variants=ecu_variants, @@ -36,7 +36,7 @@ def run_list_tool(path_to_pdx_file: str = "./examples/somersault.pdx", params=print_params, dops=print_dops, all=print_all, - plumbing_output=plumbing_output) + dump_database=dump_database) list_tool.run(list_args) @@ -58,14 +58,14 @@ def run_find_tool(service_names: List[str], ecu_variants: Optional[List[str]] = None, allow_unknown_bit_lengths: bool = False, no_details: bool = False, - plumbing_output: bool = False) -> None: + dump_database: bool = False) -> None: find_args = Namespace( pdx_file=path_to_pdx_file, variants=ecu_variants, service_names=service_names, relaxed_output=allow_unknown_bit_lengths, no_details=no_details, - plumbing_output=plumbing_output) + dump_database=dump_database) find.run(find_args) @@ -73,15 +73,13 @@ def run_find_tool(service_names: List[str], def run_compare_tool(path_to_pdx_file: str = "./examples/somersault.pdx", ecu_variants: Optional[List[str]] = None, database: Optional[List[str]] = None, - no_details: bool = True, - plumbing_output: bool = False) -> None: + no_details: bool = True) -> None: compare_args = Namespace( pdx_file=path_to_pdx_file, variants=ecu_variants, database=database, - no_details=no_details, - plumbing_output=plumbing_output) + no_details=no_details) compare.run(compare_args) @@ -96,7 +94,7 @@ def test_list_tool(self) -> None: UtilFunctions.run_list_tool(print_params=True) UtilFunctions.run_list_tool(print_dops=True) UtilFunctions.run_list_tool(print_all=True) - UtilFunctions.run_list_tool(plumbing_output=True) + UtilFunctions.run_list_tool(dump_database=True) UtilFunctions.run_list_tool(ecu_services=["session_start"]) def test_decode_tool(self) -> None: @@ -124,8 +122,7 @@ def test_compare_tool(self) -> None: UtilFunctions.run_compare_tool(ecu_variants=[ "somersault_lazy", "somersault_assiduous", "somersault_young", "somersault_old" ]) - UtilFunctions.run_compare_tool( - ecu_variants=["somersault_lazy", "somersault_assiduous"], plumbing_output=True) + UtilFunctions.run_compare_tool(ecu_variants=["somersault_lazy", "somersault_assiduous"]) @unittest.skipIf(import_failed, "import of PyInquirer failed") def test_browse_tool(self) -> None: diff --git a/tests/test_encoding.py b/tests/test_encoding.py index 1d2af215..669a578d 100644 --- a/tests/test_encoding.py +++ b/tests/test_encoding.py @@ -302,43 +302,6 @@ def _create_request(self, parameters: List[Parameter]) -> Request: byte_size=None, ) - def test_issue_70(self) -> None: - # see https://github.com/mercedes-benz/odxtools/issues/70 - # make sure overlapping params don't cause this function to go crazy - unit_kwargs = { - "base_data_type": DataType.A_UINT32, - "base_type_encoding": None, - "is_highlow_byte_order_raw": None, - "bit_mask": None, - "is_condensed_raw": None, - } - uint2 = StandardLengthType(bit_length=2, **unit_kwargs) # type: ignore[arg-type] - uint1 = StandardLengthType(bit_length=1, **unit_kwargs) # type: ignore[arg-type] - param_kwargs: Dict[str, Any] = { - "long_name": None, - "description": None, - "byte_position": None, - "semantic": None, - "sdgs": [], - "coded_value": 0, - } - params: List[Parameter] = [ - CodedConstParameter( - short_name="p1", diag_coded_type=uint2, bit_position=0, **param_kwargs), - CodedConstParameter( - short_name="p2", diag_coded_type=uint2, bit_position=2, **param_kwargs), - CodedConstParameter( - short_name="p3", diag_coded_type=uint2, bit_position=3, **param_kwargs), - CodedConstParameter( - short_name="p4", diag_coded_type=uint1, bit_position=5, **param_kwargs), - CodedConstParameter( - short_name="p5", diag_coded_type=uint1, bit_position=6, **param_kwargs), - CodedConstParameter( - short_name="p6", diag_coded_type=uint1, bit_position=7, **param_kwargs), - ] - req = self._create_request(params) - self.assertEqual(req._message_format_lines(), []) - def test_bit_mask(self) -> None: unit_kwargs: Dict[str, Any] = { "base_data_type": DataType.A_UINT32,