diff --git a/surfactant/infoextractors/docker_image.py b/surfactant/infoextractors/docker_image.py
index 06cfa475..4b677be7 100644
--- a/surfactant/infoextractors/docker_image.py
+++ b/surfactant/infoextractors/docker_image.py
@@ -5,8 +5,11 @@
 import gzip
 import json
+import os
+import shutil
 import subprocess
+import tarfile
 import tempfile
-from typing import Optional
+from typing import IO, Any, Optional
 
 from loguru import logger
 
@@ -14,6 +17,57 @@
 from surfactant.configmanager import ConfigManager
 from surfactant.sbomtypes import SBOM, Software
 
+### ===============================
+### Utility Predicates
+### ===============================
+
+
+def is_oci_archive(filename: str) -> bool:
+    """Return True if the given file is a tarball roughly matching the OCI image layout.
+
+    Only the presence of an ``oci-layout`` member is checked. A file that cannot be
+    read as a tar archive yields False.
+    """
+    try:
+        with tarfile.open(filename) as this_tarfile:
+            # getnames() yields member names; getmembers() yields TarInfo objects,
+            # which never compare equal to a plain string
+            return "oci-layout" in this_tarfile.getnames()
+    except tarfile.ReadError:
+        return False
+
+
+def supports_file(filetype: str) -> bool:
+    """Check if the file type is supported."""
+    return filetype in ("DOCKER_TAR", "DOCKER_GZIP")
+
+
+### ===============================
+### Archive Utilities
+### ===============================
+
+
+def gunzip_tarball(filename: str) -> str:
+    """Decompress a gzipped tarball into a temporary file and return that file's path.
+
+    The temporary file is created with ``delete=False`` so that it still exists after
+    this function returns; the caller must remove it once it is no longer needed.
+    """
+    tar_out = tempfile.NamedTemporaryFile(suffix=".tar", delete=False)
+    try:
+        # Stream the data so a large image is never held in memory all at once
+        with tar_out, gzip.open(filename, "rb") as gzip_in:
+            shutil.copyfileobj(gzip_in, tar_out)
+    except Exception:
+        os.unlink(tar_out.name)  # do not leave a partial file behind
+        raise
+    return tar_out.name
+
+
+### ===============================
+### Extraction Procedures
+### ===============================
+
 
 class DockerScoutManager:
     def __init__(self) -> None:
@@ -45,7 +99,7 @@ def check_docker_scout_installed(self) -> None:
                 "You can also disable this check by running 'surfactant config docker.enable_docker_scout false'."
             )
 
-    def run_docker_scout(self, filename: str) -> object:
+    def run_docker_scout(self, filename: str) -> Optional[object]:
         """Run Docker Scout on the given file and return the results."""
         if self.disable_docker_scout:
             return {}  # Do nothing if Docker Scout is disabled by config
@@ -58,29 +112,85 @@ def run_docker_scout(self, filename: str) -> object:
             )
             if result.returncode != 0:
                 logger.warning(f"Running Docker Scout on {filename} failed")
-                return {}
+                return None
             spdx_out = json.loads(result.stdout)
-            return {"dockerSPDX": spdx_out}
+            return spdx_out
         except json.JSONDecodeError as e:
             logger.error(f"Failed to parse Docker Scout output for {filename}: {e}")
-            return {}
+            return None
 
 
 # Initialize DockerScoutManager to check installation status
 dsManager = DockerScoutManager()
 
 
-def supports_file(filetype: str) -> bool:
-    """Check if the file type is supported."""
-    return filetype in ("DOCKER_TAR", "DOCKER_GZIP")
+def read_json_member(tarball: tarfile.TarFile, path: str) -> Optional[Any]:
+    """Return the parsed JSON content of a tarball member.
+
+    Returns None when the member is absent from the archive or is not a regular file.
+    """
+    try:
+        member_file: Optional[IO[bytes]] = tarball.extractfile(path)
+    except KeyError:  # raised by tarfile when no member has this name
+        return None
+    if member_file is None:
+        return None
+    with member_file:
+        return json.load(member_file)
+
+
+def extract_configs(filename: str) -> list:
+    """Return the image configuration objects referenced by the tarball's manifest.
+
+    Loads ``manifest.json`` and then the JSON file named by each entry's ``Config``
+    key. A missing manifest yields an empty list; entries whose config file cannot
+    be found are skipped with a warning.
+    """
+    image_configs: list = []
+    with tarfile.open(filename) as tarball:
+        manifest = read_json_member(tarball, "manifest.json")
+        if manifest is None:
+            # Only the file type was checked so far, so the manifest may be absent
+            logger.warning(f"No manifest.json found in {filename}")
+            return image_configs
+        for entry in manifest:
+            config_path = entry["Config"]
+            config = read_json_member(tarball, config_path)
+            if config is None:
+                logger.warning(f"Config {config_path} not found in {filename}")
+                continue
+            image_configs.append(config)
+    return image_configs
+
+
+### =================================
+### Hook Implementation
+### =================================
 
 
 @surfactant.plugin.hookimpl
 def extract_file_info(sbom: SBOM, software: Software, filename: str, filetype: str) -> object:
