Skip to content
Merged
11 changes: 11 additions & 0 deletions docs/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,14 @@ Surfactant supports overriding database source URLs via the sources section in t
```bash
surfactant config sources.js_library_patterns.retirejs https://new-url.com
```

## decompression

- extract_dir
- Directory where files are extracted when decompressing files. Default is the system's temporary directory.
- extract_prefix
- Prefix for extract directories created when decompressing files. Default is `surfactant-temp`.
- cache_extractions
- Controls whether to cache decompressed/extracted files. If Surfactant fails to exit successfully, the cache will be used to prevent re-extractions. If set to `true`, decompressed files will be cached in the `extract_dir` directory. Default is `true`.
- persist_extractions
- Controls whether to persist decompressed files after Surfactant exits successfully. If set to `true`, decompressed files will be kept in the `extract_dir` directory. Default is `false`.
154 changes: 115 additions & 39 deletions surfactant/infoextractors/file_decompression.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import atexit
import bz2
import gzip
import json
import lzma
import os
import pathlib
Expand All @@ -26,11 +27,20 @@
from surfactant import ContextEntry
from surfactant.configmanager import ConfigManager
from surfactant.sbomtypes import SBOM, Software
from surfactant.utils import exit_hook

# Global list to track temp dirs
GLOBAL_TEMP_DIRS_LIST = []
EXTRACT_DIR = pathlib.Path(
ConfigManager().get("decompression", "extract_dir", tempfile.gettempdir())
)
EXTRACT_DIRS_PREFIX = ConfigManager().get("decompression", "extract_prefix", "surfactant-temp")
EXTRACT_DIR.mkdir(parents=True, exist_ok=True)

RAR_SUPPORT = {"enabled": True}
# Global list to track extracted dirs
# Hash -> Path to extracted directory & Result of array of 2-tuples (install_prefix, extract_path)
EXTRACT_DIRS = {}
EXTRACT_DIRS_PATH = EXTRACT_DIR / ".surfactant_extracted_dirs.json"

RAR_SUPPORT = {"enabled": False}


