Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ dependencies = [
"tomlkit==0.13.*",
"textual==3.*",
"requests>=2.32.3",
"rarfile==4.2.*",
# Pinned to a specific version to guard against potential breaking changes
"networkx>=2.6",
"python-msi==0.0.0a2"
]
"python-msi==0.0.0a2",
]

dynamic = ["version"]

[project.optional-dependencies]
macho = ["lief==0.16.6"]
java = ["javatools>=1.6,==1.*"]
extractcode = ["extractcode[full]>=31.0.0"]
test = ["pytest"]
dev = ["build", "pre-commit"]
docs = ["sphinx", "myst-parser"]
Expand Down
53 changes: 53 additions & 0 deletions surfactant/filetypeid/id_extractcode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2025 Lawrence Livermore National Security, LLC
# See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT

from typing import Optional

from loguru import logger

import surfactant.plugin

# extractcode is listed as an optional dependency, so the import is guarded and
# the outcome recorded in EXTRACTCODE_AVAILABLE; the hooks in this module check
# that flag before touching ec_archive or sevenzip.
try:
    from extractcode import archive as ec_archive
    from extractcode import sevenzip

    EXTRACTCODE_AVAILABLE = True
# pylint: disable-next=broad-exception-caught
except Exception as e:
    # Catch any import errors related to extractcode
    logger.warning(f"extractcode library not available in file type identification: {e}")
    EXTRACTCODE_AVAILABLE = False
    # Bind the names to None so they always exist at module level
    ec_archive = None
    sevenzip = None


@surfactant.plugin.hookimpl
def identify_file_type(filepath: str) -> Optional[str]:
    """Identify a file by asking extractcode for its best archive handler.

    Args:
        filepath: Path of the file to identify.

    Returns:
        ``"EXTRACTCODE-<handler name>"`` when extractcode reports a handler
        for the file; ``None`` when extractcode is unavailable, when no
        handler is reported, or when the lookup raises ``FileNotFoundError``.
    """
    if ec_archive is None or not EXTRACTCODE_AVAILABLE:
        return None

    try:
        best = ec_archive.get_best_handler(filepath)
        return f"EXTRACTCODE-{best.name}" if best else None
    except FileNotFoundError:
        return None


@surfactant.plugin.hookimpl
def init_hook(command_name: Optional[str] = None) -> None:
    """Register a WIM (Windows imaging) archive handler with extractcode.

    Does nothing when extractcode could not be imported. Calling the hook more
    than once does not register an equal handler a second time.

    Args:
        command_name: Name of the command being run; not used by this hook.
    """
    if not EXTRACTCODE_AVAILABLE or ec_archive is None or sevenzip is None:
        return

    wim_handler = ec_archive.Handler(
        name="Microsoft wim",
        # This must be a one-element tuple (note the trailing comma). A bare
        # string is iterated character by character by extractcode's handler
        # matching, which makes the file type check pass for almost any file.
        # The entry is lowercase because extractcode compares it against the
        # lower-cased detected file type.
        filetypes=("windows imaging (wim) image",),
        mimetypes=("application/x-ms-wim",),
        extensions=(".wim",),
        kind=ec_archive.package,
        extractors=[sevenzip.extract],
        # strict: file type, mime type and extension must all match
        strict=True,
    )

    # Avoid stacking duplicate handlers if the hook runs more than once.
    if wim_handler not in ec_archive.archive_handlers:
        ec_archive.archive_handlers.append(wim_handler)
2 changes: 1 addition & 1 deletion surfactant/filetypeid/id_magic.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,6 @@ def identify_file_type(filepath: str) -> Optional[str]:
if magic_bytes[:4] == b"\xed\xab\xee\xdb":
return "RPM Package"

return None
return None
except FileNotFoundError:
return None
118 changes: 118 additions & 0 deletions surfactant/infoextractors/extractcode_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Copyright 2025 Lawrence Livermore National Security, LLC
# See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT

from queue import Queue
from typing import TYPE_CHECKING, Any, Dict, Optional

from loguru import logger

import surfactant.plugin
from surfactant import ContextEntry
from surfactant.filetypeid.id_extractcode import EXTRACTCODE_AVAILABLE
from surfactant.infoextractors.file_decompression import create_extraction
from surfactant.sbomtypes import SBOM, Software

