diff --git a/lisa/operating_system.py b/lisa/operating_system.py index f94980dd5f..eafe36ea4c 100644 --- a/lisa/operating_system.py +++ b/lisa/operating_system.py @@ -119,6 +119,12 @@ class KernelInformation: version_parts: List[str] +@dataclass +class PackageInformation: + name: str + version_str: str + + class OperatingSystem: __lsb_release_pattern = re.compile(r"^Description:[ \t]+([\w]+)[ ]+$", re.M) # NAME="Oracle Linux Server" @@ -500,6 +506,9 @@ def get_package_information( return found return self._get_package_information(package_name) + def query_package(self, package_name: str) -> PackageInformation: + return self._query_package(package_name) + def get_repositories(self) -> List[RepositoryInfo]: raise NotImplementedError("get_repositories is not implemented") @@ -552,6 +561,9 @@ def _initialize_package_installation(self) -> None: def _get_package_information(self, package_name: str) -> VersionInfo: raise NotImplementedError() + def _query_package(self, package_name: str) -> PackageInformation: + raise NotImplementedError() + def _get_version_info_from_named_regex_match( self, package_name: str, named_matches: Match[str] ) -> VersionInfo: @@ -577,8 +589,8 @@ def _get_version_info_from_named_regex_match( build_match = named_matches.group("build") log_message = ( f"Found {package_name} version " - f"major:{major_match} minor:{minor_match} " - f"patch:{patch_match} build:{build_match}" + f"major:{major_match} minor:{minor_match} " # noqa: E231 + f"patch:{patch_match} build:{build_match}" # noqa: E231 ) self._node.log.debug(log_message) return VersionInfo(major, minor, patch, build=build_match) @@ -693,7 +705,7 @@ def __resolve_package_name(self, package: Union[str, Tool, Type[Tool]]) -> str: elif isinstance(package, Tool): package_name = package.package_name else: - assert isinstance(package, type), f"actual:{type(package)}" + assert isinstance(package, type), f"actual:{type(package)}" # noqa: E231 # Create a temp object, it doesn't query. # So they can be queried together. tool = package.create(self._node) @@ -867,6 +879,17 @@ def _get_package_information(self, package_name: str) -> VersionInfo: ) return self._cache_and_return_version_info(package_name, version_info) + def _query_package(self, package_name: str) -> PackageInformation: + query_result = self._node.execute( + f"dpkg-query -f '${{Version}}' -W {package_name}", + expected_exit_code=0, + expected_exit_code_failure_message=( + f"Could not find package information for package {package_name}" + ), + ) + version_str = query_result.stdout.strip() + return PackageInformation(name=package_name, version_str=version_str) + def add_azure_core_repo( self, repo_name: Optional[AzureCoreRepo] = None, code_name: Optional[str] = None ) -> None: @@ -899,7 +922,9 @@ def add_azure_core_repo( code_name = self.information.codename repo_name = AzureCoreRepo.AzureCoreDebian - repo_url = f"http://packages.microsoft.com/repos/{repo_name.value}/" + repo_url = ( + f"http://packages.microsoft.com/repos/{repo_name.value}/" # noqa: E231 + ) self.add_repository( repo=(f"deb [arch={arch_name}] {repo_url} {code_name} main"), keys_location=keys, @@ -1121,7 +1146,7 @@ def _package_exists(self, package: str) -> bool: # vim deinstall # vim-common install # auoms hold - package_pattern = re.compile(f"{package}([ \t]+)(install|hold)") + package_pattern = re.compile(f"{package}([ \t]+)(install|hold)") # noqa: E201 if len(list(filter(package_pattern.match, result.stdout.splitlines()))) == 1: return True return False @@ -1557,6 +1582,17 @@ def _get_package_information(self, package_name: str) -> VersionInfo: ) return self._cache_and_return_version_info(package_name, version_info) + def _query_package(self, package_name: str) -> PackageInformation: + query_result = self._node.execute( + f"rpm --queryformat '%{{VERSION}}-%{{RELEASE}}' -q {package_name}", + expected_exit_code=0, + expected_exit_code_failure_message=( + f"Could not find package information for package {package_name}" + ), + ) + version_str = query_result.stdout.strip() + return PackageInformation(name=package_name, version_str=version_str) + def _install_packages( self, packages: List[str], @@ -1659,7 +1695,7 @@ def install_epel(self) -> None: ).is_greater_than_or_equal_to(7) epel_release_rpm_name = f"epel-release-latest-{major}.noarch.rpm" self.install_packages( - f"https://dl.fedoraproject.org/pub/epel/{epel_release_rpm_name}" + f"https://dl.fedoraproject.org/pub/epel/{epel_release_rpm_name}" # noqa: E231 ) # replace $releasever to 8 for 8.x @@ -2169,6 +2205,17 @@ def _get_package_information(self, package_name: str) -> VersionInfo: ) return self._cache_and_return_version_info(package_name, version_info) + def _query_package(self, package_name: str) -> PackageInformation: + query_result = self._node.execute( + f"rpm --queryformat '%{{VERSION}}-%{{RELEASE}}' -q {package_name}", + expected_exit_code=0, + expected_exit_code_failure_message=( + f"Could not find package information for package {package_name}" + ), + ) + version_str = query_result.stdout.strip() + return PackageInformation(name=package_name, version_str=version_str) + class SLES(Suse): @classmethod diff --git a/lisa/tools/__init__.py b/lisa/tools/__init__.py index 95d0f48fda..2db6493c82 100644 --- a/lisa/tools/__init__.py +++ b/lisa/tools/__init__.py @@ -118,6 +118,7 @@ from .texinfo import Texinfo from .timedatectl import Timedatectl from .timeout import Timeout +from .tpm2 import Tpm2 from .unzip import Unzip from .uptime import Uptime from .usermod import Usermod @@ -251,6 +252,7 @@ "TcpDump", "Timedatectl", "Timeout", + "Tpm2", "Uname", "Unzip", "Uptime", diff --git a/lisa/tools/tpm2.py b/lisa/tools/tpm2.py new file mode 100644 index 0000000000..5599a0b0ff --- /dev/null +++ b/lisa/tools/tpm2.py @@ -0,0 +1,69 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +from typing import Dict, List, Sequence + +from lisa.executable import Tool +from lisa.operating_system import CBLMariner +from lisa.util import LisaException + + +class Tpm2(Tool): + @property + def command(self) -> str: + return "tpm2" + + @property + def can_install(self) -> bool: + return True + + def _install(self) -> bool: + if isinstance(self.node.os, CBLMariner): + self.node.os.install_packages("tpm2-tools") + else: + raise LisaException( + f"tool {self.command} can't be installed in distro {self.node.os.name}." + ) + return self._check_exists() + + def pcrread( + self, + alg: str = "sha256", + pcrs: int | Sequence[int] | None = None, + ) -> Dict[int, str]: + pcrs = self._get_pcr_list(pcrs) + if len(pcrs) == 0: + pcrs_arg = "all" + else: + pcrs_arg = ",".join(map(str, pcrs)) + cmd = f"pcrread {alg}:{pcrs_arg}" # noqa: E231 + cmd_result = self.run( + cmd, + expected_exit_code=0, + expected_exit_code_failure_message="failed to read PCR values", + shell=True, + sudo=True, + force_run=True, + ) + output = cmd_result.stdout + + lines = [line.strip() for line in output.splitlines()] + # first line of output will have the format ":", e.g "sha256:" + assert ( + lines[0][:-1] == alg + ), "pcrread output does not contain the requested algorithm" + + result = dict() + for line in lines[1:]: + pcr, hash_value = line.split(":", 1) + pcr_index = int(pcr.strip()) + hash_value = hash_value.strip().lower() + result[pcr_index] = hash_value + return result + + def _get_pcr_list(self, pcrs: int | Sequence[int] | None) -> List[int]: + if pcrs is None: + return [] + if isinstance(pcrs, int): + return [pcrs] + return [pcr for pcr in pcrs] diff --git a/microsoft/testsuites/cvm/cvm_boot.py b/microsoft/testsuites/cvm/cvm_boot.py new file mode 100644 index 0000000000..c930268ceb --- /dev/null +++ b/microsoft/testsuites/cvm/cvm_boot.py @@ -0,0 +1,176 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +from pathlib import Path +from typing import Any, List, cast + +from assertpy.assertpy import assert_that, assert_warn + +from lisa import ( + Environment, + Logger, + Node, + RemoteNode, + TestCaseMetadata, + TestSuite, + TestSuiteMetadata, +) +from lisa.features.security_profile import ( + CvmEnabled, + SecurityProfile, + SecurityProfileSettings, +) +from lisa.operating_system import CBLMariner, Posix +from lisa.sut_orchestrator import AZURE +from lisa.testsuite import TestResult, simple_requirement +from lisa.tools import Lsblk, Reboot, Tpm2 +from lisa.tools.lsblk import PartitionInfo +from lisa.util import ( + SkippedException, + TcpConnectionException, + UnsupportedDistroException, + constants, +) +from lisa.util.shell import wait_tcp_port_ready + + +@TestSuiteMetadata( + area="cvm", + category="functional", + description="""This test suite covers some common scenarios related to + CVM boot on Azure. + """, +) +class CVMBootTestSuite(TestSuite): + def before_case(self, log: Logger, **kwargs: Any) -> None: + node: Node = kwargs["node"] + if not isinstance(node.os, CBLMariner): + raise SkippedException( + UnsupportedDistroException( + node.os, "CVM boot test supports only Azure Linux." + ) + ) + + @TestCaseMetadata( + description="""This test verifies that TPM enrollment is done correctly on + a CVM with encrypted root partition + """, + priority=2, + requirement=simple_requirement( + supported_features=[CvmEnabled()], + supported_platform_type=[AZURE], + ), + ) + def verify_encrypted_root_partition( + self, + log: Logger, + node: RemoteNode, + environment: Environment, + log_path: Path, + result: TestResult, + ) -> None: + security_profile_settings = cast( + SecurityProfileSettings, node.features[SecurityProfile].get_settings() + ) + if not security_profile_settings.encrypt_disk: + raise SkippedException("This test requires disk encryption to be enabled") + lsblk = node.tools[Lsblk] + disks = lsblk.get_disks(force_run=True) + partitions: List[PartitionInfo] = next( + (d.partitions for d in disks if d.name == "sda"), [] + ) + assert_that(partitions, "Cannot find a disk named 'sda'").is_not_empty() + root_partition = next((p for p in partitions if p.name == "sda2"), None) + assert_that(root_partition, "Cannot locate root partition").is_not_none() + assert isinstance(root_partition, PartitionInfo) + assert_that(root_partition.fstype).is_equal_to("crypto_LUKS") + + @TestCaseMetadata( + description="""This test case verifies that a CVM can still boot if any boot + component is upgraded. + + Steps: + 1. On first boot, check current PCR values for PCR4 and PCR7 + 2. Get current boot components versions (e.g. shim, grub, systemd-boot, uki) + 3. Run a package upgrade to update boot components + 4. Get new boot components versions to see if anything has changed + 5. Reboot the CVM, make sure the CVM can boot up again + 6. PCR4 should change if any of the boot components is upgraded + 7. PCR7 may change (for example, if a signing certificate is changed) + """, + priority=1, + requirement=simple_requirement( + supported_features=[CvmEnabled()], + supported_platform_type=[AZURE], + ), + ) + def verify_boot_success_after_component_upgrade( + self, + log: Logger, + node: RemoteNode, + environment: Environment, + log_path: Path, + result: TestResult, + **kwargs: Any, + ) -> None: + posix_os: Posix = cast(Posix, node.os) + # First boot + # - Check PCR values (PCR4, PCR7) + pcrs_before_reboot = node.tools[Tpm2].pcrread(pcrs=[4, 7]) + + # - Get current boot components versions (shim, systemd-boot, kernel-uki) + boot_components = ["shim", "systemd-boot", "kernel-uki"] + boot_components_versions = dict() + for pkg in boot_components: + pkg_info = posix_os.query_package(pkg) + boot_components_versions[pkg] = pkg_info.version_str + + # - Upgrade boot components + posix_os.update_packages(boot_components) + + # - Get new boot components versions + boot_components_new_versions = dict() + for pkg in boot_components: + pkg_info = posix_os.query_package(pkg) + boot_components_new_versions[pkg] = pkg_info.version_str + + # Reboot + # - Make sure VM boots up again + reboot_tool = node.tools[Reboot] + reboot_tool.reboot_and_check_panic(log_path) + is_ready, tcp_error_code = wait_tcp_port_ready( + node.connection_info[constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS], + node.connection_info[constants.ENVIRONMENTS_NODES_REMOTE_PORT], + log=log, + ) + if not is_ready: + raise TcpConnectionException( + node.connection_info[constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS], + node.connection_info[constants.ENVIRONMENTS_NODES_REMOTE_PORT], + tcp_error_code, + "no panic found in serial log", + ) + + pcrs_after_reboot = node.tools[Tpm2].pcrread(pcrs=[4, 7]) + boot_component_changed = any( + boot_components_versions[pkg] != boot_components_new_versions[pkg] + for pkg in boot_components + ) + + # - PCR4 should change if any of the boot components is upgraded + # - PCR7 may change if a signing cert is changed + if boot_component_changed: + assert_that( + pcrs_after_reboot[4], + "PCR4 value is still the same even though a boot component changed", + ).is_not_equal_to(pcrs_before_reboot[4]) + assert_warn( + pcrs_after_reboot[7], + "PCR7 changed after a boot component changed, this may happen if a" + " signing certificate was updated", + ).is_equal_to(pcrs_before_reboot[7]) + else: + assert_that( + pcrs_after_reboot, + "PCR values changed even though no boot component was updated", + ).is_equal_to(pcrs_before_reboot)