From 0e660519a58adba119d552e90fcbca8e430490a4 Mon Sep 17 00:00:00 2001
From: Artur Hadasz
Date: Wed, 20 Nov 2024 17:34:27 +0100
Subject: [PATCH] cache_create: Extended the command to extract from envelope

This commit extends the suit-generator cache_create command.
It now has three subcommands: from_payloads, from_envelope and merge.
from_payloads has the same functionality as the old version of the
cache_create command.
from_envelope extracts payloads directly from an envelope, with the
option to parse the envelope hierarchically and omit certain payloads.
merge combines multiple cache partition files into one.

Ref: NCSDK-28932

Signed-off-by: Artur Hadasz
---
 suit_generator/cmd_cache_create.py | 353 +++++++++++++++++++++++------
 tests/test_cmd_cache_create.py     | 132 ++++++++++-
 2 files changed, 404 insertions(+), 81 deletions(-)

diff --git a/suit_generator/cmd_cache_create.py b/suit_generator/cmd_cache_create.py
index 3b6feaa..2713ca6 100644
--- a/suit_generator/cmd_cache_create.py
+++ b/suit_generator/cmd_cache_create.py
@@ -6,104 +6,151 @@
 """CMD_CACHE_CREATE CLI command entry point."""

 import logging
-import cbor2
 import math
+import cbor2
+import re

-log = logging.getLogger(__name__)
+from suit_generator.exceptions import GeneratorError

 CACHE_CREATE_CMD = "cache_create"
+CACHE_CREATE_FROM_PAYLOADS_CMD = "from_payloads"
+CACHE_CREATE_FROM_ENVELOPE_CMD = "from_envelope"
+CACHE_MERGE_CMD = "merge"
+
+log = logging.getLogger(__name__)


 def add_arguments(parser):
     """Add additional arguments to the passed parser."""
-    cmd_cache_create_arg_parser = parser.add_parser(CACHE_CREATE_CMD, help="Create raw cache structure.")
+    cmd_cache_create = parser.add_parser(CACHE_CREATE_CMD, help="Create raw cache structure.")

