From 6500933f49e78657b0ddb6a6d8e9e09a30e36af1 Mon Sep 17 00:00:00 2001 From: Thien Trung Vuong Date: Fri, 7 Feb 2025 01:58:42 +0000 Subject: [PATCH 1/4] Add Tpm2 tool tpm2-tools package provides the toolset to interact with TPM devices Signed-off-by: Thien Trung Vuong --- lisa/tools/__init__.py | 2 ++ lisa/tools/tpm2.py | 68 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 lisa/tools/tpm2.py diff --git a/lisa/tools/__init__.py b/lisa/tools/__init__.py index 95d0f48fda..2db6493c82 100644 --- a/lisa/tools/__init__.py +++ b/lisa/tools/__init__.py @@ -118,6 +118,7 @@ from .texinfo import Texinfo from .timedatectl import Timedatectl from .timeout import Timeout +from .tpm2 import Tpm2 from .unzip import Unzip from .uptime import Uptime from .usermod import Usermod @@ -251,6 +252,7 @@ "TcpDump", "Timedatectl", "Timeout", + "Tpm2", "Uname", "Unzip", "Uptime", diff --git a/lisa/tools/tpm2.py b/lisa/tools/tpm2.py new file mode 100644 index 0000000000..09ee2de7a5 --- /dev/null +++ b/lisa/tools/tpm2.py @@ -0,0 +1,68 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +from typing import Dict, List, Optional, Sequence, Union + +from lisa.executable import Tool +from lisa.operating_system import CBLMariner +from lisa.util import LisaException + + +class Tpm2(Tool): + @property + def command(self) -> str: + return "tpm2" + + @property + def can_install(self) -> bool: + return True + + def _install(self) -> bool: + if isinstance(self.node.os, CBLMariner): + self.node.os.install_packages("tpm2-tools") + else: + raise LisaException( + f"tool {self.command} can't be installed in distro {self.node.os.name}." + ) + return self._check_exists() + + def pcrread( + self, pcrs: Optional[Union[int, Sequence[int]]] = None + ) -> Dict[int, str]: + alg = "sha256" + pcrs = self._get_pcr_list(pcrs) + if len(pcrs) == 0: + pcrs_arg = "all" + else: + pcrs_arg = ",".join(map(str, pcrs)) + cmd = f"pcrread {alg}:{pcrs_arg}" + cmd_result = self.run( + cmd, + expected_exit_code=0, + expected_exit_code_failure_message="failed to read PCR values", + shell=True, + sudo=True, + force_run=True, + ) + output = cmd_result.stdout + + lines = [line.strip() for line in output.splitlines()] + # first line of output will have the format ":", e.g "sha256:" + assert ( + lines[0][:-1] == alg + ), "pcrread output does not contain the requested algorithm" + + result = dict() + for line in lines[1:]: + pcr, hash_value = line.split(":", 1) + pcr_index = int(pcr.strip()) + hash_value = hash_value.strip().lower() + result[pcr_index] = hash_value + return result + + def _get_pcr_list(self, pcrs: Optional[Union[int, Sequence[int]]]) -> List[int]: + if pcrs is None: + return [] + if isinstance(pcrs, int): + return [pcrs] + return [pcr for pcr in pcrs] From 6a28f99e93c11570fa8bc0b689c90c21d7c2d405 Mon Sep 17 00:00:00 2001 From: Thien Trung Vuong Date: Fri, 7 Feb 2025 02:00:39 +0000 Subject: [PATCH 2/4] Add Bootctl tool Signed-off-by: Thien Trung Vuong --- lisa/tools/__init__.py | 2 ++ lisa/tools/bootctl.py | 45 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 lisa/tools/bootctl.py diff --git a/lisa/tools/__init__.py b/lisa/tools/__init__.py index 2db6493c82..da08f164f3 100644 --- a/lisa/tools/__init__.py +++ b/lisa/tools/__init__.py @@ -18,6 +18,7 @@ from .aria import Aria from .b4 import B4 from .blkid import Blkid +from .bootctl import BootCtl from .bzip2 import Bzip2 from .cargo import Cargo from .chmod import Chmod @@ -134,6 +135,7 @@ "Aria", "B4", "Blkid", + "BootCtl", "Bzip2", "Cargo", "Cat", diff --git a/lisa/tools/bootctl.py b/lisa/tools/bootctl.py new file mode 100644 index 0000000000..3936667c3a --- /dev/null +++ b/lisa/tools/bootctl.py @@ -0,0 +1,45 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +from lisa.executable import Tool +from lisa.operating_system import CBLMariner +from lisa.util import LisaException + + +class BootCtl(Tool): + @property + def command(self) -> str: + return "bootctl" + + @property + def can_install(self) -> bool: + return True + + def _install(self) -> bool: + if isinstance(self.node.os, CBLMariner): + self.node.os.install_packages("systemd-udev") + else: + raise LisaException( + f"tool {self.command} can't be installed in distro {self.node.os.name}" + ) + return self._check_exists() + + def get_esp_path(self) -> str: + return self._get_cmd_output("--print-esp-path") + + def get_boot_path(self) -> str: + return self._get_cmd_output("--print-boot-path") + + def get_root_device(self) -> str: + return self._get_cmd_output("--print-root-device") + + def _get_cmd_output(self, cmd: str) -> str: + cmd_result = self.run( + cmd, + expected_exit_code=0, + expected_exit_code_failure_message="failed to get ESP path", + shell=True, + sudo=True, + force_run=True, + ) + return cmd_result.stdout From 53f5b0bf1147b1c086494fabdfca88fd1f822e1c Mon Sep 17 00:00:00 2001 From: Thien Trung Vuong Date: Fri, 7 Feb 2025 02:01:54 +0000 Subject: [PATCH 3/4] Include raw version string in Posix.get_package_information return value - Introduce LisaVersionInfo to wrap semver.VersionInfo - Update os._get_package_information to use distro's package manager to query a package version string directly, rather than using regex to extract version string from query output - Include the raw version string by default when getting a package version, resolves parsing error when the package does not follow semantic versioning, e.g. systemd Signed-off-by: Thien Trung Vuong --- lisa/operating_system.py | 122 ++++++++++++++------------------------- lisa/util/__init__.py | 21 +++++-- 2 files changed, 58 insertions(+), 85 deletions(-) diff --git a/lisa/operating_system.py b/lisa/operating_system.py index f94980dd5f..686bfabbf6 100644 --- a/lisa/operating_system.py +++ b/lisa/operating_system.py @@ -39,6 +39,7 @@ BaseClassMixin, LisaException, LisaTimeoutException, + LisaVersionInfo, MissingPackagesException, ReleaseEndOfLifeException, RepoNotExistException, @@ -152,7 +153,7 @@ def __init__(self, node: "Node", is_posix: bool) -> None: self._is_posix = is_posix self._log = get_logger(name="os", parent=self._node.log) self._information: Optional[OsInformation] = None - self._packages: Dict[str, VersionInfo] = dict() + self._packages: Dict[str, LisaVersionInfo] = dict() @classmethod def create(cls, node: "Node") -> Any: @@ -494,7 +495,7 @@ def capture_system_information(self, saved_path: Path) -> None: def get_package_information( self, package_name: str, use_cached: bool = True - ) -> VersionInfo: + ) -> LisaVersionInfo: found = self._packages.get(package_name, None) if found and use_cached: return found @@ -549,12 +550,24 @@ def _initialize_package_installation(self) -> None: # sub os can override it, but it's optional pass - def _get_package_information(self, package_name: str) -> VersionInfo: + def _get_package_information(self, package_name: str) -> LisaVersionInfo: raise NotImplementedError() - def _get_version_info_from_named_regex_match( - self, package_name: str, named_matches: Match[str] - ) -> VersionInfo: + def _get_version_info_from_regex( + self, version_str: str, regex: Pattern[str] + ) -> LisaVersionInfo: + matches = regex.search(version_str) + if not matches: + self._node.log.debug( + "Could not find regex match for version_str ({version_str}) using regex" + ) + return LisaVersionInfo(version_str, 0) + + return self._get_version_info_from_named_matches(version_str, matches) + + def _get_version_info_from_named_matches( + self, version_str: str, named_matches: Match[str] + ) -> LisaVersionInfo: essential_matches = ["major", "minor"] # verify all essential keys are in our match dict @@ -576,16 +589,16 @@ def _get_version_info_from_named_regex_match( ) build_match = named_matches.group("build") log_message = ( - f"Found {package_name} version " + f"Found version from ({version_str}) " f"major:{major_match} minor:{minor_match} " f"patch:{patch_match} build:{build_match}" ) self._node.log.debug(log_message) - return VersionInfo(major, minor, patch, build=build_match) + return LisaVersionInfo(version_str, major, minor, patch, build=build_match) def _cache_and_return_version_info( - self, package_name: str, info: VersionInfo - ) -> VersionInfo: + self, package_name: str, info: LisaVersionInfo + ) -> LisaVersionInfo: self._packages[package_name] = info return info @@ -765,14 +778,6 @@ class Debian(Linux): r"\s+(?P.*)\s*" ) - """ Package: dpdk - Version: 20.11.3-0ubuntu1~backport20.04-202111041420~ubuntu20.04.1 - Version: 1:2.25.1-1ubuntu3.2 - """ - _debian_package_information_regex = re.compile( - r"Package: ([a-zA-Z0-9:_\-\.]+)\r?\n" # package name group - r"Version: ([a-zA-Z0-9:_\-\.~+]+)\r?\n" # version number group - ) # ex: 3.10 # ex: 3.10.5-git # ex: 3.10-5git3 @@ -838,32 +843,17 @@ def get_apt_error(self, stdout: str) -> List[str]: error_lines.append(line) return error_lines - def _get_package_information(self, package_name: str) -> VersionInfo: - # run update of package info - apt_info = self._node.execute( - f"apt show {package_name}", + def _get_package_information(self, package_name: str) -> LisaVersionInfo: + query_result = self._node.execute( + f"dpkg-query -f '${{Version}}' -W {package_name}", expected_exit_code=0, expected_exit_code_failure_message=( f"Could not find package information for package {package_name}" ), ) - match = self._debian_package_information_regex.search(apt_info.stdout) - if not match: - raise LisaException( - "Package information parsing could not find regex match " - f" for {package_name} using regex " - f"{self._debian_package_information_regex.pattern}" - ) - version_str = match.group(2) - match = self._debian_version_splitter_regex.search(version_str) - if not match: - raise LisaException( - f"Could not parse version info: {version_str} " - f"for package {package_name}" - ) - self._node.log.debug(f"Attempting to parse version string: {version_str}") - version_info = self._get_version_info_from_named_regex_match( - package_name, match + version_str = query_result.stdout.strip() + version_info = self._get_version_info_from_regex( + version_str, self._debian_version_splitter_regex ) return self._cache_and_return_version_info(package_name, version_info) @@ -1487,7 +1477,6 @@ class RPMDistro(Linux): # ex: dpdk-20.11-3.el8.x86_64 or dpdk-18.11.8-1.el7_8.x86_64 _rpm_version_splitter_regex = re.compile( - r"(?P[a-zA-Z0-9\-_]+)-" r"(?P[0-9]+)\." r"(?P[0-9]+)\.?" r"(?P[0-9]+)?" @@ -1537,23 +1526,17 @@ def add_azure_core_repo( def clean_package_cache(self) -> None: self._node.execute(f"{self._dnf_tool()} clean all", sudo=True, shell=True) - def _get_package_information(self, package_name: str) -> VersionInfo: - rpm_info = self._node.execute( - f"rpm -q {package_name}", + def _get_package_information(self, package_name: str) -> LisaVersionInfo: + query_result = self._node.execute( + f"rpm --queryformat '%{{VERSION}}-%{{RELEASE}}' -q {package_name}", expected_exit_code=0, expected_exit_code_failure_message=( f"Could not find package information for package {package_name}" ), ) - # rpm package should be of format (package_name)-(version) - matches = self._rpm_version_splitter_regex.search(rpm_info.stdout) - if not matches: - raise LisaException( - f"Could not parse package version {rpm_info} for {package_name}" - ) - self._node.log.debug(f"Attempting to parse version string: {rpm_info.stdout}") - version_info = self._get_version_info_from_named_regex_match( - package_name, matches + version_str = query_result.stdout.strip() + version_info = self._get_version_info_from_regex( + version_str, self._rpm_version_splitter_regex ) return self._cache_and_return_version_info(package_name, version_info) @@ -1993,12 +1976,7 @@ class Suse(Linux): ) # Warning: There are no enabled repositories defined. _no_repo_defined = re.compile("There are no enabled repositories defined.", re.M) - # Name : dpdk - # Version : 19.11.10-150400.4.7.1 - _suse_package_information_regex = re.compile( - r"Name\s+: (?P[a-zA-Z0-9:_\-\.]+)\r?\n" - r"Version\s+: (?P[a-zA-Z0-9:_\-\.~+]+)\r?\n" - ) + _suse_version_splitter_regex = re.compile( r"([0-9]+:)?" # some examples have a mystery number followed by a ':' (git) r"(?P[0-9]+)\." # major @@ -2139,33 +2117,17 @@ def _is_package_in_repo(self, package: str) -> bool: result = self._node.execute(command, sudo=True, shell=True) return 0 == result.exit_code - def _get_package_information(self, package_name: str) -> VersionInfo: - # run update of package info - zypper_info = self._node.execute( - f"zypper info {package_name}", + def _get_package_information(self, package_name: str) -> LisaVersionInfo: + query_result = self._node.execute( + f"rpm --queryformat '%{{VERSION}}-%{{RELEASE}}' -q {package_name}", expected_exit_code=0, expected_exit_code_failure_message=( f"Could not find package information for package {package_name}" ), ) - output = self._ansi_escape.sub("", zypper_info.stdout) - match = self._suse_package_information_regex.search(output) - if not match: - raise LisaException( - "Package information parsing could not find regex match " - f" for {package_name} using regex " - f"{self._suse_package_information_regex.pattern}" - ) - version_str = match.group("package_version") - match = self._suse_version_splitter_regex.search(version_str) - if not match: - raise LisaException( - f"Could not parse version info: {version_str} " - f"for package {package_name}" - ) - self._node.log.debug(f"Attempting to parse version string: {version_str}") - version_info = self._get_version_info_from_named_regex_match( - package_name, match + version_str = query_result.stdout.strip() + version_info = self._get_version_info_from_regex( + version_str, self._suse_version_splitter_regex ) return self._cache_and_return_version_info(package_name, version_info) diff --git a/lisa/util/__init__.py b/lisa/util/__init__.py index d590e8cc92..60f49f430a 100644 --- a/lisa/util/__init__.py +++ b/lisa/util/__init__.py @@ -418,6 +418,17 @@ def enable(self) -> None: self._switch(True) +class LisaVersionInfo(VersionInfo): + def __init__(self, version_str: str, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) + self.version_str = version_str + + @classmethod + def parse(cls, version: str) -> "LisaVersionInfo": + version_info = VersionInfo.parse(version) + return LisaVersionInfo(version, *version_info.to_tuple()) + + def get_date_str(current: Optional[datetime] = None) -> str: if current is None: current = datetime.now() @@ -654,7 +665,7 @@ def dump_file(file_name: Path, content: Any) -> None: f.write(secret.mask(content)) -def parse_version(version: str) -> VersionInfo: +def parse_version(version: str) -> LisaVersionInfo: """ Convert an incomplete version string into a semver-compatible Version object @@ -672,8 +683,8 @@ def parse_version(version: str) -> VersionInfo: belong to a basic version. :rtype: tuple(:class:`Version` | None, str) """ - if VersionInfo.isvalid(version): - return VersionInfo.parse(version) + if LisaVersionInfo.isvalid(version): + return LisaVersionInfo.parse(version) match = __version_info_pattern.search(version) if not match: @@ -685,9 +696,9 @@ def parse_version(version: str) -> VersionInfo: if key != "prerelease" } ver["prerelease"] = match["prerelease"] - rest = match.string[match.end() :] # noqa:E203 + rest = match.string[match.end() :] ver["build"] = rest - release_version = VersionInfo(**ver) + release_version = LisaVersionInfo(version, **ver) return release_version From 83004c9fc5dcd09cafaa7a343c714dca13927e06 Mon Sep 17 00:00:00 2001 From: Thien Trung Vuong Date: Fri, 7 Feb 2025 02:08:23 +0000 Subject: [PATCH 4/4] Add CVM boot test suite for Azure Linux CVM Implement 2 new test cases: - verify_encrypted_root_partition: check that the root partition on an Azure Linux CVM is encrypted when deployed with "DiskWithVMGuestState" encryption setting - verify_boot_success_after_component_upgrade: check that a CVM can reboot after a boot component is upgraded, and PCR values are updated correctly Signed-off-by: Thien Trung Vuong --- microsoft/testsuites/cvm/cvm_boot.py | 146 +++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 microsoft/testsuites/cvm/cvm_boot.py diff --git a/microsoft/testsuites/cvm/cvm_boot.py b/microsoft/testsuites/cvm/cvm_boot.py new file mode 100644 index 0000000000..81d610ce77 --- /dev/null +++ b/microsoft/testsuites/cvm/cvm_boot.py @@ -0,0 +1,146 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import itertools +from pathlib import Path +from typing import Any, Dict, cast + +from assertpy.assertpy import assert_that, assert_warn + +from lisa import ( + Logger, + Node, + RemoteNode, + TestCaseMetadata, + TestSuite, + TestSuiteMetadata, +) +from lisa.environment import EnvironmentStatus +from lisa.features.security_profile import ( + CvmEnabled, + SecurityProfile, + SecurityProfileSettings, +) +from lisa.operating_system import CBLMariner, Posix +from lisa.sut_orchestrator import AZURE +from lisa.testsuite import simple_requirement +from lisa.tools import BootCtl, Lsblk, Tpm2 +from lisa.tools.lsblk import PartitionInfo +from lisa.util import SkippedException, UnsupportedDistroException + + +@TestSuiteMetadata( + area="cvm", + category="functional", + description="""This test suite covers some common scenarios related to + CVM boot on Azure. + """, +) +class CVMBootTestSuite(TestSuite): + def before_case(self, log: Logger, **kwargs: Any) -> None: + node: Node = kwargs["node"] + if not isinstance(node.os, CBLMariner): + raise SkippedException( + UnsupportedDistroException( + node.os, "CVM boot test supports only Azure Linux." + ) + ) + + @TestCaseMetadata( + description="""This test verifies that TPM enrollment is done correctly on + a CVM with encrypted root partition + """, + priority=2, + requirement=simple_requirement( + supported_features=[CvmEnabled()], + supported_platform_type=[AZURE], + ), + ) + def verify_encrypted_root_partition(self, node: RemoteNode) -> None: + security_profile_settings = cast( + SecurityProfileSettings, node.features[SecurityProfile].get_settings() + ) + if not security_profile_settings.encrypt_disk: + raise SkippedException("This test requires disk encryption to be enabled") + + disks = node.tools[Lsblk].get_disks(force_run=True) + root_device = node.tools[BootCtl].get_root_device() + partitions = itertools.chain.from_iterable(disk.partitions for disk in disks) + root_partition = next( + (p for p in partitions if p.device_name == root_device), None + ) + + assert_that(root_partition, "Cannot locate root partition").is_not_none() + assert isinstance(root_partition, PartitionInfo) + assert_that(root_partition.fstype).is_equal_to("crypto_LUKS") + + @TestCaseMetadata( + description="""This test case verifies that a CVM can still boot if any boot + component is upgraded. + + Steps: + 1. On first boot, check current PCR values for PCR4 and PCR7 + 2. Get current boot components versions (e.g. shim, grub, systemd-boot, uki) + 3. Run a package upgrade to update boot components + 4. Get new boot components versions to see if anything has changed + 5. Reboot the CVM, make sure the CVM can boot up again + 6. PCR4 should change if any of the boot components is upgraded + 7. PCR7 may change (for example, if a signing certificate is changed) + """, + priority=1, + requirement=simple_requirement( + environment_status=EnvironmentStatus.Connected, + supported_features=[CvmEnabled()], + supported_platform_type=[AZURE], + ), + ) + def verify_boot_success_after_component_upgrade( + self, log: Logger, node: RemoteNode, log_path: Path + ) -> None: + posix_os: Posix = cast(Posix, node.os) + # First boot - no package upgrade has been performed + # Check PCR values (PCR4, PCR7) + pcrs_before_reboot = node.tools[Tpm2].pcrread(pcrs=[4, 7]) + + # - Get current boot components versions (shim, systemd-boot, kernel-uki) + boot_components = ["shim", "systemd-boot", "kernel-uki"] + boot_components_versions: Dict[str, str] = dict() + for pkg in boot_components: + pkg_version = posix_os.get_package_information(pkg, use_cached=False) + boot_components_versions[pkg] = pkg_version.version_str + + # Upgrade boot components + posix_os.update_packages(boot_components) + + # Get new boot components versions + boot_components_new_versions: Dict[str, str] = dict() + for pkg in boot_components: + pkg_version = posix_os.get_package_information(pkg, use_cached=False) + boot_components_new_versions[pkg] = pkg_version.version_str + + # Reboot + node.reboot() + + # VM is up again + pcrs_after_reboot = node.tools[Tpm2].pcrread(pcrs=[4, 7]) + boot_component_changed = ( + boot_components_versions != boot_components_new_versions + ) + + # - PCR4 should change if any of the boot components is upgraded + # - PCR7 may change if a signing cert is changed + if boot_component_changed: + assert_that( + pcrs_after_reboot[4], + "PCR4 value is still the same even though a boot component changed", + ).is_not_equal_to(pcrs_before_reboot[4]) + assert_warn( + pcrs_after_reboot[7], + "PCR7 changed after a boot component changed, this may happen if a" + " signing certificate was updated", + ).is_equal_to(pcrs_before_reboot[7]) + else: + assert_that( + pcrs_after_reboot, + "PCR values changed even though no boot component was updated", + ).is_equal_to(pcrs_before_reboot)