Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions src/firm/bounty/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,24 @@
definition. The factory wires up scope enforcement, vuln database,
dedup engine, triage pipeline, and reward engine.

Agent model names can be overridden at runtime via environment variables.
The variable name is derived from the agent name:
``FIRM_<AGENT_NAME_UPPER>_MODEL`` where hyphens are replaced by
underscores. For example, to override the ``hunt-director`` model::

FIRM_HUNT_DIRECTOR_MODEL=gpt-4o python -m firm ...

A global fallback ``FIRM_DEFAULT_MODEL`` applies to any agent whose
specific variable is not set.

⚠️ Contenu généré par IA — validation humaine requise avant utilisation.
"""

from __future__ import annotations

import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
from typing import Any

from firm.bounty.dedup import DeduplicationEngine
from firm.bounty.reward import RewardEngine
Expand All @@ -20,7 +30,6 @@
from firm.bounty.triage import TriagePipeline
from firm.bounty.vulnerability import VulnDatabase


# ---------------------------------------------------------------------------
# Agent specs
# ---------------------------------------------------------------------------
Expand All @@ -33,52 +42,72 @@ class AgentSpec:
description: str


def _resolve_model(agent_name: str, default_model: str) -> str:
"""Return the model to use for *agent_name*.

Resolution order (first match wins):

1. ``FIRM_<AGENT_NAME_UPPER>_MODEL`` env var (hyphens → underscores).
Example: ``FIRM_HUNT_DIRECTOR_MODEL`` for the ``hunt-director`` agent.
2. ``FIRM_DEFAULT_MODEL`` env var — applies to **all** agents.
3. The hard-coded *default_model* built into the factory.
"""
specific_var = "FIRM_" + agent_name.upper().replace("-", "_") + "_MODEL"
specific = os.environ.get(specific_var, "").strip()
if specific:
return specific
global_default = os.environ.get("FIRM_DEFAULT_MODEL", "").strip()
if global_default:
return global_default
return default_model


BOUNTY_AGENTS: list[AgentSpec] = [
AgentSpec(
name="hunt-director",
model="claude-sonnet-4-20250514",
model=_resolve_model("hunt-director", "claude-sonnet-4-20250514"),
initial_authority=0.90,
description="Campaign coordinator — plans phases, assigns targets, synthesises results.",
),
AgentSpec(
name="recon-agent",
model="gpt-4.1",
model=_resolve_model("recon-agent", "gpt-4.1"),
initial_authority=0.70,
description="Reconnaissance specialist — subdomains, ports, tech stack, URL crawling.",
),
AgentSpec(
name="web-hunter",
model="claude-sonnet-4-20250514",
model=_resolve_model("web-hunter", "claude-sonnet-4-20250514"),
initial_authority=0.65,
description="Web vulnerability hunter — SQLi, XSS, SSRF, IDOR on web apps.",
),
AgentSpec(
name="api-hunter",
model="gpt-4o",
model=_resolve_model("api-hunter", "gpt-4o"),
initial_authority=0.65,
description="API vulnerability hunter — auth bypass, BOLA, rate limiting, GraphQL.",
),
AgentSpec(
name="code-auditor",
model="o4-mini",
model=_resolve_model("code-auditor", "o4-mini"),
initial_authority=0.60,
description="Static code auditor — semgrep, pattern-based detection, dependency audit.",
),
AgentSpec(
name="mobile-hunter",
model="claude-sonnet-4-20250514",
model=_resolve_model("mobile-hunter", "claude-sonnet-4-20250514"),
initial_authority=0.55,
description="Mobile app security — APK/IPA analysis, certificate pinning, local storage.",
),
AgentSpec(
name="web3-hunter",
model="gpt-4.1",
model=_resolve_model("web3-hunter", "gpt-4.1"),
initial_authority=0.55,
description="Smart contract / blockchain security — reentrancy, flash loans, oracle manipulation.",
),
AgentSpec(
name="report-writer",
model="gpt-4o",
model=_resolve_model("report-writer", "gpt-4o"),
initial_authority=0.40,
description="Report writer — crafts clear, detailed H1 reports with reproduction steps.",
),
Comment on lines 65 to 113
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_resolve_model() is called at module import time for each agent in BOUNTY_AGENTS. This means env var overrides (FIRM_*_MODEL, FIRM_DEFAULT_MODEL) are "frozen" at first import. For the documented CLI workflow (env vars set before firm bounty ...), this works because cli.py uses lazy imports. However, programmatic callers who set env vars and then call create_bounty_firm() after the module has already been imported won't see the updated models. Consider deferring model resolution into create_bounty_firm() so it always reads current env vars, or document this limitation explicitly.

Copilot uses AI. Check for mistakes.
Expand Down
87 changes: 67 additions & 20 deletions src/firm/bounty/tools/scanner.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
"""LLM-callable security tools — recon, scan, report.