if EXTRACTCODE_AVAILABLE or TYPE_CHECKING:
from extractcode import archive as ec_archive
else:
ec_archive = None

# File type names that are not "EXTRACTCODE-" prefixed but for which
# get_handler still asks extractcode for its best handler (types that can
# already be identified elsewhere, e.g. by id_magic).
ADDITIONAL_HANDLERS = {
    "Linux Kernel Image",
    "MSCAB",
    "ISCAB",
    "DOCKER_GZIP",
    "GZIP",
    "BZIP2",
    "XZ",
    "DOCKER_TAR",
    "TAR",
    "RAR",
    "ZIP",
    "JAR",
    "WAR",
    "EAR",
    "APK",
    "IPA",
    "MSIX",
    "ZLIB",
    "CPIO_BIN big",
    "CPIO_BIN little",
    "ZSTANDARD",
    "ZSTANDARD_DICTIONARY",
    "ISO_9660_CD",
    "MACOS_DMG",
    "RPM Package",
}


def get_handler(filename, filetype: str) -> Optional["ec_archive.Handler"]:
    """Pick the extractcode handler to use for a file of the given type.

    Args:
        filename: Path of the file; only consulted for types listed in
            ``ADDITIONAL_HANDLERS``.
        filetype: File type string produced by file type identification.

    Returns:
        The handler registered under the name following an ``EXTRACTCODE-``
        prefix, or extractcode's best handler for ``filename`` when
        ``filetype`` is in ``ADDITIONAL_HANDLERS``. ``None`` when extractcode
        is unavailable, ``filetype`` is empty, or nothing matches.
    """
    if not (EXTRACTCODE_AVAILABLE and ec_archive is not None and filetype):
        return None

    # Types of the form "EXTRACTCODE-<name>" refer to a handler by name
    prefix = "EXTRACTCODE-"
    if filetype.startswith(prefix):
        wanted = filetype[len(prefix) :]
        match = next(
            (candidate for candidate in ec_archive.archive_handlers if candidate.name == wanted),
            None,
        )
        if match is not None:
            return match
        logger.error(f"Unknown EXTRACTCODE handler: {wanted}")

    # Types identified by other plugins: let extractcode choose the handler
    if filetype not in ADDITIONAL_HANDLERS:
        return None

    best = ec_archive.get_best_handler(filename)
    if not best:
        logger.warning(f"No handler found for {filetype} ({filename}).")
    return best


# pylint: disable=too-many-positional-arguments
@surfactant.plugin.hookimpl
def extract_file_info(
    sbom: SBOM,
    software: Software,
    filename: str,
    filetype: str,
    context_queue: "Queue[ContextEntry]",
    current_context: Optional[ContextEntry],
) -> Optional[Dict[str, Any]]:
    """Hand a file to ``create_extraction`` when extractcode has a handler for it.

    Args:
        sbom: Not used by this hook.
        software: Not used by this hook.
        filename: Path of the file being examined.
        filetype: File type string used to look up a handler.
        context_queue: Passed through to ``create_extraction``.
        current_context: Passed through to ``create_extraction``.

    Returns:
        Always ``None``; this hook contributes no metadata of its own.
    """
    archive_handler = get_handler(filename, filetype)
    if not archive_handler:
        return None

    def extract_with_handler(src: str, dest: str) -> bool:
        # Callback given to create_extraction: extract using the chosen handler
        return decompress_to(src, dest, archive_handler)

    create_extraction(filename, context_queue, current_context, extract_with_handler)
    return None


def decompress_to(filename: str, output_folder: str, handler: "ec_archive.Handler") -> bool:
    """Extract ``filename`` into ``output_folder`` with the handler's extractors.

    A handler with one extractor is run directly; a handler with two is run
    through ``ec_archive.extract_twice``. Any warnings returned by the
    extractor are logged.

    Args:
        filename: Path of the archive to extract.
        output_folder: Directory to extract into.
        handler: extractcode handler whose ``extractors`` are used.

    Returns:
        ``False`` when the handler has neither one nor two extractors,
        otherwise ``True`` after the extractor has run.
    """
    stages = handler.extractors
    stage_count = len(stages)

    if stage_count == 1:
        run_extraction = stages[0]
    elif stage_count == 2:

        def run_extraction(src: str, dest: str) -> Any:
            # Two-stage archives: chain both extractors
            return ec_archive.extract_twice(src, dest, stages[0], stages[1])

    else:
        logger.error(f"Unsupported number of extractors for {filename}: {stage_count}")
        return False

    logger.info(f"Extracting {filename} ({handler.name}) to {output_folder} using extractcode")
    for issue in run_extraction(filename, output_folder) or ():
        logger.warning(f"Warning while extracting {filename}: {issue}")

    return True
