Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CVM: Add boot/reboot tests #3624

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 53 additions & 6 deletions lisa/operating_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ class KernelInformation:
version_parts: List[str]


@dataclass
class PackageInformation:
name: str
version_str: str


class OperatingSystem:
__lsb_release_pattern = re.compile(r"^Description:[ \t]+([\w]+)[ ]+$", re.M)
# NAME="Oracle Linux Server"
Expand Down Expand Up @@ -500,6 +506,9 @@ def get_package_information(
return found
return self._get_package_information(package_name)

def query_package(self, package_name: str) -> PackageInformation:
return self._query_package(package_name)

def get_repositories(self) -> List[RepositoryInfo]:
raise NotImplementedError("get_repositories is not implemented")

Expand Down Expand Up @@ -552,6 +561,9 @@ def _initialize_package_installation(self) -> None:
def _get_package_information(self, package_name: str) -> VersionInfo:
raise NotImplementedError()

def _query_package(self, package_name: str) -> PackageInformation:
raise NotImplementedError()

def _get_version_info_from_named_regex_match(
self, package_name: str, named_matches: Match[str]
) -> VersionInfo:
Expand All @@ -577,8 +589,8 @@ def _get_version_info_from_named_regex_match(
build_match = named_matches.group("build")
log_message = (
f"Found {package_name} version "
f"major:{major_match} minor:{minor_match} "
f"patch:{patch_match} build:{build_match}"
f"major:{major_match} minor:{minor_match} " # noqa: E231
f"patch:{patch_match} build:{build_match}" # noqa: E231
)
self._node.log.debug(log_message)
return VersionInfo(major, minor, patch, build=build_match)
Expand Down Expand Up @@ -693,7 +705,7 @@ def __resolve_package_name(self, package: Union[str, Tool, Type[Tool]]) -> str:
elif isinstance(package, Tool):
package_name = package.package_name
else:
assert isinstance(package, type), f"actual:{type(package)}"
assert isinstance(package, type), f"actual:{type(package)}" # noqa: E231
# Create a temp object, it doesn't query.
# So they can be queried together.
tool = package.create(self._node)
Expand Down Expand Up @@ -867,6 +879,17 @@ def _get_package_information(self, package_name: str) -> VersionInfo:
)
return self._cache_and_return_version_info(package_name, version_info)

def _query_package(self, package_name: str) -> PackageInformation:
query_result = self._node.execute(
f"dpkg-query -f '${{Version}}' -W {package_name}",
expected_exit_code=0,
expected_exit_code_failure_message=(
f"Could not find package information for package {package_name}"
),
)
version_str = query_result.stdout.strip()
return PackageInformation(name=package_name, version_str=version_str)

def add_azure_core_repo(
self, repo_name: Optional[AzureCoreRepo] = None, code_name: Optional[str] = None
) -> None:
Expand Down Expand Up @@ -899,7 +922,9 @@ def add_azure_core_repo(
code_name = self.information.codename
repo_name = AzureCoreRepo.AzureCoreDebian

repo_url = f"http://packages.microsoft.com/repos/{repo_name.value}/"
repo_url = (
f"http://packages.microsoft.com/repos/{repo_name.value}/" # noqa: E231
)
self.add_repository(
repo=(f"deb [arch={arch_name}] {repo_url} {code_name} main"),
keys_location=keys,
Expand Down Expand Up @@ -1121,7 +1146,7 @@ def _package_exists(self, package: str) -> bool:
# vim deinstall
# vim-common install
# auoms hold
package_pattern = re.compile(f"{package}([ \t]+)(install|hold)")
package_pattern = re.compile(f"{package}([ \t]+)(install|hold)") # noqa: E201
if len(list(filter(package_pattern.match, result.stdout.splitlines()))) == 1:
return True
return False
Expand Down Expand Up @@ -1557,6 +1582,17 @@ def _get_package_information(self, package_name: str) -> VersionInfo:
)
return self._cache_and_return_version_info(package_name, version_info)

def _query_package(self, package_name: str) -> PackageInformation:
query_result = self._node.execute(
f"rpm --queryformat '%{{VERSION}}-%{{RELEASE}}' -q {package_name}",
expected_exit_code=0,
expected_exit_code_failure_message=(
f"Could not find package information for package {package_name}"
),
)
version_str = query_result.stdout.strip()
return PackageInformation(name=package_name, version_str=version_str)

def _install_packages(
self,
packages: List[str],
Expand Down Expand Up @@ -1659,7 +1695,7 @@ def install_epel(self) -> None:
).is_greater_than_or_equal_to(7)
epel_release_rpm_name = f"epel-release-latest-{major}.noarch.rpm"
self.install_packages(
f"https://dl.fedoraproject.org/pub/epel/{epel_release_rpm_name}"
f"https://dl.fedoraproject.org/pub/epel/{epel_release_rpm_name}" # noqa: E231
)

# replace $releasever to 8 for 8.x
Expand Down Expand Up @@ -2169,6 +2205,17 @@ def _get_package_information(self, package_name: str) -> VersionInfo:
)
return self._cache_and_return_version_info(package_name, version_info)

def _query_package(self, package_name: str) -> PackageInformation:
query_result = self._node.execute(
f"rpm --queryformat '%{{VERSION}}-%{{RELEASE}}' -q {package_name}",
expected_exit_code=0,
expected_exit_code_failure_message=(
f"Could not find package information for package {package_name}"
),
)
version_str = query_result.stdout.strip()
return PackageInformation(name=package_name, version_str=version_str)


class SLES(Suse):
@classmethod
Expand Down
2 changes: 2 additions & 0 deletions lisa/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
from .texinfo import Texinfo
from .timedatectl import Timedatectl
from .timeout import Timeout
from .tpm2 import Tpm2
from .unzip import Unzip
from .uptime import Uptime
from .usermod import Usermod
Expand Down Expand Up @@ -251,6 +252,7 @@
"TcpDump",
"Timedatectl",
"Timeout",
"Tpm2",
"Uname",
"Unzip",
"Uptime",
Expand Down
69 changes: 69 additions & 0 deletions lisa/tools/tpm2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from typing import Dict, List, Sequence

from lisa.executable import Tool
from lisa.operating_system import CBLMariner
from lisa.util import LisaException


class Tpm2(Tool):
@property
def command(self) -> str:
return "tpm2"

@property
def can_install(self) -> bool:
return True

def _install(self) -> bool:
if isinstance(self.node.os, CBLMariner):
self.node.os.install_packages("tpm2-tools")
else:
raise LisaException(
f"tool {self.command} can't be installed in distro {self.node.os.name}."
)
return self._check_exists()

def pcrread(
self,
alg: str = "sha256",
pcrs: int | Sequence[int] | None = None,
) -> Dict[int, str]:
pcrs = self._get_pcr_list(pcrs)
if len(pcrs) == 0:
pcrs_arg = "all"
else:
pcrs_arg = ",".join(map(str, pcrs))
cmd = f"pcrread {alg}:{pcrs_arg}" # noqa: E231
cmd_result = self.run(
cmd,
expected_exit_code=0,
expected_exit_code_failure_message="failed to read PCR values",
shell=True,
sudo=True,
force_run=True,
)
output = cmd_result.stdout

lines = [line.strip() for line in output.splitlines()]
# first line of output will have the format "<alg-name>:", e.g "sha256:"
assert (
lines[0][:-1] == alg
), "pcrread output does not contain the requested algorithm"

result = dict()
for line in lines[1:]:
pcr, hash_value = line.split(":", 1)
pcr_index = int(pcr.strip())
hash_value = hash_value.strip().lower()
result[pcr_index] = hash_value
return result

def _get_pcr_list(self, pcrs: int | Sequence[int] | None) -> List[int]:
if pcrs is None:
return []
if isinstance(pcrs, int):
return [pcrs]
return [pcr for pcr in pcrs]
176 changes: 176 additions & 0 deletions microsoft/testsuites/cvm/cvm_boot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from pathlib import Path
from typing import Any, List, cast

from assertpy.assertpy import assert_that, assert_warn

from lisa import (
Environment,
Logger,
Node,
RemoteNode,
TestCaseMetadata,
TestSuite,
TestSuiteMetadata,
)
from lisa.features.security_profile import (
CvmEnabled,
SecurityProfile,
SecurityProfileSettings,
)
from lisa.operating_system import CBLMariner, Posix
from lisa.sut_orchestrator import AZURE
from lisa.testsuite import TestResult, simple_requirement
from lisa.tools import Lsblk, Reboot, Tpm2
from lisa.tools.lsblk import PartitionInfo
from lisa.util import (
SkippedException,
TcpConnectionException,
UnsupportedDistroException,
constants,
)
from lisa.util.shell import wait_tcp_port_ready


@TestSuiteMetadata(
area="cvm",
category="functional",
description="""This test suite covers some common scenarios related to
CVM boot on Azure.
""",
)
class CVMBootTestSuite(TestSuite):
def before_case(self, log: Logger, **kwargs: Any) -> None:
node: Node = kwargs["node"]
if not isinstance(node.os, CBLMariner):
raise SkippedException(
UnsupportedDistroException(
node.os, "CVM boot test supports only Azure Linux."
)
)

@TestCaseMetadata(
description="""This test verifies that TPM enrollment is done correctly on
a CVM with encrypted root partition
""",
priority=2,
requirement=simple_requirement(
supported_features=[CvmEnabled()],
supported_platform_type=[AZURE],
),
)
def verify_encrypted_root_partition(
self,
log: Logger,
node: RemoteNode,
environment: Environment,
log_path: Path,
result: TestResult,
) -> None:
security_profile_settings = cast(
SecurityProfileSettings, node.features[SecurityProfile].get_settings()
)
if not security_profile_settings.encrypt_disk:
raise SkippedException("This test requires disk encryption to be enabled")
lsblk = node.tools[Lsblk]
disks = lsblk.get_disks(force_run=True)
partitions: List[PartitionInfo] = next(
(d.partitions for d in disks if d.name == "sda"), []
)
assert_that(partitions, "Cannot find a disk named 'sda'").is_not_empty()
root_partition = next((p for p in partitions if p.name == "sda2"), None)
assert_that(root_partition, "Cannot locate root partition").is_not_none()
assert isinstance(root_partition, PartitionInfo)
assert_that(root_partition.fstype).is_equal_to("crypto_LUKS")

@TestCaseMetadata(
description="""This test case verifies that a CVM can still boot if any boot
component is upgraded.

Steps:
1. On first boot, check current PCR values for PCR4 and PCR7
2. Get current boot components versions (e.g. shim, grub, systemd-boot, uki)
3. Run a package upgrade to update boot components
4. Get new boot components versions to see if anything has changed
5. Reboot the CVM, make sure the CVM can boot up again
6. PCR4 should change if any of the boot components is upgraded
7. PCR7 may change (for example, if a signing certificate is changed)
""",
priority=1,
requirement=simple_requirement(
supported_features=[CvmEnabled()],
supported_platform_type=[AZURE],
),
)
def verify_boot_success_after_component_upgrade(
self,
log: Logger,
node: RemoteNode,
environment: Environment,
log_path: Path,
result: TestResult,
**kwargs: Any,
) -> None:
posix_os: Posix = cast(Posix, node.os)
# First boot
# - Check PCR values (PCR4, PCR7)
pcrs_before_reboot = node.tools[Tpm2].pcrread(pcrs=[4, 7])

# - Get current boot components versions (shim, systemd-boot, kernel-uki)
boot_components = ["shim", "systemd-boot", "kernel-uki"]
boot_components_versions = dict()
for pkg in boot_components:
pkg_info = posix_os.query_package(pkg)
boot_components_versions[pkg] = pkg_info.version_str

# - Upgrade boot components
posix_os.update_packages(boot_components)

# - Get new boot components versions
boot_components_new_versions = dict()
for pkg in boot_components:
pkg_info = posix_os.query_package(pkg)
boot_components_new_versions[pkg] = pkg_info.version_str

# Reboot
# - Make sure VM boots up again
reboot_tool = node.tools[Reboot]
reboot_tool.reboot_and_check_panic(log_path)
is_ready, tcp_error_code = wait_tcp_port_ready(
node.connection_info[constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS],
node.connection_info[constants.ENVIRONMENTS_NODES_REMOTE_PORT],
log=log,
)
if not is_ready:
raise TcpConnectionException(
node.connection_info[constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS],
node.connection_info[constants.ENVIRONMENTS_NODES_REMOTE_PORT],
tcp_error_code,
"no panic found in serial log",
)

pcrs_after_reboot = node.tools[Tpm2].pcrread(pcrs=[4, 7])
boot_component_changed = any(
boot_components_versions[pkg] != boot_components_new_versions[pkg]
for pkg in boot_components
)

# - PCR4 should change if any of the boot components is upgraded
# - PCR7 may change if a signing cert is changed
if boot_component_changed:
assert_that(
pcrs_after_reboot[4],
"PCR4 value is still the same even though a boot component changed",
).is_not_equal_to(pcrs_before_reboot[4])
assert_warn(
pcrs_after_reboot[7],
"PCR7 changed after a boot component changed, this may happen if a"
" signing certificate was updated",
).is_equal_to(pcrs_before_reboot[7])
else:
assert_that(
pcrs_after_reboot,
"PCR values changed even though no boot component was updated",
).is_equal_to(pcrs_before_reboot)
Loading