Every tool is scope-enforced and rate-limited so agents cannot go off-target.
CLI tools are invoked via subprocess; the Go ``httpx`` binary is called via
its full path (``/opt/homebrew/bin/httpx``) to avoid collision with the
Python ``httpx`` pip package.
CLI tools are invoked via subprocess; the Go ``httpx`` binary is resolved via
``shutil.which`` to work on any platform. The ``HTTPX_BIN`` environment
variable can override the binary path explicitly, and ``FFUF_WORDLIST`` can
override the default wordlist used by ``scan_ffuf``.

⚠️ Contenu généré par IA — validation humaine requise avant utilisation.
"""

from __future__ import annotations

import asyncio
import json
import os
import shutil
import socket
import ssl
import subprocess
import tempfile
import time
from dataclasses import dataclass, field
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable
from typing import Any

from firm.bounty.scope import ScopeEnforcer


# ---------------------------------------------------------------------------
# Rate limiter (token-bucket per target)
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -81,8 +82,60 @@ def _run(cmd: list[str], timeout: int = 120) -> str:
return f"[TOOL NOT FOUND: {cmd[0]}]"


# Path to Go httpx binary (avoids Python httpx-cli collision)
_HTTPX_BIN = "/opt/homebrew/bin/httpx"
def _resolve_httpx_bin() -> str:
"""Return the path to the Go httpx binary.

Resolution order (first match wins):
1. ``HTTPX_BIN`` environment variable.
2. ``httpx-toolkit`` on ``$PATH`` (common Linux package name).
3. ``httpx`` on ``$PATH`` (generic name, may collide with Python package).
4. macOS Homebrew default ``/opt/homebrew/bin/httpx``.

If none is found the first available name is returned anyway so that
the caller receives a useful ``[TOOL NOT FOUND]`` message rather than
a silent failure.
"""
env_override = os.environ.get("HTTPX_BIN", "").strip()
if env_override:
return env_override
for candidate in ("httpx-toolkit", "httpx"):
found = shutil.which(candidate)
if found:
return found
# macOS Homebrew fallback
homebrew_path = "/opt/homebrew/bin/httpx"
if Path(homebrew_path).exists():
return homebrew_path
return "httpx-toolkit" # will produce a clear TOOL NOT FOUND message


_DEFAULT_FFUF_WORDLISTS: list[str] = [
"/usr/share/seclists/Discovery/Web-Content/common.txt", # Debian/Ubuntu
"/usr/share/SecLists/Discovery/Web-Content/common.txt", # some distros
str(Path.home() / "SecLists/Discovery/Web-Content/common.txt"), # macOS/user install
"/opt/homebrew/share/seclists/Discovery/Web-Content/common.txt", # Homebrew
]
Comment on lines +112 to +117
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Path.home() is now called at module import time (as part of _DEFAULT_FFUF_WORDLISTS), whereas previously the equivalent Path.home() call was inside the scan_ffuf function body and only executed when that tool was invoked. If Path.home() raises a RuntimeError (e.g., in a containerized environment with no HOME set), it will now prevent the entire scanner module from being imported, rather than just causing that one tool call to fail. Consider computing this path lazily inside _resolve_ffuf_wordlist() instead, or wrapping it with a try/except at module level.

Copilot uses AI. Check for mistakes.


def _resolve_ffuf_wordlist(wordlist: str = "") -> str | None:
"""Return the first available wordlist path.

Resolution order:
1. Explicit ``wordlist`` argument (if non-empty and the file exists).
2. ``FFUF_WORDLIST`` environment variable.
3. Platform-specific default locations (see ``_DEFAULT_FFUF_WORDLISTS``).

