diff --git a/src/firm/bounty/factory.py b/src/firm/bounty/factory.py index a266fa1..485437f 100644 --- a/src/firm/bounty/factory.py +++ b/src/firm/bounty/factory.py @@ -4,14 +4,24 @@ definition. The factory wires up scope enforcement, vuln database, dedup engine, triage pipeline, and reward engine. +Agent model names can be overridden at runtime via environment variables. +The variable name is derived from the agent name: + ``FIRM__MODEL`` where hyphens are replaced by + underscores. For example, to override the ``hunt-director`` model:: + + FIRM_HUNT_DIRECTOR_MODEL=gpt-4o python -m firm ... + +A global fallback ``FIRM_DEFAULT_MODEL`` applies to any agent whose +specific variable is not set. + ⚠️ Contenu généré par IA — validation humaine requise avant utilisation. """ from __future__ import annotations +import os from dataclasses import dataclass -from pathlib import Path -from typing import Any, Optional +from typing import Any from firm.bounty.dedup import DeduplicationEngine from firm.bounty.reward import RewardEngine @@ -20,7 +30,6 @@ from firm.bounty.triage import TriagePipeline from firm.bounty.vulnerability import VulnDatabase - # --------------------------------------------------------------------------- # Agent specs # --------------------------------------------------------------------------- @@ -33,52 +42,72 @@ class AgentSpec: description: str +def _resolve_model(agent_name: str, default_model: str) -> str: + """Return the model to use for *agent_name*. + + Resolution order (first match wins): + + 1. ``FIRM__MODEL`` env var (hyphens → underscores). + Example: ``FIRM_HUNT_DIRECTOR_MODEL`` for the ``hunt-director`` agent. + 2. ``FIRM_DEFAULT_MODEL`` env var — applies to **all** agents. + 3. The hard-coded *default_model* built into the factory. + """ + specific_var = "FIRM_" + agent_name.upper().replace("-", "_") + "_MODEL" + specific = os.environ.get(specific_var, "").strip() + if specific: + return specific + global_default = os.environ.get("FIRM_DEFAULT_MODEL", "").strip() + if global_default: + return global_default + return default_model + + BOUNTY_AGENTS: list[AgentSpec] = [ AgentSpec( name="hunt-director", - model="claude-sonnet-4-20250514", + model=_resolve_model("hunt-director", "claude-sonnet-4-20250514"), initial_authority=0.90, description="Campaign coordinator — plans phases, assigns targets, synthesises results.", ), AgentSpec( name="recon-agent", - model="gpt-4.1", + model=_resolve_model("recon-agent", "gpt-4.1"), initial_authority=0.70, description="Reconnaissance specialist — subdomains, ports, tech stack, URL crawling.", ), AgentSpec( name="web-hunter", - model="claude-sonnet-4-20250514", + model=_resolve_model("web-hunter", "claude-sonnet-4-20250514"), initial_authority=0.65, description="Web vulnerability hunter — SQLi, XSS, SSRF, IDOR on web apps.", ), AgentSpec( name="api-hunter", - model="gpt-4o", + model=_resolve_model("api-hunter", "gpt-4o"), initial_authority=0.65, description="API vulnerability hunter — auth bypass, BOLA, rate limiting, GraphQL.", ), AgentSpec( name="code-auditor", - model="o4-mini", + model=_resolve_model("code-auditor", "o4-mini"), initial_authority=0.60, description="Static code auditor — semgrep, pattern-based detection, dependency audit.", ), AgentSpec( name="mobile-hunter", - model="claude-sonnet-4-20250514", + model=_resolve_model("mobile-hunter", "claude-sonnet-4-20250514"), initial_authority=0.55, description="Mobile app security — APK/IPA analysis, certificate pinning, local storage.", ), AgentSpec( name="web3-hunter", - model="gpt-4.1", + model=_resolve_model("web3-hunter", "gpt-4.1"), initial_authority=0.55, description="Smart contract / blockchain security — reentrancy, flash loans, oracle manipulation.", ), AgentSpec( name="report-writer", - model="gpt-4o", + model=_resolve_model("report-writer", "gpt-4o"), initial_authority=0.40, description="Report writer — crafts clear, detailed H1 reports with reproduction steps.", ), diff --git a/src/firm/bounty/tools/scanner.py b/src/firm/bounty/tools/scanner.py index 8d4f965..f54474d 100644 --- a/src/firm/bounty/tools/scanner.py +++ b/src/firm/bounty/tools/scanner.py @@ -1,28 +1,29 @@ """LLM-callable security tools — recon, scan, report. Every tool is scope-enforced and rate-limited so agents cannot go off-target. -CLI tools are invoked via subprocess; the Go ``httpx`` binary is called via -its full path (``/opt/homebrew/bin/httpx``) to avoid collision with the -Python ``httpx`` pip package. +CLI tools are invoked via subprocess; the Go ``httpx`` binary is resolved via +``shutil.which`` to work on any platform. The ``HTTPX_BIN`` environment +variable can override the binary path explicitly, and ``FFUF_WORDLIST`` can +override the default wordlist used by ``scan_ffuf``. ⚠️ Contenu généré par IA — validation humaine requise avant utilisation. """ from __future__ import annotations -import asyncio -import json +import os import shutil +import socket +import ssl import subprocess import tempfile import time -from dataclasses import dataclass, field +from dataclasses import dataclass from pathlib import Path -from typing import Any, Callable +from typing import Any from firm.bounty.scope import ScopeEnforcer - # --------------------------------------------------------------------------- # Rate limiter (token-bucket per target) # --------------------------------------------------------------------------- @@ -81,8 +82,60 @@ def _run(cmd: list[str], timeout: int = 120) -> str: return f"[TOOL NOT FOUND: {cmd[0]}]" -# Path to Go httpx binary (avoids Python httpx-cli collision) -_HTTPX_BIN = "/opt/homebrew/bin/httpx" +def _resolve_httpx_bin() -> str: + """Return the path to the Go httpx binary. + + Resolution order (first match wins): + 1. ``HTTPX_BIN`` environment variable. + 2. ``httpx-toolkit`` on ``$PATH`` (common Linux package name). + 3. ``httpx`` on ``$PATH`` (generic name, may collide with Python package). + 4. macOS Homebrew default ``/opt/homebrew/bin/httpx``. + + If none is found the first available name is returned anyway so that + the caller receives a useful ``[TOOL NOT FOUND]`` message rather than + a silent failure. + """ + env_override = os.environ.get("HTTPX_BIN", "").strip() + if env_override: + return env_override + for candidate in ("httpx-toolkit", "httpx"): + found = shutil.which(candidate) + if found: + return found + # macOS Homebrew fallback + homebrew_path = "/opt/homebrew/bin/httpx" + if Path(homebrew_path).exists(): + return homebrew_path + return "httpx-toolkit" # will produce a clear TOOL NOT FOUND message + + +_DEFAULT_FFUF_WORDLISTS: list[str] = [ + "/usr/share/seclists/Discovery/Web-Content/common.txt", # Debian/Ubuntu + "/usr/share/SecLists/Discovery/Web-Content/common.txt", # some distros + str(Path.home() / "SecLists/Discovery/Web-Content/common.txt"), # macOS/user install + "/opt/homebrew/share/seclists/Discovery/Web-Content/common.txt", # Homebrew +] + + +def _resolve_ffuf_wordlist(wordlist: str = "") -> str | None: + """Return the first available wordlist path. + + Resolution order: + 1. Explicit ``wordlist`` argument (if non-empty and the file exists). + 2. ``FFUF_WORDLIST`` environment variable. + 3. Platform-specific default locations (see ``_DEFAULT_FFUF_WORDLISTS``). + + Returns ``None`` when no wordlist can be found. + """ + if wordlist: + return wordlist if Path(wordlist).exists() else None + env_wl = os.environ.get("FFUF_WORDLIST", "").strip() + if env_wl: + return env_wl if Path(env_wl).exists() else None + for path in _DEFAULT_FFUF_WORDLISTS: + if Path(path).exists(): + return path + return None # --------------------------------------------------------------------------- @@ -123,7 +176,7 @@ def recon_tech(target: str) -> str: return f"BLOCKED: {target} is not in scope." if not limiter.allow(target): return "RATE LIMITED — retry later." - httpx_bin = _HTTPX_BIN if Path(_HTTPX_BIN).exists() else "httpx-toolkit" + httpx_bin = _resolve_httpx_bin() return _run( [httpx_bin, "-u", f"https://{target}", "-tech-detect", "-status-code", "-title", "-silent"], @@ -201,13 +254,9 @@ def scan_ffuf(target: str, wordlist: str = "", path: str = "/FUZZ") -> str: return f"BLOCKED: {target} is not in scope." if not limiter.allow(target): return "RATE LIMITED — retry later." - wl = wordlist or "/usr/share/seclists/Discovery/Web-Content/common.txt" - if not Path(wl).exists(): - wl_alt = Path.home() / "SecLists/Discovery/Web-Content/common.txt" - if wl_alt.exists(): - wl = str(wl_alt) - else: - return "[WORDLIST NOT FOUND — install SecLists]" + wl = _resolve_ffuf_wordlist(wordlist) + if wl is None: + return "[WORDLIST NOT FOUND — set FFUF_WORDLIST or install SecLists]" url = f"https://{target}{path}" return _run( ["ffuf", "-u", url, "-w", wl, @@ -278,8 +327,6 @@ def scan_ssl(target: str) -> str: return f"BLOCKED: {target} is not in scope." if not limiter.allow(target): return "RATE LIMITED — retry later." - import ssl - import socket findings: list[str] = [] try: ctx = ssl.create_default_context() diff --git a/tests/test_bounty_factory.py b/tests/test_bounty_factory.py index c56e6cd..b1dab04 100644 --- a/tests/test_bounty_factory.py +++ b/tests/test_bounty_factory.py @@ -5,7 +5,7 @@ import pytest -from firm.bounty.factory import BOUNTY_AGENTS, create_bounty_firm +from firm.bounty.factory import BOUNTY_AGENTS, _resolve_model, create_bounty_firm from firm.bounty.scope import Asset, AssetType, TargetScope @@ -56,3 +56,34 @@ def test_bounty_agents_list(self): def test_authority_range(self): for agent in BOUNTY_AGENTS: assert 0.0 <= agent.initial_authority <= 1.0 + + +# --------------------------------------------------------------------------- +# Model resolution +# --------------------------------------------------------------------------- + +class TestResolveModel: + def test_default_model_returned_without_env(self, monkeypatch): + monkeypatch.delenv("FIRM_HUNT_DIRECTOR_MODEL", raising=False) + monkeypatch.delenv("FIRM_DEFAULT_MODEL", raising=False) + assert _resolve_model("hunt-director", "claude-default") == "claude-default" + + def test_specific_env_var_overrides_default(self, monkeypatch): + monkeypatch.setenv("FIRM_HUNT_DIRECTOR_MODEL", "my-custom-model") + monkeypatch.delenv("FIRM_DEFAULT_MODEL", raising=False) + assert _resolve_model("hunt-director", "claude-default") == "my-custom-model" + + def test_global_default_env_var(self, monkeypatch): + monkeypatch.delenv("FIRM_HUNT_DIRECTOR_MODEL", raising=False) + monkeypatch.setenv("FIRM_DEFAULT_MODEL", "gpt-4-turbo") + assert _resolve_model("hunt-director", "claude-default") == "gpt-4-turbo" + + def test_specific_env_var_takes_priority_over_global(self, monkeypatch): + monkeypatch.setenv("FIRM_RECON_AGENT_MODEL", "specific-model") + monkeypatch.setenv("FIRM_DEFAULT_MODEL", "global-model") + assert _resolve_model("recon-agent", "fallback") == "specific-model" + + def test_hyphen_to_underscore_conversion(self, monkeypatch): + monkeypatch.setenv("FIRM_REPORT_WRITER_MODEL", "report-model") + monkeypatch.delenv("FIRM_DEFAULT_MODEL", raising=False) + assert _resolve_model("report-writer", "fallback") == "report-model" diff --git a/tests/test_bounty_tools.py b/tests/test_bounty_tools.py index 506d806..a5926bf 100644 --- a/tests/test_bounty_tools.py +++ b/tests/test_bounty_tools.py @@ -6,19 +6,21 @@ ⚠️ Contenu généré par IA — validation humaine requise avant utilisation. """ +from unittest.mock import patch + import pytest -from unittest.mock import patch, MagicMock from firm.bounty.scope import Asset, AssetType, ScopeEnforcer, TargetScope from firm.bounty.tools.scanner import ( RateLimiter, + _resolve_ffuf_wordlist, + _resolve_httpx_bin, make_bounty_tools, make_recon_tools, make_report_tools, make_scanning_tools, ) - # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @@ -154,3 +156,76 @@ def test_report_generate(self): ) assert "# XSS in search" in result assert "CWE-79" in result + + +# --------------------------------------------------------------------------- +# Binary / wordlist resolution +# --------------------------------------------------------------------------- + +class TestResolveHttpxBin: + def test_env_var_takes_priority(self, monkeypatch): + monkeypatch.setenv("HTTPX_BIN", "/custom/httpx") + assert _resolve_httpx_bin() == "/custom/httpx" + + def test_env_var_empty_falls_through(self, monkeypatch, tmp_path): + monkeypatch.setenv("HTTPX_BIN", "") + # Ensure no tool is found on PATH and Homebrew path does not exist + with patch("shutil.which", return_value=None), \ + patch("firm.bounty.tools.scanner.Path") as mock_path: + mock_path.return_value.exists.return_value = False + result = _resolve_httpx_bin() + assert result == "httpx-toolkit" + + def test_shutil_which_used_when_no_env(self, monkeypatch): + monkeypatch.delenv("HTTPX_BIN", raising=False) + with patch("shutil.which", side_effect=lambda name: "/usr/bin/httpx-toolkit" if name == "httpx-toolkit" else None): + result = _resolve_httpx_bin() + assert result == "/usr/bin/httpx-toolkit" + + +class TestResolveFfufWordlist: + def test_explicit_wordlist_returned_if_exists(self, tmp_path): + wl = tmp_path / "wordlist.txt" + wl.write_text("admin\nlogin\n") + assert _resolve_ffuf_wordlist(str(wl)) == str(wl) + + def test_explicit_wordlist_returns_none_if_missing(self): + assert _resolve_ffuf_wordlist("/does/not/exist.txt") is None + + def test_env_var_used_when_no_explicit(self, monkeypatch, tmp_path): + wl = tmp_path / "custom.txt" + wl.write_text("secret\n") + monkeypatch.setenv("FFUF_WORDLIST", str(wl)) + assert _resolve_ffuf_wordlist() == str(wl) + + def test_env_var_missing_file_returns_none(self, monkeypatch): + monkeypatch.setenv("FFUF_WORDLIST", "/nonexistent/wordlist.txt") + assert _resolve_ffuf_wordlist() is None + + def test_returns_none_when_nothing_available(self, monkeypatch): + monkeypatch.delenv("FFUF_WORDLIST", raising=False) + with patch("firm.bounty.tools.scanner.Path") as mock_path: + mock_path.return_value.exists.return_value = False + mock_path.home.return_value.__truediv__ = lambda *_: mock_path.return_value + result = _resolve_ffuf_wordlist() + assert result is None + + @patch("firm.bounty.tools.scanner._run", return_value="") + def test_scan_ffuf_wordlist_not_found_message(self, mock_run, enforcer, limiter, monkeypatch): + monkeypatch.delenv("FFUF_WORDLIST", raising=False) + with patch("firm.bounty.tools.scanner._resolve_ffuf_wordlist", return_value=None): + tools = {t["name"]: t["callable"] for t in make_scanning_tools(enforcer, limiter)} + result = tools["scan_ffuf"]("target.com") + assert "WORDLIST NOT FOUND" in result + mock_run.assert_not_called() + + @patch("firm.bounty.tools.scanner._run", return_value="ffuf output") + def test_scan_ffuf_uses_env_wordlist(self, mock_run, enforcer, limiter, monkeypatch, tmp_path): + wl = tmp_path / "wl.txt" + wl.write_text("index\n") + monkeypatch.setenv("FFUF_WORDLIST", str(wl)) + tools = {t["name"]: t["callable"] for t in make_scanning_tools(enforcer, limiter)} + result = tools["scan_ffuf"]("target.com") + assert "BLOCKED" not in result + assert "WORDLIST NOT FOUND" not in result + mock_run.assert_called_once()