Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 97 additions & 10 deletions surfactant/infoextractors/docker_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,54 @@
import gzip
import json
import subprocess
import tarfile
import tempfile
from typing import Optional
from typing import IO, Any, Optional

from loguru import logger

import surfactant.plugin
from surfactant.configmanager import ConfigManager
from surfactant.sbomtypes import SBOM, Software

### ===============================
### Utility Predicates
### ===============================


def is_oci_archive(filename: str) -> bool:
    """Return True if the given file is a tarball roughly matching the OCI specification.

    The check is deliberately shallow: it only looks for a member named
    ``oci-layout`` inside the archive.

    Args:
        filename: Path of the tar archive to inspect.

    Returns:
        True when the archive contains a member named ``oci-layout``.

    Raises:
        tarfile.ReadError: If the file cannot be opened as a tar archive.
    """
    with tarfile.open(filename) as this_tarfile:
        # getmembers() yields TarInfo objects, so testing a string against it
        # can never match; compare against the member names instead.
        return "oci-layout" in this_tarfile.getnames()


def supports_file(filetype: str) -> bool:
    """Report whether ``filetype`` is one of the type identifiers handled here.

    Returns:
        True for ``"DOCKER_TAR"`` and ``"DOCKER_GZIP"``, False for anything else.
    """
    handled_types = ("DOCKER_TAR", "DOCKER_GZIP")
    return filetype in handled_types


### ===============================
### Archive Utilities
### ===============================


def gunzip_tarball(filename: str) -> str:
    """Decompress a gzipped tarball into a temporary file and return that file's path.

    The temporary file is created with ``delete=False`` so it still exists
    after this function returns; the caller is responsible for removing it
    once it is no longer needed.

    Args:
        filename: Path of the gzip-compressed archive.

    Returns:
        Path of the newly created file holding the decompressed data.

    Raises:
        OSError: If the input cannot be read or is not valid gzip data
            (``gzip.BadGzipFile`` is an ``OSError`` subclass).
    """
    import os  # local imports: only this function needs them
    import shutil

    # With the default delete=True the file is removed as soon as it is
    # closed, which would leave the returned name pointing at nothing.
    with tempfile.NamedTemporaryFile(delete=False) as gzip_out:
        try:
            with gzip.open(filename, "rb") as gzip_in:
                # Stream the data so large images are never held fully in memory.
                shutil.copyfileobj(gzip_in, gzip_out)
        except BaseException:
            # Do not leave a partial temporary file behind on failure.
            gzip_out.close()
            os.unlink(gzip_out.name)
            raise
    return gzip_out.name


### ===============================
### Extraction Procedures
### ===============================


class DockerScoutManager:
def __init__(self) -> None:
Expand Down Expand Up @@ -45,7 +84,7 @@ def check_docker_scout_installed(self) -> None:
"You can also disable this check by running 'surfactant config docker.enable_docker_scout false'."
)

def run_docker_scout(self, filename: str) -> object:
def run_docker_scout(self, filename: str) -> Optional[object]:
"""Run Docker Scout on the given file and return the results."""
if self.disable_docker_scout:
return {} # Do nothing if Docker Scout is disabled by config
Expand All @@ -58,29 +97,77 @@ def run_docker_scout(self, filename: str) -> object:
)
if result.returncode != 0:
logger.warning(f"Running Docker Scout on {filename} failed")
return {}
return None
spdx_out = json.loads(result.stdout)
return {"dockerSPDX": spdx_out}
return spdx_out
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Docker Scout output for {filename}: {e}")
return {}
return None


# Module-level DockerScoutManager instance, created once when this module is
# imported; extract_file_info reads it to decide whether to run Docker Scout.
# NOTE(review): presumably constructing it also checks whether Docker Scout is
# installed (see check_docker_scout_installed) -- confirm against __init__.
dsManager = DockerScoutManager()