Returns ``None`` when no wordlist can be found.
"""
if wordlist:
return wordlist if Path(wordlist).exists() else None
env_wl = os.environ.get("FFUF_WORDLIST", "").strip()
if env_wl:
return env_wl if Path(env_wl).exists() else None
for path in _DEFAULT_FFUF_WORDLISTS:
if Path(path).exists():
return path
return None


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -123,7 +176,7 @@ def recon_tech(target: str) -> str:
return f"BLOCKED: {target} is not in scope."
if not limiter.allow(target):
return "RATE LIMITED — retry later."
httpx_bin = _HTTPX_BIN if Path(_HTTPX_BIN).exists() else "httpx-toolkit"
httpx_bin = _resolve_httpx_bin()
return _run(
[httpx_bin, "-u", f"https://{target}",
"-tech-detect", "-status-code", "-title", "-silent"],
Expand Down Expand Up @@ -201,13 +254,9 @@ def scan_ffuf(target: str, wordlist: str = "", path: str = "/FUZZ") -> str:
return f"BLOCKED: {target} is not in scope."
if not limiter.allow(target):
return "RATE LIMITED — retry later."
wl = wordlist or "/usr/share/seclists/Discovery/Web-Content/common.txt"
if not Path(wl).exists():
wl_alt = Path.home() / "SecLists/Discovery/Web-Content/common.txt"
if wl_alt.exists():
wl = str(wl_alt)
else:
return "[WORDLIST NOT FOUND — install SecLists]"
wl = _resolve_ffuf_wordlist(wordlist)
if wl is None:
return "[WORDLIST NOT FOUND — set FFUF_WORDLIST or install SecLists]"
url = f"https://{target}{path}"
return _run(
["ffuf", "-u", url, "-w", wl,
Expand Down Expand Up @@ -278,8 +327,6 @@ def scan_ssl(target: str) -> str:
return f"BLOCKED: {target} is not in scope."
if not limiter.allow(target):
return "RATE LIMITED — retry later."
import ssl
import socket
findings: list[str] = []
try:
ctx = ssl.create_default_context()
Expand Down
33 changes: 32 additions & 1 deletion tests/test_bounty_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pytest

from firm.bounty.factory import BOUNTY_AGENTS, create_bounty_firm
from firm.bounty.factory import BOUNTY_AGENTS, _resolve_model, create_bounty_firm
from firm.bounty.scope import Asset, AssetType, TargetScope


Expand Down Expand Up @@ -56,3 +56,34 @@ def test_bounty_agents_list(self):
def test_authority_range(self):
for agent in BOUNTY_AGENTS:
assert 0.0 <= agent.initial_authority <= 1.0


# ---------------------------------------------------------------------------
# Model resolution
# ---------------------------------------------------------------------------

class TestResolveModel:
def test_default_model_returned_without_env(self, monkeypatch):
monkeypatch.delenv("FIRM_HUNT_DIRECTOR_MODEL", raising=False)
monkeypatch.delenv("FIRM_DEFAULT_MODEL", raising=False)
assert _resolve_model("hunt-director", "claude-default") == "claude-default"

def test_specific_env_var_overrides_default(self, monkeypatch):
monkeypatch.setenv("FIRM_HUNT_DIRECTOR_MODEL", "my-custom-model")
monkeypatch.delenv("FIRM_DEFAULT_MODEL", raising=False)
assert _resolve_model("hunt-director", "claude-default") == "my-custom-model"

def test_global_default_env_var(self, monkeypatch):
monkeypatch.delenv("FIRM_HUNT_DIRECTOR_MODEL", raising=False)
monkeypatch.setenv("FIRM_DEFAULT_MODEL", "gpt-4-turbo")
assert _resolve_model("hunt-director", "claude-default") == "gpt-4-turbo"

def test_specific_env_var_takes_priority_over_global(self, monkeypatch):
monkeypatch.setenv("FIRM_RECON_AGENT_MODEL", "specific-model")
monkeypatch.setenv("FIRM_DEFAULT_MODEL", "global-model")
assert _resolve_model("recon-agent", "fallback") == "specific-model"

def test_hyphen_to_underscore_conversion(self, monkeypatch):
monkeypatch.setenv("FIRM_REPORT_WRITER_MODEL", "report-model")
monkeypatch.delenv("FIRM_DEFAULT_MODEL", raising=False)
assert _resolve_model("report-writer", "fallback") == "report-model"
79 changes: 77 additions & 2 deletions tests/test_bounty_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@
⚠️ Contenu généré par IA — validation humaine requise avant utilisation.
"""

from unittest.mock import patch

import pytest
from unittest.mock import patch, MagicMock

