@surfactant.plugin.hookimpl
def identify_file_type(filepath: str) -> Optional[str]:
    """Identify an archive type using extractcode's handler registry.

    Args:
        filepath: Path of the file to identify.

    Returns:
        ``"EXTRACTCODE-<handler name>"`` when extractcode reports a best
        handler for the file, otherwise ``None`` (including when extractcode
        is not installed or the file does not exist).
    """
    if not EXTRACTCODE_AVAILABLE or ec_archive is None:
        return None

    try:
        ec_handler = ec_archive.get_best_handler(filepath)
    except FileNotFoundError:
        return None

    if ec_handler:
        return f"EXTRACTCODE-{ec_handler.name}"
    return None


@surfactant.plugin.hookimpl
def init_hook(command_name: Optional[str] = None) -> None:
    """Register an extractcode handler for Microsoft WIM images.

    The handler is appended to ``ec_archive.archive_handlers`` and extracts
    with ``sevenzip.extract``. Nothing is done when extractcode could not be
    imported.

    Args:
        command_name: Name of the command being run; not used here.
    """
    if not EXTRACTCODE_AVAILABLE or ec_archive is None or sevenzip is None:
        return

    handler_name = "Microsoft wim"

    # The handler list is module-level state in extractcode, so a second call
    # to this hook in the same process must not register a duplicate.
    if any(existing.name == handler_name for existing in ec_archive.archive_handlers):
        return

    wim_handler = ec_archive.Handler(
        name=handler_name,
        # This must be a tuple: without the trailing comma it is a plain
        # string, and iterating it yields single characters instead of the
        # description. The text is lower-case because extractcode matches
        # these entries as substrings of the lower-cased libmagic file type
        # (see extractcode.archive.get_handlers) -- confirm against the
        # installed extractcode version.
        filetypes=("windows imaging (wim) image",),
        mimetypes=("application/x-ms-wim",),
        extensions=(".wim",),
        kind=ec_archive.package,
        extractors=[sevenzip.extract],
        strict=True,
    )

    ec_archive.archive_handlers.append(wim_handler)
# File types (as named by the other identification plugins) for which
# extractcode is asked to pick a handler based on the file itself.
ADDITIONAL_HANDLERS = {
    "Linux Kernel Image",
    "MSCAB",
    "ISCAB",
    "DOCKER_GZIP",
    "GZIP",
    "BZIP2",
    "XZ",
    "DOCKER_TAR",
    "TAR",
    "RAR",
    "ZIP",
    "JAR",
    "WAR",
    "EAR",
    "APK",
    "IPA",
    "MSIX",
    "ZLIB",
    "CPIO_BIN big",
    "CPIO_BIN little",
    "ZSTANDARD",
    "ZSTANDARD_DICTIONARY",
    "ISO_9660_CD",
    "MACOS_DMG",
    "RPM Package",
}


def get_handler(filename, filetype: str) -> Optional["ec_archive.Handler"]:
    """Return the extractcode handler to use for a file, or ``None``.

    A file type of the form ``EXTRACTCODE-<name>`` is resolved by handler name
    in ``ec_archive.archive_handlers``. File types listed in
    ``ADDITIONAL_HANDLERS`` are resolved by asking extractcode for the best
    handler for ``filename``. ``None`` is returned for anything else, for an
    empty file type, or when extractcode is unavailable.
    """
    if not EXTRACTCODE_AVAILABLE or ec_archive is None or not filetype:
        return None

    prefix = "EXTRACTCODE-"
    if filetype.startswith(prefix):
        wanted = filetype[len(prefix) :]
        named = next(
            (candidate for candidate in ec_archive.archive_handlers if candidate.name == wanted),
            None,
        )
        if named is not None:
            return named
        logger.error(f"Unknown EXTRACTCODE handler: {wanted}")

    # File types already identified elsewhere (e.g. by id_magic) that
    # extractcode can unpack as well.
    if filetype not in ADDITIONAL_HANDLERS:
        return None

    best = ec_archive.get_best_handler(filename)
    if not best:
        logger.warning(f"No handler found for {filetype} ({filename}).")
    return best


# pylint: disable=too-many-positional-arguments
@surfactant.plugin.hookimpl
def extract_file_info(
    sbom: SBOM,
    software: Software,
    filename: str,
    filetype: str,
    context_queue: "Queue[ContextEntry]",
    current_context: Optional[ContextEntry],
) -> Optional[Dict[str, Any]]:
    """Hand archives that extractcode can unpack over to ``create_extraction``.

    When a handler is found for the file, ``create_extraction`` is called with
    a callback that unpacks the file using that handler. This hook itself
    always returns ``None``.
    """
    handler = get_handler(filename, filetype)
    if not handler:
        return None

    def unpack_with_handler(source: str, target: str) -> bool:
        return decompress_to(source, target, handler)

    create_extraction(filename, context_queue, current_context, unpack_with_handler)
    return None


def _two_stage_extractor(first_stage, second_stage):
    """Build a ``(source, target)`` callable chaining two extractors via extractcode."""

    def run(source: str, target: str) -> Any:
        return ec_archive.extract_twice(source, target, first_stage, second_stage)

    return run


def decompress_to(filename: str, output_folder: str, handler: "ec_archive.Handler") -> bool:
    """Extract ``filename`` into ``output_folder`` using the handler's extractors.

    Handlers with one extractor are run directly; handlers with two are
    chained through ``ec_archive.extract_twice``. Any warnings returned by the
    extractor are logged.

    Returns:
        ``False`` when the handler has an unsupported number of extractors,
        otherwise ``True``.
    """
    stages = handler.extractors
    stage_count = len(stages)

    if stage_count not in (1, 2):
        logger.error(f"Unsupported number of extractors for {filename}: {stage_count}")
        return False

    if stage_count == 1:
        run_extraction = stages[0]
    else:
        run_extraction = _two_stage_extractor(stages[0], stages[1])

    logger.info(f"Extracting {filename} ({handler.name}) to {output_folder} using extractcode")
    for issue in run_extraction(filename, output_folder) or ():
        logger.warning(f"Warning while extracting {filename}: {issue}")

    return True
# Global list to track temp dirs
GLOBAL_TEMP_DIRS_LIST = []


def supports_file(filetype: str) -> Optional[str]:
    """Return ``filetype`` if this module can decompress it, else ``None``.

    The supported types are TAR, GZIP, ZIP, BZIP2 and XZ.
    """
    if filetype in {"TAR", "GZIP", "ZIP", "BZIP2", "XZ"}:
        return filetype
    return None


def delete_temp_dirs():
    """Remove every directory recorded in ``GLOBAL_TEMP_DIRS_LIST``.

    Empty entries and paths that no longer exist are skipped. A failure to
    remove one directory is logged and does not stop cleanup of the rest.
    """
    for temp_dir in GLOBAL_TEMP_DIRS_LIST:
        if not temp_dir or not os.path.exists(temp_dir):
            continue
        try:
            shutil.rmtree(temp_dir)
        except PermissionError as e:
            logger.error(f"Permission error while deleting {temp_dir}: {e}")
        except OSError as e:
            # Any other filesystem failure (e.g. the directory disappearing
            # after the existence check) must not abort the loop, otherwise
            # the remaining temporary directories would be left behind.
            logger.error(f"Error while deleting {temp_dir}: {e}")
        else:
            logger.info(f"Cleaned up temporary directory: {temp_dir}")