-    """Extract file information using Docker Scout if supported."""
-    if dsManager.disable_docker_scout or not supports_file(filetype):
+    """Extract image configs, and a Docker Scout SBOM if enabled, from a Docker tarball."""
+    if not supports_file(filetype):
         return None
-    return extract_docker_info(filetype, filename)
+
+    ## Decompress gzipped tarballs to a temporary file (removed below)
+    tarball_path = gunzip_tarball(filename) if filetype == "DOCKER_GZIP" else filename
+    try:
+        ## Extract config files
+        metadata: dict[str, Any] = {"dockerImageConfigs": extract_configs(tarball_path)}
+
+        ## Use docker scout if available and enabled
+        if not dsManager.disable_docker_scout:
+            spdx = dsManager.run_docker_scout(tarball_path)
+            # None signals a failed run; omit the key rather than store a null
+            if spdx is not None:
+                metadata["dockerSPDX"] = spdx
+    finally:
+        if tarball_path != filename:
+            os.unlink(tarball_path)
+
+    return metadata
 
 
 def extract_docker_info(filetype: str, filename: str) -> object:
diff --git a/surfactant/infoextractors/docker_tarball_file.py b/surfactant/infoextractors/docker_tarball_file.py
new file mode 100644
index 00000000..2ffa504f
--- /dev/null
+++ b/surfactant/infoextractors/docker_tarball_file.py
@@ -0,0 +1,93 @@
+# Copyright 2024 Lawrence Livermore National Security, LLC
+# see: ${repository}/LICENSE
+#
+# SPDX-License-Identifier: MIT
+
+import json
+import tarfile
+from pathlib import PurePosixPath
+from typing import IO, Any, Union
+
+import surfactant.plugin
+from surfactant.sbomtypes import SBOM, Software
+
+
+def get_manifest_file_from_tarball(tarball: tarfile.TarFile) -> Union[IO[bytes], None]:
+    """Return a file object for the tarball's ``manifest.json`` member.
+
+    Raises KeyError if the archive has no such member.
+    """
+    return tarball.extractfile("manifest.json")
+
+
+def get_config_file_from_tarball(tarball: tarfile.TarFile, path: str) -> Union[IO[bytes], None]:
+    """Return a file object for the member at *path*, or None if it is not a regular file.
+
+    Raises KeyError if the archive has no such member.
+    """
+    return tarball.extractfile(path)
+
+
+def get_config_path_from_manifest(manifest: list[dict[str, Any]]) -> list[str]:
+    """Return the ``Config`` value of every manifest entry."""
+    return [entry["Config"] for entry in manifest]
+
+
+def get_repo_tags_from_manifest(manifest: list[dict[str, Any]]) -> list[str]:
+    """Return the ``RepoTags`` value of every manifest entry."""
+    return [entry["RepoTags"] for entry in manifest]
+
+
+def portable_path_list(*paths: str):
+    """Return the paths normalized to POSIX form so they can be compared as strings."""
+    return tuple(str(PurePosixPath(path_str)) for path_str in paths)
+
+
+def supports_file(filename: str, filetype: str) -> bool:
+    """Return True if the file is a Docker tarball containing every expected member."""
+    EXPECTED_FILETYPE = "DOCKER_TAR"
+
+    expected_members = portable_path_list(
+        "index.json",
+        "manifest.json",
+        "oci-layout",
+        "repositories",
+        "blobs/sha256",
+    )
+
+    if filetype != EXPECTED_FILETYPE:
+        return False
+
+    with tarfile.open(filename) as this_tarfile:
+        # A set gives constant-time membership tests for archives with many members
+        found_members = set(portable_path_list(*this_tarfile.getnames()))
+
+    return all(expected_member in found_members for expected_member in expected_members)
+
+
+@surfactant.plugin.hookimpl
+def extract_file_info(sbom: SBOM, software: Software, filename: str, filetype: str) -> object:
+    """Return image configuration info for supported Docker tarballs, otherwise None."""
+    if not supports_file(filename, filetype):
+        return None
+    return extract_image_info(filename)
+
+
+def extract_image_info(filename: str):
+    """Return the tarball's image configuration objects under ``dockerImageConfigs``."""
+    root_key = "dockerImageConfigs"
+    image_info: dict[str, list[dict[str, Any]]] = {root_key: []}
+    with tarfile.open(filename) as tarball:
+        # Callers are expected to have checked supports_file() first; validate
+        # explicitly because assert statements are stripped under -O
+        manifest_file = get_manifest_file_from_tarball(tarball)
+        if manifest_file is None:
+            raise ValueError(f"manifest.json in {filename} is not a regular file")
+        manifest = json.load(manifest_file)
+        # Call the module-level helper; the parsed manifest list has no such method
+        for config_path in get_config_path_from_manifest(manifest):
+            config_file = get_config_file_from_tarball(tarball, config_path)
+            if config_file is None:
+                raise ValueError(f"{config_path} in {filename} is not a regular file")
+            image_info[root_key].append(json.load(config_file))
+    return image_info