def supports_file(filetype: list[str]) -> Optional[list[str]]:
Expand Down Expand Up @@ -67,27 +77,36 @@ def extract_file_info(
for fmt in compression_format:
create_extraction(
filename,
software,
context_queue,
current_context,
lambda f, t, fmt=fmt: decompress_to(f, t, fmt),
lambda f, t, format=fmt: decompress_to(f, t, format),
)


# decompress takes a filename and an output folder, and decompresses the file into that folder.
# Returning a boolean indicates an attempt (True) or refusal (False) to decompress.
# Returning a list of 2-tuples indicates that different ContextEntries should be created. The
# first element is the install prefix (or None if not applicable), and the second is the path
# to the extracted folder that should be under the install prefix.
def create_extraction(
filename: str,
software: Software,
context_queue: "Queue[ContextEntry]",
current_context: Optional[ContextEntry],
decompress: Callable[[str, str], Union[bool, List[Tuple[str, str]]]],
):
"""Create extraction context entries for decompressed archive files.

Args:
filename (str): Path to the archive file to be extracted
software (Software): Software object to associated with the file; used to skip extraction if already processed
context_queue (Queue[ContextEntry]): Queue to add new context entries for extracted content
current_context (Optional[ContextEntry]): Current context entry being processed
decompress (Callable[[str, str], Union[bool, List[Tuple[str, str]]]]): Function that performs
the actual decompression. Takes filename and output folder, returns True/False for success
or a list of tuples containing (install_prefix, extract_path) pairs for multiple entries
"""

install_prefix = ""

# Check that archive key exists and filename is same as archive file
if current_context.archive and current_context.archive == filename:
if current_context and current_context.archive and current_context.archive == filename:
if current_context.extractPaths is not None and current_context.extractPaths != []:
logger.info(
f"Already extracted, skipping extraction for archive: {current_context.archive}"
Expand All @@ -97,19 +116,32 @@ def create_extraction(
# Inherit the context entry install prefix for the extracted files
install_prefix = current_context.installPrefix

# Create a temporary directory for extraction
temp_folder = create_temp_dir()
# Decompress the file
entries = decompress(filename, temp_folder)
if (
software.sha256 in EXTRACT_DIRS
and EXTRACT_DIRS[software.sha256]["result"]
and os.path.exists(EXTRACT_DIRS[software.sha256]["path"])
):
entries = EXTRACT_DIRS[software.sha256]["result"]
logger.info(f"Using cached extraction entries for {filename}")
else:
# Create a temporary directory for extraction
temp_folder = create_extract_dir()
EXTRACT_DIRS[software.sha256] = {"path": temp_folder, "result": None}

# Simple case where the decompressor doesn't need multiple entries
if entries is True:
entries = [(None, temp_folder)]
# Decompress the file
entries = decompress(filename, temp_folder)

# If False or an empty list
if not entries:
logger.error(f"Failed to decompress {filename}. No entries created.")
return
# Simple case where the decompressor doesn't need multiple entries
if entries is True:
entries = [(None, temp_folder)]

# If False or an empty list
if not entries:
logger.error(f"Failed to decompress {filename}. No entries created.")
return

# Store the result in the global EXTRACT_DIRS
EXTRACT_DIRS[software.sha256]["result"] = entries

for entry_prefix, extract_path in entries:
# Merges our install prefix with the entry's install prefix (where applicable)
Expand Down Expand Up @@ -160,15 +192,6 @@ def decompress_to(filename: str, output_folder: str, compression_format: str) ->
return True


def create_temp_dir():
# Create a temporary directory
temp_dir = tempfile.mkdtemp(prefix="surfactant-temp")

# Add to global list of temp dirs to facilitate easier clean up at the end
GLOBAL_TEMP_DIRS_LIST.append(temp_dir)
return temp_dir


def decompress_zip_file(filename: str, output_folder: str):
try:
with zipfile.ZipFile(filename, "r") as f:
Expand Down Expand Up @@ -241,15 +264,58 @@ def decompress_rar_file(filename: str, output_folder: str):
logger.info(f"Extracted RAR contents to {output_folder}")


def delete_temp_dirs():
for temp_dir in GLOBAL_TEMP_DIRS_LIST:
if temp_dir and os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
logger.info(f"Cleaned up temporary directory: {temp_dir}")
def setup_extracted_dirs():
"""Get the list of directories where files have been extracted."""
should_cache_extractions = ConfigManager().get("decompression", "cache_extractions", True)
if should_cache_extractions and EXTRACT_DIRS_PATH.exists():
try:
with open(EXTRACT_DIRS_PATH, "r") as f:
GLOBAL_EXTRACT_DIRS = json.load(f)
if not isinstance(GLOBAL_EXTRACT_DIRS, dict):
logger.error(f"Invalid format in {EXTRACT_DIRS_PATH}. Expected a dictionary.")
GLOBAL_EXTRACT_DIRS = {}
except json.JSONDecodeError:
logger.error(f"Failed to read extracted directories from {EXTRACT_DIRS_PATH}.")
GLOBAL_EXTRACT_DIRS = {}
else:
GLOBAL_EXTRACT_DIRS = {}


@surfactant.plugin.hookimpl
def init_hook(command_name: Optional[str] = None) -> None:
def store_extracted_dirs():
"""Store the current extracted directories to a JSON file."""
should_cache_extractions = ConfigManager().get("decompression", "cache_extractions", True)
if should_cache_extractions:
try:
with open(EXTRACT_DIRS_PATH, "w") as f:
json.dump(EXTRACT_DIRS, f)
except IOError as e:
logger.error(f"Failed to write extracted directories to {EXTRACT_DIRS_PATH}: {e}")


def create_extract_dir():
return tempfile.mkdtemp(prefix=EXTRACT_DIRS_PREFIX, dir=EXTRACT_DIR)


def delete_extract_dirs():
exited_gracefully = exit_hook.has_exited_gracefully()
should_cache_extractions = ConfigManager().get("decompression", "cache_extractions", True)
should_persist_extractions = should_cache_extractions and ConfigManager().get(
"decompression", "persist_extractions", False
)
keys = list(EXTRACT_DIRS.keys())
for key in keys:
# Extraction was in progress or failed; we have no reason to keep it
extraction_failed = not EXTRACT_DIRS[key]["result"]
should_delete = extraction_failed or (not should_persist_extractions and exited_gracefully)
if not should_cache_extractions or should_delete:
temp_dir = EXTRACT_DIRS[key]["path"]
if temp_dir and os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
logger.info(f"Cleaned up temporary directory: {temp_dir}")
del EXTRACT_DIRS[key]


def setup_rar_support():
RAR_SUPPORT["enabled"] = False

should_enable_rar = ConfigManager().get("rar", "enabled", True)
Expand All @@ -266,5 +332,15 @@ def init_hook(command_name: Optional[str] = None) -> None:
)


# Register exit handler
atexit.register(delete_temp_dirs)
@surfactant.plugin.hookimpl
def init_hook(command_name: Optional[str] = None):
"""Initialize the file decompression plugin."""
setup_extracted_dirs()
setup_rar_support()


@atexit.register
def cleanup_hook():
"""Clean up temporary directories and store extraction cache on exit."""
delete_extract_dirs()
store_extracted_dirs()
4 changes: 2 additions & 2 deletions surfactant/infoextractors/ole_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ def extract_file_info(

if ole_info["ole"].get("clsid_type") == "MSI":
file_decompression.create_extraction(
filename, context_queue, current_context, extract_msi
filename, software, context_queue, current_context, extract_msi
)

return ole_info


def extract_ole_info(filename: str) -> object:
def extract_ole_info(filename: str) -> Dict[str, Any]:
file_details: Dict[str, Any] = {}

with olefile.OleFileIO(filename) as ole:
Expand Down
49 changes: 49 additions & 0 deletions surfactant/utils/exit_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import sys
from typing import Any, Callable, Optional


# https://stackoverflow.com/a/9741784
class ExitHooks:
def __init__(self):
self.exit_code: Optional[int] = None
self.exception: Optional[Exception] = None
self._orig_exit: Optional[Callable[[int], None]] = None

def hook(self):
"""Install the exit and exception hooks."""
self._orig_exit = sys.exit
sys.exit = self.exit
sys.excepthook = self.exc_handler

def exit(self, code: int = 0):
"""Custom exit handler that captures the exit code."""
self.exit_code = code
if self._orig_exit is not None:
self._orig_exit(code)

def exc_handler(self, exc_type: type, exc: Exception, *args: Any):
"""Custom exception handler that captures exceptions."""
self.exception = exc


_HOOKS = ExitHooks()
_HOOKS.hook()


def get_exit_code() -> Optional[int]:
"""Get the exit code from the last program exit."""
return _HOOKS.exit_code


def get_exception() -> Optional[Exception]:
"""Get the exception from the last unhandled exception."""
return _HOOKS.exception


def has_exited_gracefully() -> bool:
"""
Returns True if the program exited gracefully (without an exception).
"""
if _HOOKS.exit_code is not None and _HOOKS.exit_code != 0:
return False
return _HOOKS.exception is None
Loading