From f3c151a2c70332ab9ff634fbeb0a13ba7af7e3a9 Mon Sep 17 00:00:00 2001 From: Andreas Lauser Date: Mon, 19 Feb 2024 12:22:18 +0100 Subject: [PATCH] DynamicLengthField: implement support for en- and decoding this also includes minor changes to the encoding infrastructure to bring it closer to how decoding now works and to make it more convenient. Signed-off-by: Andreas Lauser Signed-off-by: Gerrit Ecke --- odxtools/basicstructure.py | 13 +- odxtools/diagservice.py | 3 +- odxtools/dynamiclengthfield.py | 68 +++++- odxtools/encodestate.py | 29 ++- odxtools/endofpdufield.py | 3 +- odxtools/parameters/parameter.py | 27 +-- odxtools/parameters/tablestructparameter.py | 2 +- tests/test_decoding.py | 229 +++++++++++++++++++- tests/test_diag_coded_types.py | 20 +- 9 files changed, 343 insertions(+), 51 deletions(-) diff --git a/odxtools/basicstructure.py b/odxtools/basicstructure.py index 34b3ec65..3726f5e4 100644 --- a/odxtools/basicstructure.py +++ b/odxtools/basicstructure.py @@ -73,11 +73,12 @@ def get_static_bit_length(self) -> Optional[int]: def coded_const_prefix(self, request_prefix: bytes = b'') -> bytes: prefix = b'' - encode_state = EncodeState(prefix, parameter_values={}, triggering_request=request_prefix) + encode_state = EncodeState( + bytearray(prefix), parameter_values={}, triggering_request=request_prefix) for param in self.parameters: if isinstance(param, (CodedConstParameter, NrcConstParameter, MatchingRequestParameter, PhysicalConstantParameter)): - encode_state.coded_message = param.encode_into_pdu(encode_state) + encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state)) else: break return encode_state.coded_message @@ -132,7 +133,7 @@ def convert_physical_to_internal(self, odxraise(f"Value for unknown parameter '{param_key}' specified") encode_state = EncodeState( - b'', + bytearray(), dict(param_value), triggering_request=triggering_coded_request, is_end_of_pdu=False, @@ -157,11 +158,11 @@ def convert_physical_to_internal(self, # into the PDU here and add the real value of the # parameter in a post processing step. tmp = encode_state.parameter_values.pop(param.short_name) - encode_state.coded_message = param.encode_into_pdu(encode_state) + encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state)) encode_state.parameter_values[param.short_name] = tmp continue - encode_state.coded_message = param.encode_into_pdu(encode_state) + encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state)) if self.byte_size is not None and len(encode_state.coded_message) < self.byte_size: # Padding bytes needed @@ -176,7 +177,7 @@ def convert_physical_to_internal(self, continue # Encode the key parameter into the message - encode_state.coded_message = param.encode_into_pdu(encode_state) + encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state)) # Assert that length is as expected self._validate_coded_message(encode_state.coded_message) diff --git a/odxtools/diagservice.py b/odxtools/diagservice.py index baaac10e..2c633335 100644 --- a/odxtools/diagservice.py +++ b/odxtools/diagservice.py @@ -231,7 +231,8 @@ def encode_request(self, **params: ParameterValue) -> bytes: missing_params = {x.short_name for x in self.request.required_parameters}.difference(params.keys()) - odxassert(not missing_params, f"The parameters {missing_params} are required but missing!") + odxassert( + len(missing_params) == 0, f"The parameters {missing_params} are required but missing!") # make sure that no unknown parameters are specified rq_all_param_names = {x.short_name for x in self.request.parameters} diff --git a/odxtools/dynamiclengthfield.py b/odxtools/dynamiclengthfield.py index 48c39e40..2bf8b809 100644 --- a/odxtools/dynamiclengthfield.py +++ b/odxtools/dynamiclengthfield.py @@ -6,7 +6,7 @@ from .decodestate import DecodeState from .determinenumberofitems import DetermineNumberOfItems from .encodestate import EncodeState -from .exceptions import odxrequire +from .exceptions import DecodeError, EncodeError, odxassert, odxraise, odxrequire from .field import Field from .odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId from .odxtypes import ParameterValue @@ -49,11 +49,71 @@ def _resolve_snrefs(self, diag_layer: "DiagLayer") -> None: def convert_physical_to_bytes( self, - physical_value: ParameterValue, + physical_values: ParameterValue, encode_state: EncodeState, bit_position: int = 0, ) -> bytes: - raise NotImplementedError() + + odxassert(bit_position == 0, "No bit position can be specified for dynamic length fields!") + if not isinstance(physical_values, list): + odxraise( + f"Expected a list of values for dynamic length field {self.short_name}, " + f"got {type(physical_values)}", EncodeError) + + det_num_items = self.determine_number_of_items + num_item = det_num_items.dop.convert_physical_to_bytes( + len(physical_values), encode_state, det_num_items.bit_position or 0) + + # hack to emplace the length specifier at the correct location + tmp = encode_state.coded_message + encode_state.coded_message = bytearray() + encode_state.emplace_atomic_value(num_item, det_num_items.byte_position, + self.short_name + ".num_items") + result = encode_state.coded_message + encode_state.coded_message = tmp + + # if required, add padding between the length specifier and + # the first item + if len(result) < self.offset: + result.extend([0] * (self.offset - len(result))) + elif len(result) > self.offset: + odxraise(f"The length specifier of field {self.short_name} overlaps " + f"with the first item!") + + for value in physical_values: + result += self.structure.convert_physical_to_bytes(value, encode_state) + + return result def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue: - raise NotImplementedError() + + odxassert(decode_state.cursor_bit_position == 0, + "No bit position can be specified for dynamic length fields!") + + orig_origin = decode_state.origin_byte_position + orig_cursor = decode_state.cursor_byte_position + + det_num_items = self.determine_number_of_items + decode_state.origin_byte_position = decode_state.cursor_byte_position + decode_state.cursor_byte_position = decode_state.origin_byte_position + det_num_items.byte_position + decode_state.cursor_bit_position = det_num_items.bit_position or 0 + + n = det_num_items.dop.decode_from_pdu(decode_state) + + if not isinstance(n, int): + odxraise(f"Number of items specified by a dynamic length field {self.short_name} " + f"must be an integer (is: {type(n).__name__})") + elif n < 0: + odxraise( + f"Number of items specified by a dynamic length field {self.short_name} " + f"must be positive (is: {n})", DecodeError) + else: + decode_state.cursor_byte_position = decode_state.origin_byte_position + self.offset + result: List[ParameterValue] = [] + for _ in range(n): + result.append(self.structure.decode_from_pdu(decode_state)) + + decode_state.origin_byte_position = orig_origin + decode_state.cursor_byte_position = max(orig_cursor, decode_state.cursor_byte_position) + + return result diff --git a/odxtools/encodestate.py b/odxtools/encodestate.py index 4797c716..2d39d7aa 100644 --- a/odxtools/encodestate.py +++ b/odxtools/encodestate.py @@ -1,7 +1,10 @@ # SPDX-License-Identifier: MIT +import warnings from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, Dict, Optional +from .exceptions import OdxWarning + if TYPE_CHECKING: from .tablerow import TableRow @@ -11,8 +14,8 @@ class EncodeState: """Utility class to holding the state variables needed for encoding a message. """ - #: payload that is constructed so far - coded_message: bytes + #: payload that has been constructed so far + coded_message: bytearray #: a mapping from short name to value for each parameter parameter_values: Dict[str, Any] @@ -31,3 +34,25 @@ class EncodeState: #: Flag whether we are currently the last parameter of the PDU #: (needed for MinMaxLengthType) is_end_of_pdu: bool = False + + def emplace_atomic_value(self, + new_data: bytes, + pos: Optional[int] = None, + param_name: str = "unknown") -> None: + if pos is None: + pos = len(self.coded_message) + + # Make blob longer if necessary + min_length = pos + len(new_data) + if len(self.coded_message) < min_length: + self.coded_message.extend([0] * (min_length - len(self.coded_message))) + + for byte_idx_val, byte_idx_rpc in enumerate(range(pos, pos + len(new_data))): + # insert byte value + if self.coded_message[byte_idx_rpc] & new_data[byte_idx_val] != 0: + warnings.warn( + f"Parameter '{param_name}' overlaps with another parameter (bytes are already set)", + OdxWarning, + stacklevel=1, + ) + self.coded_message[byte_idx_rpc] |= new_data[byte_idx_val] diff --git a/odxtools/endofpdufield.py b/odxtools/endofpdufield.py index 36527e6a..c94b25eb 100644 --- a/odxtools/endofpdufield.py +++ b/odxtools/endofpdufield.py @@ -45,12 +45,13 @@ def convert_physical_to_bytes( encode_state: EncodeState, bit_position: int = 0, ) -> bytes: + odxassert( bit_position == 0, "End of PDU field must be byte aligned. " "Is there an error in reading the .odx?", EncodeError) if not isinstance(physical_values, list): odxraise( - f"Expected a list of values for structure {self.short_name}, " + f"Expected a list of values for end-of-pdu field {self.short_name}, " f"got {type(physical_values)}", EncodeError) coded_message = b'' diff --git a/odxtools/parameters/parameter.py b/odxtools/parameters/parameter.py index e8743492..470f8572 100644 --- a/odxtools/parameters/parameter.py +++ b/odxtools/parameters/parameter.py @@ -1,13 +1,11 @@ # SPDX-License-Identifier: MIT import abc -import warnings from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional from ..decodestate import DecodeState from ..element import NamedElement from ..encodestate import EncodeState -from ..exceptions import OdxWarning from ..odxlink import OdxLinkDatabase, OdxLinkId from ..odxtypes import ParameterValue from ..specialdatagroup import SpecialDataGroup @@ -155,27 +153,6 @@ def encode_into_pdu(self, encode_state: EncodeState) -> bytes: else: byte_position = len(msg_blob) - return self._encode_into_blob(msg_blob, param_blob, byte_position) + encode_state.emplace_atomic_value(param_blob, byte_position, self.short_name) - def _encode_into_blob(self, blob: bytes, new_data: bytes, pos: Optional[int] = None) -> bytes: - if pos is None: - pos = len(blob) - - # Make blob longer if necessary - min_length = pos + len(new_data) - - result_blob = bytearray(blob) - if len(blob) < min_length: - result_blob.extend([0] * (min_length - len(blob))) - - for byte_idx_val, byte_idx_rpc in enumerate(range(pos, pos + len(new_data))): - # insert byte value - if result_blob[byte_idx_rpc] & new_data[byte_idx_val] != 0: - warnings.warn( - f"Parameter {self.short_name} overlaps with another parameter (bytes are already set)", - OdxWarning, - stacklevel=1, - ) - result_blob[byte_idx_rpc] |= new_data[byte_idx_val] - - return result_blob + return encode_state.coded_message diff --git a/odxtools/parameters/tablestructparameter.py b/odxtools/parameters/tablestructparameter.py index 28972cdc..c4a77068 100644 --- a/odxtools/parameters/tablestructparameter.py +++ b/odxtools/parameters/tablestructparameter.py @@ -107,7 +107,7 @@ def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes: if tr.structure is not None: # the selected table row references a structure inner_encode_state = EncodeState( - coded_message=b'', + coded_message=bytearray(b''), parameter_values=tr_value, triggering_request=encode_state.triggering_request) diff --git a/tests/test_decoding.py b/tests/test_decoding.py index 459fbd7f..f3783905 100644 --- a/tests/test_decoding.py +++ b/tests/test_decoding.py @@ -6,6 +6,7 @@ from odxtools.compumethods.limit import IntervalType, Limit from odxtools.compumethods.linearcompumethod import LinearCompuMethod from odxtools.dataobjectproperty import DataObjectProperty +from odxtools.determinenumberofitems import DetermineNumberOfItems from odxtools.diagdatadictionaryspec import DiagDataDictionarySpec from odxtools.diaglayer import DiagLayer from odxtools.diaglayerraw import DiagLayerRaw @@ -13,6 +14,7 @@ from odxtools.diagnostictroublecode import DiagnosticTroubleCode from odxtools.diagservice import DiagService from odxtools.dtcdop import DtcDop +from odxtools.dynamiclengthfield import DynamicLengthField from odxtools.endofpdufield import EndOfPduField from odxtools.exceptions import DecodeError from odxtools.message import Message @@ -700,8 +702,233 @@ def test_decode_request_structure(self) -> None: self.assertEqual(expected_message.coding_object, decoded_message.coding_object) self.assertEqual(expected_message.param_dict, decoded_message.param_dict) + def test_dynamic_length_field_coding(self) -> None: + """Test en- and decoding of a dynamic length fields.""" + diag_coded_type = StandardLengthType( + base_data_type=DataType.A_UINT32, + base_type_encoding=None, + bit_length=8, + bit_mask=None, + is_condensed_raw=None, + is_highlow_byte_order_raw=None, + ) + diag_coded_type_4 = StandardLengthType( + base_data_type=DataType.A_UINT32, + base_type_encoding=None, + bit_length=4, + bit_mask=None, + is_condensed_raw=None, + is_highlow_byte_order_raw=None, + ) + + compu_method = IdenticalCompuMethod( + internal_type=DataType.A_INT32, physical_type=DataType.A_INT32) + dop = DataObjectProperty( + odx_id=OdxLinkId("dlf.dop.id", doc_frags), + short_name="dlf_dop_sn", + long_name=None, + description=None, + admin_data=None, + diag_coded_type=diag_coded_type_4, + physical_type=PhysicalType(DataType.A_UINT32, display_radix=None, precision=None), + compu_method=compu_method, + unit_ref=None, + sdgs=[], + internal_constr=None, + physical_constr=None, + ) + + req_param1 = CodedConstParameter( + short_name="SID", + long_name=None, + description=None, + semantic=None, + diag_coded_type=diag_coded_type, + coded_value=0x12, + byte_position=0, + bit_position=None, + sdgs=[], + ) + + struct_param1 = CodedConstParameter( + short_name="struct_param_1", + long_name=None, + description=None, + semantic=None, + diag_coded_type=diag_coded_type_4, + coded_value=0x4, + byte_position=0, + bit_position=0, + sdgs=[], + ) + struct_param2 = ValueParameter( + short_name="struct_param_2", + long_name=None, + description=None, + semantic=None, + dop_ref=OdxLinkRef.from_id(dop.odx_id), + dop_snref=None, + physical_default_value_raw=None, + byte_position=0, + bit_position=4, + sdgs=[], + ) + struct = Structure( + odx_id=OdxLinkId("dlf_struct.id", doc_frags), + short_name="dlf_struct", + long_name=None, + description=None, + admin_data=None, + sdgs=[], + parameters=NamedItemList([struct_param1, struct_param2]), + byte_size=None, + ) + det_num_items = DetermineNumberOfItems( + byte_position=1, bit_position=3, dop_ref=OdxLinkRef.from_id(dop.odx_id)) + dlf = DynamicLengthField( + odx_id=OdxLinkId("dlf.id", doc_frags), + short_name="dlf_sn", + long_name=None, + description=None, + admin_data=None, + sdgs=[], + structure_ref=OdxLinkRef.from_id(struct.odx_id), + structure_snref=None, + env_data_desc_ref=None, + env_data_desc_snref=None, + is_visible_raw=True, + offset=3, + determine_number_of_items=det_num_items, + ) + req_param2 = ValueParameter( + short_name="dlf_param", + long_name=None, + description=None, + semantic=None, + dop_ref=OdxLinkRef.from_id(dlf.odx_id), + dop_snref=None, + physical_default_value_raw=None, + byte_position=None, + bit_position=None, + sdgs=[], + ) + + req = Request( + odx_id=OdxLinkId("dlf.request.id", doc_frags), + short_name="dlf_request_sn", + long_name=None, + description=None, + admin_data=None, + sdgs=[], + parameters=NamedItemList([req_param1, req_param2]), + byte_size=None, + ) + service = DiagService( + odx_id=OdxLinkId("dlf.service.id", doc_frags), + short_name="dlf_service_sn", + long_name=None, + description=None, + protocol_snrefs=[], + related_diag_comm_refs=[], + diagnostic_class=None, + is_mandatory_raw=None, + is_executable_raw=None, + is_final_raw=None, + admin_data=None, + semantic=None, + comparam_refs=[], + is_cyclic_raw=None, + is_multiple_raw=None, + addressing_raw=None, + transmission_mode_raw=None, + audience=None, + functional_class_refs=[], + pre_condition_state_refs=[], + state_transition_refs=[], + request_ref=OdxLinkRef.from_id(req.odx_id), + pos_response_refs=[], + neg_response_refs=[], + sdgs=[], + ) + diag_layer_raw = DiagLayerRaw( + variant_type=DiagLayerType.BASE_VARIANT, + odx_id=OdxLinkId("dl.id", doc_frags), + short_name="dl_sn", + long_name=None, + description=None, + admin_data=None, + company_datas=NamedItemList(), + functional_classes=NamedItemList(), + diag_data_dictionary_spec=DiagDataDictionarySpec( + dtc_dops=NamedItemList(), + data_object_props=NamedItemList([dop]), + structures=NamedItemList([struct]), + end_of_pdu_fields=NamedItemList(), + dynamic_length_fields=NamedItemList([dlf]), + tables=NamedItemList(), + env_data_descs=NamedItemList(), + env_datas=NamedItemList(), + muxs=NamedItemList(), + unit_spec=None, + sdgs=[]), + diag_comms=[service], + requests=NamedItemList([req]), + positive_responses=NamedItemList(), + negative_responses=NamedItemList(), + global_negative_responses=NamedItemList(), + additional_audiences=NamedItemList(), + import_refs=[], + state_charts=NamedItemList(), + sdgs=[], + parent_refs=[], + comparams=[], + ecu_variant_patterns=[], + comparam_spec_ref=None, + prot_stack_snref=None, + ) + diag_layer = DiagLayer(diag_layer_raw=diag_layer_raw) + odxlinks = OdxLinkDatabase() + odxlinks.update(diag_layer._build_odxlinks()) + diag_layer._resolve_odxlinks(odxlinks) + diag_layer._finalize_init(odxlinks) + + expected_message = Message( + coded_message=bytes([0x12, 0x00, 0x18, 0x00, 0x34, 0x44, 0x54]), + service=service, + coding_object=req, + param_dict={ + "SID": + 0x12, + "dlf_param": [ + { + "struct_param_1": 4, + "struct_param_2": 3 + }, + { + "struct_param_1": 4, + "struct_param_2": 4 + }, + { + "struct_param_1": 4, + "struct_param_2": 5 + }, + ], + }, + ) + + # test encoding + encoded_message = diag_layer.services.dlf_service_sn(**expected_message.param_dict) + self.assertEqual(encoded_message, expected_message.coded_message) + + # test decoding + decoded_message = diag_layer.decode(expected_message.coded_message)[0] + self.assertEqual(expected_message.coded_message, decoded_message.coded_message) + self.assertEqual(expected_message.service, decoded_message.service) + self.assertEqual(expected_message.coding_object, decoded_message.coding_object) + self.assertEqual(expected_message.param_dict, decoded_message.param_dict) + def test_decode_request_end_of_pdu_field(self) -> None: - """Test the decoding for a structure.""" + """Test decoding of end-of-pdu fields.""" diag_coded_type = StandardLengthType( base_data_type=DataType.A_UINT32, base_type_encoding=None, diff --git a/tests/test_diag_coded_types.py b/tests/test_diag_coded_types.py index 02151455..e11e01cb 100644 --- a/tests/test_diag_coded_types.py +++ b/tests/test_diag_coded_types.py @@ -76,7 +76,7 @@ def test_encode_leading_length_info_type_bytefield(self) -> None: base_type_encoding=None, is_highlow_byte_order_raw=None, ) - state = EncodeState(bytes([]), {}) + state = EncodeState(bytearray([]), {}) byte_val = dct.convert_internal_to_bytes("4V", state, bit_position=1) self.assertEqual(byte_val, bytes([0x4, 0x34, 0x56])) @@ -86,7 +86,7 @@ def test_encode_leading_length_info_type_bytefield(self) -> None: base_type_encoding=None, is_highlow_byte_order_raw=None, ) - state = EncodeState(bytes([]), {}) + state = EncodeState(bytearray([]), {}) internal = dct.convert_internal_to_bytes(bytes([0x3]), state, bit_position=1) self.assertEqual(internal, bytes([0x2, 0x3])) @@ -97,7 +97,7 @@ def test_decode_leading_length_info_type_bytefield2(self) -> None: base_type_encoding=None, is_highlow_byte_order_raw=None, ) - state = EncodeState(bytes([0x12, 0x34]), {}) + state = EncodeState(bytearray([0x12, 0x34]), {}) byte_val = dct.convert_internal_to_bytes(bytes([0x0]), state, bit_position=0) # Right now `bytes([0x1, 0x0])` is the encoded value. # However, since bytes() is shorter and would be decoded @@ -134,7 +134,7 @@ def test_encode_leading_length_info_type_unicode2string(self) -> None: base_type_encoding=None, is_highlow_byte_order_raw=None, ) - state = EncodeState(coded_message=bytes([0x12]), parameter_values={}) + state = EncodeState(coded_message=bytearray([0x12]), parameter_values={}) byte_val = dct.convert_internal_to_bytes("a9", state, bit_position=0) self.assertEqual(byte_val, bytes([0x4, 0x00, 0x61, 0x00, 0x39])) @@ -405,7 +405,7 @@ def test_encode_param_info_length_type_uint(self) -> None: odxlinks = OdxLinkDatabase() odxlinks.update({length_key_id: length_key}) dct._resolve_odxlinks(odxlinks) - state = EncodeState(bytes([0x10]), {length_key.short_name: 40}) + state = EncodeState(bytearray([0x10]), {length_key.short_name: 40}) byte_val = dct.convert_internal_to_bytes(0x12345, state, bit_position=0) self.assertEqual(byte_val.hex(), "0000012345") @@ -673,7 +673,7 @@ def test_encode_min_max_length_type_hex_ff(self) -> None: termination="HEX-FF", is_highlow_byte_order_raw=None, ) - state = EncodeState(bytes([0x12]), parameter_values={}, is_end_of_pdu=False) + state = EncodeState(bytearray([0x12]), parameter_values={}, is_end_of_pdu=False) byte_val = dct.convert_internal_to_bytes(bytes([0x34, 0x56]), state, bit_position=0) self.assertEqual(byte_val, bytes([0x34, 0x56, 0xFF])) @@ -686,7 +686,7 @@ def test_encode_min_max_length_type_zero(self) -> None: termination="ZERO", is_highlow_byte_order_raw=None, ) - state = EncodeState(bytes([0x12]), parameter_values={}, is_end_of_pdu=False) + state = EncodeState(bytearray([0x12]), parameter_values={}, is_end_of_pdu=False) byte_val = dct.convert_internal_to_bytes("Hi", state, bit_position=0) self.assertEqual(byte_val, bytes([0x48, 0x69, 0x0])) @@ -701,7 +701,7 @@ def test_encode_min_max_length_type_end_of_pdu(self) -> None: termination=termination, is_highlow_byte_order_raw=None, ) - state = EncodeState(bytes([0x12]), parameter_values={}, is_end_of_pdu=True) + state = EncodeState(bytearray([0x12]), parameter_values={}, is_end_of_pdu=True) byte_val = dct.convert_internal_to_bytes( bytes([0x34, 0x56, 0x78, 0x9A]), state, bit_position=0) self.assertEqual(byte_val, bytes([0x34, 0x56, 0x78, 0x9A])) @@ -714,7 +714,7 @@ def test_encode_min_max_length_type_end_of_pdu(self) -> None: termination="END-OF-PDU", is_highlow_byte_order_raw=None, ) - state = EncodeState(bytes([0x12]), parameter_values={}, is_end_of_pdu=False) + state = EncodeState(bytearray([0x12]), parameter_values={}, is_end_of_pdu=False) def test_encode_min_max_length_type_max_length(self) -> None: """If the internal value is larger than max length, an EncodeError must be raised.""" @@ -727,7 +727,7 @@ def test_encode_min_max_length_type_max_length(self) -> None: termination=termination, is_highlow_byte_order_raw=None, ) - state = EncodeState(bytes([0x12]), parameter_values={}, is_end_of_pdu=True) + state = EncodeState(bytearray([0x12]), parameter_values={}, is_end_of_pdu=True) byte_val = dct.convert_internal_to_bytes( bytes([0x34, 0x56, 0x78]), state, bit_position=0) self.assertEqual(byte_val, bytes([0x34, 0x56, 0x78]))