44 changes: 6 additions & 38 deletions surfactant/infoextractors/file_decompression.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,18 @@
from queue import Queue
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union

import rarfile
from loguru import logger

import surfactant.plugin
from surfactant import ContextEntry
from surfactant.configmanager import ConfigManager
from surfactant.sbomtypes import SBOM, Software

# Global list to track temp dirs
GLOBAL_TEMP_DIRS_LIST = []

RAR_SUPPORT = {"enabled": True}


def supports_file(filetype: str) -> Optional[str]:
if filetype in {"TAR", "GZIP", "ZIP", "BZIP2", "XZ"} or (
filetype == "RAR" and RAR_SUPPORT["enabled"]
):
if filetype in {"TAR", "GZIP", "ZIP", "BZIP2", "XZ"}:
return filetype
return None

Expand Down Expand Up @@ -143,8 +137,6 @@ def decompress_to(filename: str, output_folder: str, compression_format: str) ->
)
# Since it doesn't seem to be a compressed tar file, try just decompressing the file
return decompress_file(filename, output_folder, compression_format)
elif compression_format == "RAR":
decompress_rar_file(filename, output_folder)
else:
raise ValueError(f"Unsupported compression format: {compression_format}")
return True
Expand Down Expand Up @@ -222,38 +214,14 @@ def extract_tar_file(
logger.info(f"Extracted TAR contents to {output_folder}")


def decompress_rar_file(filename: str, output_folder: str):
try:
rf = rarfile.RarFile(filename)
rf.extractall(path=output_folder)
except rarfile.Error as e:
logger.error(f"Error extracting rar file: {e}")
logger.info(f"Extracted RAR contents to {output_folder}")


def delete_temp_dirs():
for temp_dir in GLOBAL_TEMP_DIRS_LIST:
if temp_dir and os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
logger.info(f"Cleaned up temporary directory: {temp_dir}")


@surfactant.plugin.hookimpl
def init_hook(command_name: Optional[str] = None) -> None:
RAR_SUPPORT["enabled"] = False

should_enable_rar = ConfigManager().get("rar", "enabled", True)
if should_enable_rar:
try:
result = rarfile.tool_setup()
if result.setup["open_cmd"][0] in ("UNRAR_TOOL", "UNAR_TOOL"):
RAR_SUPPORT["enabled"] = True
return
except rarfile.RarCannotExec:
pass
logger.warning(
"Install 'Unrar' or 'unar' tool for RAR archive decompression. RAR decompression disabled until installed."
)
try:
shutil.rmtree(temp_dir)
logger.info(f"Cleaned up temporary directory: {temp_dir}")
except PermissionError as e:
logger.error(f"Permission error while deleting {temp_dir}: {e}")


# Register exit handler
Expand Down
5 changes: 4 additions & 1 deletion surfactant/plugin/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
def _register_plugins(pm: pluggy.PluginManager) -> None:
# pylint: disable=import-outside-toplevel
# don't want all these imports as part of the file-level scope
from surfactant.filetypeid import id_extension, id_hex, id_magic
from surfactant.filetypeid import id_extension, id_extractcode, id_hex, id_magic
from surfactant.infoextractors import (
a_out_file,
coff_file,
docker_image,
elf_file,
extractcode_file,
file_decompression,
java_file,
js_file,
Expand Down Expand Up @@ -48,6 +49,7 @@ def _register_plugins(pm: pluggy.PluginManager) -> None:
id_magic,
id_hex,
id_extension,
id_extractcode,
a_out_file,
coff_file,
docker_image,
Expand All @@ -69,6 +71,7 @@ def _register_plugins(pm: pluggy.PluginManager) -> None:
cytrics_reader,
native_lib_file,
file_decompression,
extractcode_file,
)
for plugin in internal_plugins:
pm.register(plugin)
Expand Down
Loading