def supports_file(filetype: str) -> bool:
"""Check if the file type is supported."""
return filetype in ("DOCKER_TAR", "DOCKER_GZIP")
def extract_configs(filename: str) -> list[Any]:
    """Return the image configuration objects referenced by an image tarball.

    Reads ``manifest.json`` from the archive, then loads every file named by
    the ``"Config"`` key of each manifest entry.

    Args:
        filename: Path of the (uncompressed) tar archive to read.

    Returns:
        The parsed JSON configuration documents, in manifest order.

    Raises:
        KeyError: If ``manifest.json`` or a referenced config file is missing
            from the archive, or a manifest entry has no ``"Config"`` key.
        ValueError: If one of those members is not a regular file.
        json.JSONDecodeError: If a member does not contain valid JSON.
    """

    def load_json_member(tarball: tarfile.TarFile, member_name: str) -> Any:
        # extractfile() raises KeyError for an unknown name and returns None
        # for members that carry no file content (e.g. directories). An
        # explicit check is used rather than ``assert`` because asserts are
        # stripped under ``python -O``.
        member_file = tarball.extractfile(member_name)
        if member_file is None:
            raise ValueError(f"{member_name!r} in {filename} is not a regular file")
        with member_file:
            return json.load(member_file)

    with tarfile.open(filename) as tarball:
        manifest = load_json_member(tarball, "manifest.json")
        return [load_json_member(tarball, entry["Config"]) for entry in manifest]


### =================================
### Hook Implementation
### =================================


@surfactant.plugin.hookimpl
def extract_file_info(sbom: SBOM, software: Software, filename: str, filetype: str) -> object:
    """Extract Docker image metadata from a supported image tarball.

    Args:
        sbom: Unused by this extractor.
        software: Unused by this extractor.
        filename: Path of the file to inspect.
        filetype: Type identifier of the file; anything rejected by
            ``supports_file`` is ignored.

    Returns:
        None for unsupported file types. Otherwise a dict with the key
        ``"dockerImageConfigs"`` (result of ``extract_configs``) and, when
        Docker Scout is enabled and produced a result, ``"dockerSPDX"``.
    """
    import os  # local import: only needed to clean up the decompressed copy

    if not supports_file(filetype):
        return None

    # Gzipped images are decompressed to a temporary tarball first.
    decompressed = gunzip_tarball(filename) if filetype == "DOCKER_GZIP" else None
    tar_path = filename if decompressed is None else decompressed
    try:
        metadata = {"dockerImageConfigs": extract_configs(tar_path)}

        # Use Docker Scout if available and enabled.
        if not dsManager.disable_docker_scout:
            spdx = dsManager.run_docker_scout(tar_path)
            # A None result signals a failed run; leave the key out rather
            # than recording a null entry.
            if spdx is not None:
                metadata["dockerSPDX"] = spdx
        return metadata
    finally:
        if decompressed is not None:
            try:
                os.remove(decompressed)
            except FileNotFoundError:
                pass  # already gone; nothing to clean up


def extract_docker_info(filetype: str, filename: str) -> object:
Expand Down
79 changes: 79 additions & 0 deletions surfactant/infoextractors/docker_tarball_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2024 Lawrence Livermore National Security, LLC
# see: ${repository}/LICENSE
#
# SPDX-License-Identifier: MIT

import json
import tarfile
from pathlib import PurePosixPath
from typing import IO, Any, Union

import surfactant.plugin
from surfactant.sbomtypes import SBOM, Software


def get_manifest_file_from_tarball(tarball: tarfile.TarFile) -> Union[IO[bytes], None]:
    """Open the archive's ``manifest.json`` member for reading.

    Args:
        tarball: An archive already opened for reading.

    Returns:
        A binary file object for the member, or None when the member is not a
        regular file or link.

    Raises:
        KeyError: If the archive has no member named ``manifest.json``.
    """
    # extractfile() accepts a member name directly, so there is no need to
    # index every member of the archive just to look up one fixed name.
    return tarball.extractfile("manifest.json")


def get_config_file_from_tarball(tarball: tarfile.TarFile, path: str) -> Union[IO[bytes], None]:
    """Open the archive member whose name is exactly ``path``.

    Raises:
        KeyError: If no member of the archive has that name.
    """
    members_by_name = {}
    for member in tarball.getmembers():
        members_by_name[member.name] = member
    wanted = members_by_name[path]
    return tarball.extractfile(wanted)


