Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions GHIDRA12_FIXES.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Cthaeh — Ghidra 12 Compatibility Fixes

## Problem

Under Ghidra 12, `run_triage.py` reported `FAILED (no triage output)` for every driver it analyzed. The root cause was two separate incompatibilities with Ghidra 12, each addressed by one of the fixes below.

---

## Fix 1: Use `pyghidraRun` instead of `analyzeHeadless` (`run_triage.py`)

### Why

Ghidra 12 removed built-in Python (Jython) from `analyzeHeadless`. Python scripts now require PyGhidra, which is launched via `pyghidraRun`. Running a `.py` script through the `analyzeHeadless` launcher, as `run_triage.py` previously did, produces:

```
GhidraScriptLoadException: Ghidra was not started with PyGhidra. Python is not available
```

### What changed

**`run_ghidra_analysis()`** — replaced `analyzeHeadless`/`analyzeHeadless.bat` with `pyghidraRun`/`pyghidraRun.bat`, and added `--headless` as the first argument after the launcher path (required by `pyghidraRun` to enable headless mode); the remaining arguments are unchanged. The Ghidra install dir is **not** passed as an extra argument — `pyghidraRun` is a launcher script (a shell script on Linux/macOS, a batch file on Windows) that already knows its own install location.

Before:
```python
if sys.platform == "win32":
headless = os.path.join(ghidra_path, "support", "analyzeHeadless.bat")
else:
headless = os.path.join(ghidra_path, "support", "analyzeHeadless")

cmd = [headless, project_dir, f"triage_{driver_name}", "-import", ...]
```

After:
```python
if sys.platform == "win32":
headless = os.path.join(ghidra_path, "support", "pyghidraRun.bat")
else:
headless = os.path.join(ghidra_path, "support", "pyghidraRun")

cmd = [headless, "--headless", project_dir, f"triage_{driver_name}", "-import", ...]
```

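The same launcher substitution was applied in **`main()`**, where the Ghidra path is validated on startup: it now checks for `pyghidraRun`/`pyghidraRun.bat` under `support/`, and the "Invalid Ghidra path" error message names `pyghidraRun` instead of `analyzeHeadless`.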

---

## Fix 2: Replace `DefinedDataIterator.definedStrings()` (`driver_triage.py`)

### Why

`DefinedDataIterator.definedStrings()` was removed from the Ghidra 12 API. Calling it raised:

```
AttributeError: type object 'ghidra.program.util.DefinedDataIterator' has no attribute 'definedStrings'
```

### What changed

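**`get_strings()`** — replaced the removed API with a replacement built on `Listing.getDefinedData()`: it walks all defined data forward from the program's minimum address and keeps only the items whose mnemonic string is in `_STRING_MNEMONICS` (the same method used internally by Ghidra 12's own example scripts). String data whose mnemonic is not in that set is not returned, so extend the set if additional string types need to be covered.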

Before:
```python
from ghidra.program.util import DefinedDataIterator

def get_strings(program):
strings = []
for data in DefinedDataIterator.definedStrings(program):
val = data.getDefaultValueRepresentation()
if val:
strings.append(val.strip('"').strip("'"))
return strings
```

After:
```python
_STRING_MNEMONICS = {"ds", "unicode", "p_unicode", "p_string", "p_string255", "mbcs"}

def get_strings(program):
strings = []
listing = program.getListing()
data_iter = listing.getDefinedData(program.getMinAddress(), True)
while data_iter.hasNext():
data = data_iter.next()
if data.getMnemonicString() in _STRING_MNEMONICS:
val = data.getDefaultValueRepresentation()
if val:
strings.append(val.strip('"').strip("'"))
return strings
```

The `DefinedDataIterator` import was also removed as it is no longer used.
34 changes: 11 additions & 23 deletions driver_triage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

# Ghidra imports (available in Ghidra scripting environment)
from ghidra.program.model.symbol import SourceType
from ghidra.program.util import DefinedDataIterator


# --- Scoring Weights Configuration ---
Expand Down Expand Up @@ -472,13 +471,19 @@ def get_import_dlls(program):
return dlls


_STRING_MNEMONICS = {"ds", "unicode", "p_unicode", "p_string", "p_string255", "mbcs"}

def get_strings(program):
"""Get all defined strings in the binary."""
strings = []
for data in DefinedDataIterator.definedStrings(program):
val = data.getDefaultValueRepresentation()
if val:
strings.append(val.strip('"').strip("'"))
listing = program.getListing()
data_iter = listing.getDefinedData(program.getMinAddress(), True)
while data_iter.hasNext():
data = data_iter.next()
if data.getMnemonicString() in _STRING_MNEMONICS:
val = data.getDefaultValueRepresentation()
if val:
strings.append(val.strip('"').strip("'"))
return strings


Expand Down Expand Up @@ -3202,24 +3207,7 @@ def run():
driver_name = driver_info.get("name", "")

# Check known FP / already-investigated list
# Supports both old format ("driver.sys": "reason string")
# and new format ("driver.sys": {"reason": "...", "version": "1.2.3"})
skip_entry = INVESTIGATED.get(driver_name)
skip_reason = None
if skip_entry:
if isinstance(skip_entry, str):
# Old format: always skip
skip_reason = skip_entry
elif isinstance(skip_entry, dict):
entry_version = skip_entry.get("version")
driver_version = driver_info.get("version", "")
if entry_version and driver_version and entry_version != driver_version:
# Version mismatch: driver was updated, re-scan it
skip_reason = None
print("investigated.json: %s version changed (%s -> %s), re-scanning" % (
driver_name, entry_version, driver_version))
else:
skip_reason = skip_entry.get("reason", "investigated")
skip_reason = INVESTIGATED.get(driver_name)
if skip_reason:
result = {
"driver": driver_info,
Expand Down
118 changes: 30 additions & 88 deletions run_triage.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,14 @@
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

try:
import yaml
except ImportError:
yaml = None


def _load_thresholds():
"""Load scoring thresholds from scoring_rules.yaml (single source of truth).
Falls back to hardcoded defaults if YAML unavailable."""
defaults = {"CRITICAL": 250, "HIGH": 150, "MEDIUM": 75, "LOW": 30}
if yaml is None:
return defaults
# Search: same dir as this script, then cwd
candidates = [
os.path.join(os.path.dirname(os.path.abspath(__file__)), "scoring_rules.yaml"),
os.path.join(os.getcwd(), "scoring_rules.yaml"),
]
for path in candidates:
if os.path.exists(path):
try:
with open(path, "r") as f:
data = yaml.safe_load(f)
thresholds = data.get("thresholds", {})
if thresholds:
return {
"CRITICAL": thresholds.get("CRITICAL", defaults["CRITICAL"]),
"HIGH": thresholds.get("HIGH", defaults["HIGH"]),
"MEDIUM": thresholds.get("MEDIUM", defaults["MEDIUM"]),
"LOW": thresholds.get("LOW", defaults["LOW"]),
}
except Exception:
pass
return defaults


# --- Scoring tier thresholds (loaded from scoring_rules.yaml) ---
SCORE_TIERS = _load_thresholds()
# --- Scoring tier thresholds (used for report recommendations) ---
SCORE_TIERS = {
"CRITICAL": 120,
"HIGH": 85,
"MEDIUM": 55,
"LOW": 30,
}


def get_score_tier(score):
Expand Down Expand Up @@ -264,10 +235,10 @@ def run_ghidra_analysis(args_tuple):
os.makedirs(project_dir, exist_ok=True)

if sys.platform == "win32":
headless = os.path.join(ghidra_path, "support", "analyzeHeadless.bat")
headless = os.path.join(ghidra_path, "support", "pyghidraRun.bat")
else:
headless = os.path.join(ghidra_path, "support", "analyzeHeadless")
headless = os.path.join(ghidra_path, "support", "pyghidraRun")

if not os.path.exists(headless):
return None, f"Ghidra headless not found at {headless}"

Expand All @@ -276,6 +247,7 @@ def run_ghidra_analysis(args_tuple):

cmd = [
headless,
"--headless",
project_dir,
f"triage_{driver_name}",
"-import", driver_path,
Expand Down Expand Up @@ -314,26 +286,13 @@ def run_ghidra_analysis(args_tuple):
return None, str(e)


def run_prefilter(drivers_dir, max_size_mb=5, min_risk_hint=0):
"""Run the pefile pre-filter to eliminate uninteresting drivers.

Args:
min_risk_hint: Minimum prefilter risk_hint score to send to Ghidra.
0 = send everything with attack surface (default, backward compat).
1+ = skip low-potential drivers before Ghidra (saves time).
"""
def run_prefilter(drivers_dir, max_size_mb=5):
"""Run the pefile pre-filter to eliminate uninteresting drivers."""
try:
from prefilter import prefilter_directory
max_bytes = max_size_mb * 1024 * 1024
results = prefilter_directory(drivers_dir, max_bytes, check_loldrivers=True)
analyze = results["analyze"]
if min_risk_hint > 0:
before = len(analyze)
analyze = [d for d in analyze if d.get("risk_hint", 0) >= min_risk_hint]
skipped = before - len(analyze)
if skipped:
print(f" Pre-filter: {skipped} drivers below risk_hint {min_risk_hint} (skipped before Ghidra)")
return [d["path"] for d in analyze]
return [d["path"] for d in results["analyze"]]
except ImportError:
print("WARNING: prefilter.py not found or pefile not installed.")
print(" Install: pip install pefile")
Expand Down Expand Up @@ -385,11 +344,8 @@ def write_csv(results, output_path):
print(f"\nResults written to: {output_path}")


def print_summary(results, min_tier="HIGH"):
def print_summary(results):
"""Print a quick summary to terminal."""
tier_order = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "SKIP"]
min_tier_idx = tier_order.index(min_tier) if min_tier in tier_order else 1

total = len(results)
critical = sum(1 for r in results if r.get("priority") == "CRITICAL")
high = sum(1 for r in results if r.get("priority") == "HIGH")
Expand All @@ -408,18 +364,13 @@ def print_summary(results, min_tier="HIGH"):
print()

results.sort(key=lambda x: x.get("score", 0), reverse=True)
# Filter to min_tier and above
filtered = [r for r in results
if r.get("priority", "SKIP") in tier_order[:min_tier_idx + 1]]
if filtered:
tier_label = f" (>= {min_tier})" if min_tier != "SKIP" else ""
print(f"Top targets{tier_label}:")
print()
for i, r in enumerate(filtered[:20], 1):
if results:
print("Top targets:")
for i, r in enumerate(results[:20], 1):
driver = r.get("driver", {})
print(f" {i:2d}. [{r.get('priority', '?'):8s}] {r.get('score', 0):3d} pts {driver.get('name', '?')}")
elif results:
print(f"No drivers at {min_tier} tier or above. Use --min-tier MEDIUM to see more.")
dc = r.get("driver_class", {})
cls_tag = f" [{dc['class']}]" if dc and dc.get("class", "UNKNOWN") != "UNKNOWN" else ""
print(f" {i:2d}. [{r.get('priority', '?'):6s}] {r.get('score', 0):3d} pts {driver.get('name', '?')}{cls_tag}")


def run_analysis(drivers, ghidra_path, script_path, project_dir, workers=1, json_output=None):
Expand Down Expand Up @@ -792,13 +743,10 @@ def main():
parser = argparse.ArgumentParser(
description="🌳 Cthaeh - Driver vulnerability triage scanner",
epilog="""Examples:
python run_triage.py C:\\drivers # Scan with smart defaults (shows HIGH+ only)
python run_triage.py C:\\drivers --prefilter-min 3 # Aggressive filter (fewer drivers to Ghidra)
python run_triage.py C:\\drivers --prefilter-min 0 # Analyze everything with attack surface
python run_triage.py C:\\drivers --min-tier CRITICAL # Only show CRITICAL in top targets
python run_triage.py C:\\drivers --no-prefilter # Skip pre-filter entirely
python run_triage.py C:\\drivers # Scan with smart defaults
python run_triage.py C:\\drivers --no-prefilter # Skip pre-filter
python run_triage.py --single C:\\path\\to\\driver.sys
python run_triage.py --explain amdfendr.sys # Explain existing results
python run_triage.py --explain amdfendr.sys # Explain existing results
""",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
Expand All @@ -814,9 +762,6 @@ def main():
help="Parallel Ghidra instances (default: auto = half CPUs)")
parser.add_argument("--no-prefilter", action="store_true",
help="Disable pefile pre-filter (on by default)")
parser.add_argument("--prefilter-min", type=int, default=1,
help="Minimum prefilter risk_hint to send to Ghidra (default: 1). "
"Higher = fewer drivers analyzed = faster scan. 0 = analyze everything with attack surface.")
parser.add_argument("--max-size", type=int, default=5,
help="Max driver size in MB for pre-filter (default: 5)")
parser.add_argument("--no-json", action="store_true",
Expand All @@ -827,9 +772,6 @@ def main():
parser.add_argument("--report", help="Markdown report path (default: triage_report.md)")
parser.add_argument("--report-top", type=int, default=20,
help="Number of top drivers to include in report (default: 20)")
parser.add_argument("--min-tier", default="HIGH",
choices=["CRITICAL", "HIGH", "MEDIUM", "LOW", "SKIP"],
help="Minimum tier to show in Top targets display (default: HIGH). Counts/report/JSON still include all.")
parser.add_argument("--explain", help="Show detailed scoring breakdown for a specific driver (by name)")
parser.add_argument("--hw-check", action="store_true",
help="Check hardware presence after triage (Windows only)")
Expand Down Expand Up @@ -879,12 +821,12 @@ def main():

# Validate Ghidra path
if sys.platform == "win32":
headless = os.path.join(ghidra_path, "support", "analyzeHeadless.bat")
headless = os.path.join(ghidra_path, "support", "pyghidraRun.bat")
else:
headless = os.path.join(ghidra_path, "support", "analyzeHeadless")
headless = os.path.join(ghidra_path, "support", "pyghidraRun")

if not os.path.exists(headless):
parser.error(f"Invalid Ghidra path: {ghidra_path} (no analyzeHeadless found in support/)")
parser.error(f"Invalid Ghidra path: {ghidra_path} (no pyghidraRun found in support/)")

# Auto-detect worker count
workers = args.workers if args.workers > 0 else detect_cpu_count()
Expand All @@ -909,7 +851,7 @@ def main():
else:
if use_prefilter:
print(f"Running pre-filter on {drivers_dir}...")
filtered = run_prefilter(drivers_dir, args.max_size, min_risk_hint=args.prefilter_min)
filtered = run_prefilter(drivers_dir, args.max_size)
if filtered is not None:
drivers = filtered
else:
Expand Down Expand Up @@ -986,7 +928,7 @@ def main():

if report_output:
write_report(results, report_output, args.report_top)
print_summary(results, min_tier=args.min_tier)
print_summary(results)

if results:
if args.explain:
Expand Down