-    cmd_cache_create_arg_parser.add_argument(
+    cmd_cache_create_subparsers = cmd_cache_create.add_subparsers(
+        dest="cache_create_subcommand", required=True, help="Choose cache_create subcommand"
+    )
+    cmd_cache_create_from_payloads = cmd_cache_create_subparsers.add_parser(
+        CACHE_CREATE_FROM_PAYLOADS_CMD,
+        help="Create a cache partition from the provided binaries containing raw payloads.",
+    )
+
+    cmd_cache_create_from_payloads.add_argument("--output-file", required=True, help="Output raw SUIT DFU cache file.")
+    cmd_cache_create_from_payloads.add_argument(
+        "--eb-size", type=int, help="Erase block size in bytes (used for padding).", default=16
+    )
+
+    cmd_cache_create_from_payloads.add_argument(
         "--input",
         required=True,
         action="append",
         help="Input binary with corresponding URI, passed in format <uri>,<input file path>."
+        " Multiple inputs can be passed.",
     )
-    cmd_cache_create_arg_parser.add_argument("--output-file", required=True, help="Output raw SUIT DFU cache file.")
-    cmd_cache_create_arg_parser.add_argument(
+
+    cmd_cache_create_from_envelope = cmd_cache_create_subparsers.add_parser(
+        CACHE_CREATE_FROM_ENVELOPE_CMD, help="Create a cache partition from the payloads inside the provided envelope."
+    )
+
+    cmd_cache_create_from_envelope.add_argument("--output-file", required=True, help="Output raw SUIT DFU cache file.")
+    cmd_cache_create_from_envelope.add_argument(
         "--eb-size", type=int, help="Erase block size in bytes (used for padding).", default=16
     )
+    cmd_cache_create_from_envelope.add_argument("--input-envelope", required=True, help="Input envelope file path.")

-def add_padding(data: bytes, eb_size: int) -> bytes:
-    """
-    Add padding to the given data to align it to the specified erase block size.
-
-    This function ensures that the data is padded to a size that is a multiple of the erase block size (eb_size).
-    The padding is done by appending a CBOR key-value pair with empty URI as the key and
-    byte-string-encoded zeros as the value.
-
-    :param data: The input data to be padded.
-    :type data: bytes
-    :param eb_size: The erase block size in bytes.
-    :type eb_size: int
-    :return: The padded data.
-    """
-    rounded_up_size = math.ceil(len(data) / eb_size) * eb_size
-    padding_size = rounded_up_size - len(data)
-    padded_data = data
-
-    # minimum padding size is 2 bytes
-    if padding_size == 1:
-        padding_size += eb_size
-        rounded_up_size += eb_size
-
-    if padding_size == 0:
-        return data
-
-    padded_data += bytes([0x60])
-
-    if padding_size <= 23:
-        header_len = 2
-        padded_data += bytes([0x40 + (padding_size - header_len)])
-    elif padding_size <= 0xFFFF:
-        header_len = 4
-        padded_data += bytes([0x59]) + (padding_size - header_len).to_bytes(2, byteorder="big")
-    else:
-        raise ValueError("Number of required padding bytes exceeds assumed max size 0xFFFF")
+    cmd_cache_create_from_envelope.add_argument(
+        "--output-envelope", required=True, help="Output envelope file path (envelope with extracted payloads removed)."
+    )
+
+    cmd_cache_create_from_envelope.add_argument(
+        "--omit-payload-regex",
+        help="Integrated payloads matching the regular expression will not be extracted to the cache.",
+    )

-    return padded_data.ljust(rounded_up_size, b"\x00")
+    cmd_cache_create_from_envelope.add_argument(
+        "--dependency-regex",
+        help="Integrated payloads matching the regular expression will be treated as dependency "
+        + "envelopes and parsed hierarchically. "
+        + "The payloads extracted from the dependency envelopes will be added to the cache.",
+    )
+
+    cmd_cache_merge = cmd_cache_create_subparsers.add_parser(
+        CACHE_MERGE_CMD, help="Merge multiple cache partitions into a single cache partition."
+    )

-def main(input: list[str], output_file: str, eb_size: int) -> None:
-    """
-    Create a raw SUIT DFU cache file from input binaries.
+    cmd_cache_merge.add_argument("--input", required=True, action="append", help="Input raw SUIT DFU cache file.")
+    cmd_cache_merge.add_argument("--output-file", required=True, help="Output raw SUIT DFU cache file.")
+    cmd_cache_merge.add_argument(
+        "--eb-size", type=int, help="Erase block size in bytes (used for padding).", default=16
+    )

-    This function processes a list of input binaries, each associated with a URI, and creates a raw SUIT DFU cache file.
-    The data is padded to align with the specified erase block size (eb_size).
-    :param input: List of input binaries with corresponding URIs, passed in the format <uri>,<input file path>.
-    :type input: list[str]
-    :param output_file: Path to the output raw SUIT DFU cache file.
-    :type output_file: str
-    :param eb_size: Erase block size in bytes (used for padding).
-    :type eb_size: int
-    :return: None
-    """
-    cache_data = bytes()
-    first_slot = True
+class CachePartition:
+    """Class generating a SUIT DFU cache partition."""

-    for single_input in input:
-        args = single_input.split(",")
-        if len(args) < 2:
-            raise ValueError("Invalid number of input arguments: " + single_input)
-        uri, input_file = args
+    def __init__(self, eb_size: int):
+        """Initialize a CachePartition object."""
+        self.first_slot = True
+        self.cache_data = bytes()
+        self.eb_size = eb_size
+        self.uris = []

-        data = None
-        with open(input_file, "rb") as f:
-            data = f.read()
+    def add_padding(self, data: bytes) -> bytes:
+        """
+        Add padding to the given data to align it to the specified erase block size.
+
+        This method ensures that the data is padded to a size that is a multiple of the erase block size
+        (self.eb_size). The padding is done by appending a CBOR key-value pair with an empty URI as the key and
+        byte-string-encoded zeros as the value.
+
+        :param data: The input data to be padded.
+        :type data: bytes
+        :return: The padded data.
+        """
+        rounded_up_size = math.ceil(len(data) / self.eb_size) * self.eb_size
+        padding_size = rounded_up_size - len(data)
+        padded_data = data
+
+        # minimum padding size is 2 bytes
+        if padding_size == 1:
+            padding_size += self.eb_size
+            rounded_up_size += self.eb_size
+        if padding_size == 0:
+            return data
+
+        padded_data += bytes([0x60])
+
+        if padding_size <= 23:
+            header_len = 2
+            padded_data += bytes([0x40 + (padding_size - header_len)])
+        elif padding_size <= 0xFFFF:
+            header_len = 4
+            padded_data += bytes([0x59]) + (padding_size - header_len).to_bytes(2, byteorder="big")
+        else:
+            raise ValueError("Number of required padding bytes exceeds assumed max size 0xFFFF")
+
+        return padded_data.ljust(rounded_up_size, b"\x00")
+
+    def add_cache_slot(self, uri: str, data: bytes):
+        """
+        Add a cache slot to the cache from the given URI and data.
+
+        This method creates a cache slot from the given URI and data, and pads the slot to align with the configured
+        erase block size (self.eb_size). The first slot additionally opens the cache with an indefinite-length CBOR
+        map header.
+
+        :param uri: The URI associated with the data.
+        :type uri: str
+        :param data: The data to be included in the cache slot.
+        :type data: bytes
+        """
         slot_data = bytes()
-        if first_slot:
+        if self.first_slot:
             # Open the cache - it is an indefinite length CBOR map (0xBF)
             slot_data = bytes([0xBF])
-            first_slot = False
+            self.first_slot = False
+
+        if uri in self.uris:
+            raise ValueError(f"URI {uri} already exists in the cache!")
+        self.uris.append(uri)

         # uri as key
         slot_data += cbor2.dumps(uri)
@@ -111,11 +158,171 @@ def main(input: list[str], output_file: str, eb_size: int) -> None:
         # Size must be encoded in 4 bytes, thus cannot use cbor2.dumps
         slot_data += bytes([0x5A]) + len(data).to_bytes(4, byteorder="big") + data
         # Add padding for single slot
-        slot_data = add_padding(slot_data, eb_size)
+        slot_data = self.add_padding(slot_data)
+
+        self.cache_data += slot_data

-        cache_data += slot_data
+    def close_and_save_cache(self, output_file: str):
+        """
+        Close the cache and save it to the specified output file.

-    cache_data += bytes([0xFF])  # Finish the indefinite length map
+        This method closes the cache by adding the end-of-map byte (0xFF) and saves it to the specified output file.
+
+        :param output_file: Path to the output raw SUIT DFU cache file.
+        :type output_file: str
+        """
+        self.cache_data += bytes([0xFF])
+        with open(output_file, "wb") as f:
+            f.write(self.cache_data)
+
+    def merge_single_cache_file(self, cache_input_file: str):
+        """
+        Merge the contents of the provided single cache file into the current cache.
+
+        :param cache_input_file: Path to the input raw SUIT DFU cache file.
+        """
+        with open(cache_input_file, "rb") as f:
+            data = f.read()
+
+        cache_dict = cbor2.loads(data)
+
+        for k in cache_dict.keys():
+            if len(k) == 0:
+                continue  # Empty key means padding - skip
+            self.add_cache_slot(k, cache_dict[k])
+
+
+class CacheFromPayloads:
+    """Class generating a SUIT DFU cache partition from payloads."""
+
+    def fill_cache_from_payloads(cache: CachePartition, input: list[str]) -> None:
+        """
+        Process a list of input binaries, each associated with a URI, and fill the SUIT DFU cache with the data.
+
+        :param cache: CachePartition object to fill with the data
+        :param input: List of input binaries with corresponding URIs, passed in the format <uri>,<input file path>
+        """
+        for single_input in input:
+            args = single_input.split(",")
+            if len(args) < 2:
+                raise ValueError("Invalid number of input arguments: " + single_input)
+            uri, input_file = args
+
+            with open(input_file, "rb") as f:
+                data = f.read()
+
+            cache.add_cache_slot(uri, data)
+
+
+class CacheFromEnvelope:
+    """Class generating a SUIT DFU cache partition from an envelope."""
+
+    def fill_cache_from_envelope_data(
+        cache: CachePartition, envelope_data: bytes, omit_payload_regex: str, dependency_regex: str
+    ) -> bytes:
+        """
+        Fill the cache partition with data from the payloads inside the provided envelope binary data.
+
+        This function is called recursively for dependency envelopes.
+
+        :param cache: CachePartition object to fill with the data
+        :param envelope_data: Binary data of the envelope to extract the payloads from
+        :param omit_payload_regex: Integrated payloads matching the regular expression will not be extracted to the
+            cache
+        :param dependency_regex: Integrated payloads matching the regular expression will be treated as dependency
+            envelopes
+        :return: Binary data of the envelope with the extracted payloads removed
+        """
+        try:
+            envelope = cbor2.loads(envelope_data)
+        except Exception:
+            raise GeneratorError("The provided envelope/dependency envelope is not a valid envelope!")
+
+        if isinstance(envelope, cbor2.CBORTag) and isinstance(envelope.value, dict):
+            integrated = [k for k in envelope.value.keys() if isinstance(k, str)]
+        else:
+            raise GeneratorError("The provided envelope/dependency envelope is not a valid envelope!")
+
+        if dependency_regex is not None:
+            integrated_dependencies = [k for k in integrated if re.fullmatch(dependency_regex, k) is not None]
+            for dep in integrated_dependencies:
+                integrated.remove(dep)
+        else:
+            integrated_dependencies = []
+
+        if omit_payload_regex is None:
+            payloads_to_extract = integrated
+        else:
+            payloads_to_extract = [k for k in integrated if re.fullmatch(omit_payload_regex, k) is None]
+
+        for payload in payloads_to_extract:
+            cache.add_cache_slot(payload, envelope.value.pop(payload))
+
+        for dependency in integrated_dependencies:
+            try:
+                new_dependency_data = CacheFromEnvelope.fill_cache_from_envelope_data(
+                    cache, envelope.value[dependency], omit_payload_regex, dependency_regex
+                )
+            except GeneratorError as e:
+                log.log(logging.ERROR, "Failed to extract payloads from dependency %s: %s", dependency, repr(e))
+                raise GeneratorError("Failed to extract payloads from envelope!")
+
+            envelope.value[dependency] = new_dependency_data
+
+        return cbor2.dumps(envelope)
+
+    def fill_cache_from_envelope(
+        cache: CachePartition, input_envelope: str, output_envelope: str, omit_payload_regex: str, dependency_regex: str
+    ) -> None:
+        """
+        Extract the payloads from the provided envelope to the cache partition file.
+
+        :param cache: CachePartition object to fill with the data
+        :param input_envelope: Path to the input envelope file
+        :param output_envelope: Path to the output envelope file (envelope with extracted payloads removed)
+        :param omit_payload_regex: Integrated payloads matching the regular expression will not be extracted to the
+            cache
+        :param dependency_regex: Integrated payloads matching the regular expression will be treated as dependency
+            envelopes
+        """
+        with open(input_envelope, "rb") as fh:
+            data = fh.read()
+        output_envelope_data = CacheFromEnvelope.fill_cache_from_envelope_data(
+            cache, data, omit_payload_regex, dependency_regex
+        )
+        with open(output_envelope, "wb") as fh:
+            fh.write(output_envelope_data)
+
+
+class CacheMerge:
+    """Class merging SUIT DFU cache partitions."""
+
+    def merge_cache_files(cache: CachePartition, input: list[str]) -> None:
+        """
+        Merge the contents of the provided cache files into the cache partition.
+
+        :param cache: CachePartition object to merge the cache files into
+        :param input: List of paths to the input raw SUIT DFU cache files
+        """
+        for single_input in input:
+            cache.merge_single_cache_file(single_input)
+
+
+def main(**kwargs) -> None:
+    """Create a raw SUIT DFU cache file."""
+    cache = CachePartition(kwargs["eb_size"])
+
+    if kwargs["cache_create_subcommand"] == CACHE_CREATE_FROM_PAYLOADS_CMD:
+        CacheFromPayloads.fill_cache_from_payloads(cache, kwargs["input"])
+    elif kwargs["cache_create_subcommand"] == CACHE_CREATE_FROM_ENVELOPE_CMD:
+        CacheFromEnvelope.fill_cache_from_envelope(
+            cache,
+            kwargs["input_envelope"],
+            kwargs["output_envelope"],
+            kwargs["omit_payload_regex"],
+            kwargs["dependency_regex"],
+        )
+    elif kwargs["cache_create_subcommand"] == CACHE_MERGE_CMD:
+        CacheMerge.merge_cache_files(cache, kwargs["input"])
+    else:
+        raise GeneratorError(f"Invalid 'cache_create' subcommand: {kwargs['cache_create_subcommand']}")

-    with open(output_file, "wb") as f:
-        f.write(cache_data)
+    cache.close_and_save_cache(kwargs["output_file"])
diff --git a/tests/test_cmd_cache_create.py b/tests/test_cmd_cache_create.py
index b4c3b08..ded2574 100644
--- a/tests/test_cmd_cache_create.py
+++ b/tests/test_cmd_cache_create.py
@@ -8,30 +8,67 @@
 import pytest
 import pathlib
 import os
+import cbor2

 from suit_generator.cmd_cache_create import main as cmd_cache_create_main
+from suit_generator.cmd_create import main as cmd_create_main

 TEMP_DIRECTORY = pathlib.Path("test_test_data")

 BINARY_CONTENT_1 = bytes([0x01, 0x02, 0x03, 0x04])
 BINARY_CONTENT_2 = bytes([0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11])
+BINARY_CONTENT_3 = bytes(
+    [
+        0x11,
+        0x22,
+        0x33,
+        0x44,
+        0x55,
+    ]
+)

 URI1 = "#first"  # [0x23, 0x66, 0x69, 0x72, 0x73, 0x74]
 URI2 = "#second"  # [0x23, 0x73, 0x65, 0x63, 0x6F, 0x6E, 0x64]
+URI3 = "#third"  # [0x23, 0x74, 0x68, 0x69, 0x72, 0x64]
+
+ENVELOPE_ROOT_YAML = """SUIT_Envelope_Tagged:
+    suit-authentication-wrapper:
+        SuitDigest:
+            suit-digest-algorithm-id: cose-alg-sha-256
+            suit-digest-bytes: 60198229d4c07c866094a3d19c2d8b15b5dd552cd5bba5cf8f78e492ccbb3327
+    suit-manifest:
+        suit-manifest-component-id:
+        - raw: aa
+        suit-integrated-dependencies:
+            'dependency_manifest': dep.suit
+            '#first': first.bin
+"""
+
+ENVELOPE_DEP_YAML = """SUIT_Envelope_Tagged:
+    suit-authentication-wrapper:
+        SuitDigest:
+            suit-digest-algorithm-id: cose-alg-sha-256
+            suit-digest-bytes: b2afeba5d8172371661b7ab5d7242c2ba6797ae27e713114619d90663ab6a2ec
+    suit-manifest:
+        suit-manifest-component-id:
+        - raw: bb
+        suit-integrated-dependencies:
+            '#second': second.bin
+            '#third': third.bin
+"""

 # fmt: off
 EXPECTED_CACHE_EB_8 = bytes([0xBF,
-                             0x66, 0x23, 0x66, 0x69, 0x72, 0x73, 0x74,  # tstr "first"
+                             0x66, 0x23, 0x66, 0x69, 0x72, 0x73, 0x74,  # tstr "#first"
                              0x5A, 0x00, 0x00, 0x00, 0x04,  # bstr size 4
                              0x01, 0x02, 0x03, 0x04,  # BINARY_CONTENT_1
                              0x60, 0x45, 0x00, 0x00, 0x00, 0x00, 0x00,  # padding
-                             0x67, 0x23, 0x73, 0x65, 0x63, 0x6F, 0x6E, 0x64,  # tstr "second"
+                             0x67, 0x23, 0x73, 0x65, 0x63, 0x6F, 0x6E, 0x64,  # tstr "#second"
                              0x5A, 0x00, 0x00, 0x00, 0x0a,  # bstr size 10
                              0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11,  # BINARY_CONTENT_2
                              0x60, 0x47, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,  # padding
                              0xFF])

 EXPECTED_CACHE_EB_64 = bytes([0xBF,
-                              0x66, 0x23, 0x66, 0x69, 0x72, 0x73, 0x74,  # tstr "first"
+                              0x66, 0x23, 0x66, 0x69, 0x72, 0x73, 0x74,  # tstr "#first"
                               0x5A, 0x00, 0x00, 0x00, 0x04,  # bstr size 4
                               0x01, 0x02, 0x03, 0x04,  # BINARY_CONTENT_1
                               0x60, 0x59, 0x00, 0x2B,  # padding 47 bytes (4 bytes header + 43 bytes padding)
@@ -41,7 +78,7 @@
                               0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                               0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                               0x00, 0x00, 0x00,
-                              0x67, 0x23, 0x73, 0x65, 0x63, 0x6F, 0x6E, 0x64,  # tstr "second"
+                              0x67, 0x23, 0x73, 0x65, 0x63, 0x6F, 0x6E, 0x64,  # tstr "#second"
                               0x5A, 0x00, 0x00, 0x00, 0x0a,  # bstr size 10
                               0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11,  # BINARY_CONTENT_2
                               0x60, 0x59, 0x00, 0x25,  # padding 41 bytes (4 bytes header + 37 bytes padding)
@@ -67,7 +104,8 @@ def setup_and_teardown(tmp_path_factory):
         fh.write(BINARY_CONTENT_1)
     with open("second.bin", "wb") as fh:
         fh.write(BINARY_CONTENT_2)
-
+    with open("third.bin", "wb") as fh:
+        fh.write(BINARY_CONTENT_3)
     yield
     # Cleanup environment
     # - remove temp directory
@@ -81,12 +119,90 @@
         (64, EXPECTED_CACHE_EB_64),
     ],
 )
-def test_cache_create(setup_and_teardown, eb_size, output_content):
-    """Verify if is possible to create binary envelope from json input."""
+def test_cache_create_from_payloads(setup_and_teardown, eb_size, output_content):
+    """Verify if the cache is correctly created from payloads."""
     cmd_cache_create_main(
-        input=[f"{URI1},first.bin", f"{URI2},second.bin"], output_file="test_cache.bin", eb_size=eb_size
+        cache_create_subcommand="from_payloads",
+        input=[f"{URI1},first.bin", f"{URI2},second.bin"],
+        output_file="test_cache.bin",
+        eb_size=eb_size,
     )

     with open("test_cache.bin", "rb") as f:
         assert f.read() == output_content
+
+
+def test_cache_create_merge(setup_and_teardown):
+    """Verify if the cache is correctly created from two other cache partitions."""
+
+    cmd_cache_create_main(
+        cache_create_subcommand="from_payloads",
+        input=[f"{URI1},first.bin", f"{URI2},second.bin"],
+        output_file="test_cache1.bin",
+        eb_size=8,
+    )
+    cmd_cache_create_main(
+        cache_create_subcommand="from_payloads", input=[f"{URI3},third.bin"], output_file="test_cache2.bin", eb_size=4
+    )
+
+    cmd_cache_create_main(
+        cache_create_subcommand="from_payloads",
+        input=[f"{URI1},first.bin", f"{URI2},second.bin", f"{URI3},third.bin"],
+        output_file="test_cache_merged_expected.bin",
+        eb_size=16,
+    )
+
+    cmd_cache_create_main(
+        cache_create_subcommand="merge",
+        input=["test_cache1.bin", "test_cache2.bin"],
+        output_file="test_cache_merged.bin",
+        eb_size=16,
+    )
+
+    with open("test_cache_merged.bin", "rb") as f:
open("test_cache_merged.bin", "rb") as f: + result = f.read() + + with open("test_cache_merged_expected.bin", "rb") as f: + expected = f.read() + + # Assert that cache resulting from merging two caches is the same as if the cache was created from the payloads + assert result == expected + + +def test_cache_create_merge_from_envelope(setup_and_teardown): + # Prepare envelope files + with open("root.yaml", "w") as fh: + fh.write(ENVELOPE_ROOT_YAML) + with open("dep.yaml", "w") as fh: + fh.write(ENVELOPE_DEP_YAML) + cmd_create_main(input_file="dep.yaml", output_file="dep.suit", input_format="AUTO") + cmd_create_main(input_file="root.yaml", output_file="root.suit", input_format="AUTO") + + cmd_cache_create_main( + cache_create_subcommand="from_envelope", + input_envelope="root.suit", + output_envelope="root_stripped.suit", + output_file="test_cache_from_envelope.bin", + eb_size=8, + omit_payload_regex=".*third", + dependency_regex="dep.*", + ) + + with open("test_cache_from_envelope.bin", "rb") as f: + assert f.read() == EXPECTED_CACHE_EB_8 + + with open("root_stripped.suit", "rb") as fh: + envelope_stripped = cbor2.load(fh) + + assert "#first" not in envelope_stripped.value.keys() + + dependency = envelope_stripped.value.pop("dependency_manifest", None) + assert dependency is not None + dependency = cbor2.loads(dependency).value + + assert "#second" not in dependency.keys() + + not_extracted = dependency.pop("#third", None) + assert not_extracted is not None + assert not_extracted == BINARY_CONTENT_3