def get_config_path_from_manifest(manifest: list[dict[str, Any]]) -> list[str]:
    """Collect the ``"Config"`` value of every manifest entry, in order.

    Raises:
        KeyError: If an entry has no ``"Config"`` key.
    """
    return [image_entry["Config"] for image_entry in manifest]


def get_repo_tags_from_manifest(manifest: list[dict[str, Any]]) -> list[str]:
    """Collect the ``"RepoTags"`` value of every manifest entry, in order.

    Raises:
        KeyError: If an entry has no ``"RepoTags"`` key.
    """
    return [image_entry["RepoTags"] for image_entry in manifest]


def portable_path_list(*paths: str):
    """Normalize each path through ``PurePosixPath`` and return the results as a tuple of strings."""
    normalized = [str(PurePosixPath(raw_path)) for raw_path in paths]
    return tuple(normalized)


def supports_file(filename: str, filetype: str) -> bool:
    """Report whether ``filename`` is a tarball containing the expected image members.

    Args:
        filename: Path of the archive to inspect.
        filetype: Type identifier of the file; only ``"DOCKER_TAR"`` is accepted.

    Returns:
        True when ``filetype`` is ``"DOCKER_TAR"`` and the archive contains
        every expected member name. False otherwise, including when the file
        cannot be read as a tar archive.
    """
    EXPECTED_FILETYPE = "DOCKER_TAR"

    # Already in normalized POSIX form, so they compare directly against the
    # normalized member names collected below.
    expected_members = frozenset(
        (
            "index.json",
            "manifest.json",
            "oci-layout",
            "repositories",
            "blobs/sha256",
        )
    )

    if filetype != EXPECTED_FILETYPE:
        return False

    try:
        with tarfile.open(filename) as this_tarfile:
            # A set gives constant-time membership tests per expected name.
            found_members = {str(PurePosixPath(name)) for name in this_tarfile.getnames()}
    except tarfile.ReadError:
        # Not a readable tar archive: report "unsupported" instead of failing.
        return False

    return expected_members <= found_members


@surfactant.plugin.hookimpl
def extract_file_info(sbom: SBOM, software: Software, filename: str, filetype: str) -> object:
    """Return image information for files accepted by ``supports_file``, else None.

    ``sbom`` and ``software`` are part of the hook signature but are not used here.
    """
    if supports_file(filename, filetype):
        return extract_image_info(filename)
    return None


def extract_image_info(filename: str):
    """Return the image configuration objects referenced by an image tarball.

    Reads ``manifest.json`` from the archive, then loads every file named by
    the ``"Config"`` key of each manifest entry.

    Args:
        filename: Path of the tar archive to read.

    Returns:
        A dict with the single key ``"dockerImageConfigs"`` mapping to the
        list of parsed configuration documents, in manifest order.

    Raises:
        KeyError: If ``manifest.json`` or a referenced config file is missing
            from the archive, or a manifest entry has no ``"Config"`` key.
        ValueError: If one of those members is not a regular file.
        json.JSONDecodeError: If a member does not contain valid JSON.
    """

    def load_json_member(tarball: tarfile.TarFile, member_name: str) -> Any:
        # extractfile() raises KeyError for an unknown name and returns None
        # for members that carry no file content (e.g. directories). An
        # explicit check is used rather than ``assert`` because asserts are
        # stripped under ``python -O``.
        member_file = tarball.extractfile(member_name)
        if member_file is None:
            raise ValueError(f"{member_name!r} in {filename} is not a regular file")
        with member_file:
            return json.load(member_file)

    root_key = "dockerImageConfigs"
    image_info: dict[str, list[dict[str, Any]]] = {root_key: []}
    with tarfile.open(filename) as tarball:
        # The parsed manifest is a plain list of entries; the config path of
        # each entry is read from its "Config" key.
        manifest = load_json_member(tarball, "manifest.json")
        for entry in manifest:
            image_info[root_key].append(load_json_member(tarball, entry["Config"]))
    return image_info
Loading