from firm.bounty.scope import Asset, AssetType, ScopeEnforcer, TargetScope
from firm.bounty.tools.scanner import (
RateLimiter,
_resolve_ffuf_wordlist,
_resolve_httpx_bin,
make_bounty_tools,
make_recon_tools,
make_report_tools,
make_scanning_tools,
)


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -154,3 +156,76 @@ def test_report_generate(self):
)
assert "# XSS in search" in result
assert "CWE-79" in result


# ---------------------------------------------------------------------------
# Binary / wordlist resolution
# ---------------------------------------------------------------------------

class TestResolveHttpxBin:
def test_env_var_takes_priority(self, monkeypatch):
monkeypatch.setenv("HTTPX_BIN", "/custom/httpx")
assert _resolve_httpx_bin() == "/custom/httpx"

def test_env_var_empty_falls_through(self, monkeypatch, tmp_path):
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tmp_path fixture parameter is declared but never used in this test. It can be removed.

Suggested change
def test_env_var_empty_falls_through(self, monkeypatch, tmp_path):
def test_env_var_empty_falls_through(self, monkeypatch):

Copilot uses AI. Check for mistakes.
monkeypatch.setenv("HTTPX_BIN", "")
# Ensure no tool is found on PATH and Homebrew path does not exist
with patch("shutil.which", return_value=None), \
patch("firm.bounty.tools.scanner.Path") as mock_path:
mock_path.return_value.exists.return_value = False
result = _resolve_httpx_bin()
assert result == "httpx-toolkit"

def test_shutil_which_used_when_no_env(self, monkeypatch):
monkeypatch.delenv("HTTPX_BIN", raising=False)
with patch("shutil.which", side_effect=lambda name: "/usr/bin/httpx-toolkit" if name == "httpx-toolkit" else None):
result = _resolve_httpx_bin()
assert result == "/usr/bin/httpx-toolkit"


class TestResolveFfufWordlist:
def test_explicit_wordlist_returned_if_exists(self, tmp_path):
wl = tmp_path / "wordlist.txt"
wl.write_text("admin\nlogin\n")
assert _resolve_ffuf_wordlist(str(wl)) == str(wl)

def test_explicit_wordlist_returns_none_if_missing(self):
assert _resolve_ffuf_wordlist("/does/not/exist.txt") is None

def test_env_var_used_when_no_explicit(self, monkeypatch, tmp_path):
wl = tmp_path / "custom.txt"
wl.write_text("secret\n")
monkeypatch.setenv("FFUF_WORDLIST", str(wl))
assert _resolve_ffuf_wordlist() == str(wl)

def test_env_var_missing_file_returns_none(self, monkeypatch):
monkeypatch.setenv("FFUF_WORDLIST", "/nonexistent/wordlist.txt")
assert _resolve_ffuf_wordlist() is None

def test_returns_none_when_nothing_available(self, monkeypatch):
monkeypatch.delenv("FFUF_WORDLIST", raising=False)
with patch("firm.bounty.tools.scanner.Path") as mock_path:
mock_path.return_value.exists.return_value = False
mock_path.home.return_value.__truediv__ = lambda *_: mock_path.return_value
result = _resolve_ffuf_wordlist()
assert result is None

@patch("firm.bounty.tools.scanner._run", return_value="")
def test_scan_ffuf_wordlist_not_found_message(self, mock_run, enforcer, limiter, monkeypatch):
monkeypatch.delenv("FFUF_WORDLIST", raising=False)
with patch("firm.bounty.tools.scanner._resolve_ffuf_wordlist", return_value=None):
tools = {t["name"]: t["callable"] for t in make_scanning_tools(enforcer, limiter)}
result = tools["scan_ffuf"]("target.com")
assert "WORDLIST NOT FOUND" in result
mock_run.assert_not_called()

@patch("firm.bounty.tools.scanner._run", return_value="ffuf output")
def test_scan_ffuf_uses_env_wordlist(self, mock_run, enforcer, limiter, monkeypatch, tmp_path):
wl = tmp_path / "wl.txt"
wl.write_text("index\n")
monkeypatch.setenv("FFUF_WORDLIST", str(wl))
tools = {t["name"]: t["callable"] for t in make_scanning_tools(enforcer, limiter)}
result = tools["scan_ffuf"]("target.com")
assert "BLOCKED" not in result
assert "WORDLIST NOT FOUND" not in result
mock_run.assert_called_once()
Loading