diff --git a/hexstrike_mcp.py b/hexstrike_mcp.py index 23b083b47..f24149ec5 100644 --- a/hexstrike_mcp.py +++ b/hexstrike_mcp.py @@ -27,6 +27,7 @@ from datetime import datetime from mcp.server.fastmcp import FastMCP +from mcp.types import ToolAnnotations class HexStrikeColors: """Enhanced color palette matching the server's ModernVisualEngine.COLORS""" @@ -280,7 +281,14 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: # CORE NETWORK SCANNING TOOLS # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Nmap Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def nmap_scan(target: str, scan_type: str = "-sV", ports: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute an enhanced Nmap scan against a target with real-time logging. @@ -323,7 +331,14 @@ def nmap_scan(target: str, scan_type: str = "-sV", ports: str = "", additional_a return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Gobuster Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def gobuster_scan(url: str, mode: str = "dir", wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]: """ Execute Gobuster to find directories, DNS subdomains, or virtual hosts with enhanced logging. @@ -367,7 +382,14 @@ def gobuster_scan(url: str, mode: str = "dir", wordlist: str = "/usr/share/wordl return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Nuclei Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def nuclei_scan(target: str, severity: str = "", tags: str = "", template: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute Nuclei vulnerability scanner with enhanced logging and real-time progress. @@ -418,7 +440,14 @@ def nuclei_scan(target: str, severity: str = "", tags: str = "", template: str = # CLOUD SECURITY TOOLS # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Prowler Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def prowler_scan(provider: str = "aws", profile: str = "default", region: str = "", checks: str = "", output_dir: str = "/tmp/prowler_output", output_format: str = "json", additional_args: str = "") -> Dict[str, Any]: """ Execute Prowler for comprehensive cloud security assessment. @@ -452,7 +481,14 @@ def prowler_scan(provider: str = "aws", profile: str = "default", region: str = logger.error(f"❌ Prowler assessment failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Trivy Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def trivy_scan(scan_type: str = "image", target: str = "", output_format: str = "json", severity: str = "", output_file: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute Trivy for container and filesystem vulnerability scanning. @@ -488,7 +524,14 @@ def trivy_scan(scan_type: str = "image", target: str = "", output_format: str = # ENHANCED CLOUD AND CONTAINER SECURITY TOOLS (v6.0) # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Scout Suite Assessment", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def scout_suite_assessment(provider: str = "aws", profile: str = "default", report_dir: str = "/tmp/scout-suite", services: str = "", exceptions: str = "", additional_args: str = "") -> Dict[str, Any]: @@ -522,7 +565,14 @@ def scout_suite_assessment(provider: str = "aws", profile: str = "default", logger.error(f"❌ Scout Suite assessment failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Cloudmapper Analysis", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def cloudmapper_analysis(action: str = "collect", account: str = "", config: str = "config.json", additional_args: str = "") -> Dict[str, Any]: """ @@ -551,7 +601,14 @@ def cloudmapper_analysis(action: str = "collect", account: str = "", logger.error(f"❌ CloudMapper {action} failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Pacu Exploitation", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def pacu_exploitation(session_name: str = "hexstrike_session", modules: str = "", data_services: str = "", regions: str = "", additional_args: str = "") -> Dict[str, Any]: @@ -583,7 +640,14 @@ def pacu_exploitation(session_name: str = "hexstrike_session", modules: str = "" logger.error(f"❌ Pacu exploitation failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Kube Hunter Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def kube_hunter_scan(target: str = "", remote: str = "", cidr: str = "", interface: str = "", active: bool = False, report: str = "json", additional_args: str = "") -> Dict[str, Any]: @@ -619,7 +683,14 @@ def kube_hunter_scan(target: str = "", remote: str = "", cidr: str = "", logger.error(f"❌ kube-hunter scan failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Kube Bench CIS", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def kube_bench_cis(targets: str = "", version: str = "", config_dir: str = "", output_format: str = "json", additional_args: str = "") -> Dict[str, Any]: """ @@ -650,7 +721,14 @@ def kube_bench_cis(targets: str = "", version: str = "", config_dir: str = "", logger.error(f"❌ kube-bench benchmark failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Docker Bench Security Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def docker_bench_security_scan(checks: str = "", exclude: str = "", output_file: str = "/tmp/docker-bench-results.json", additional_args: str = "") -> Dict[str, Any]: @@ -680,7 +758,14 @@ def docker_bench_security_scan(checks: str = "", exclude: str = "", logger.error(f"❌ Docker Bench Security failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Clair Vulnerability Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def clair_vulnerability_scan(image: str, config: str = "/etc/clair/config.yaml", output_format: str = "json", additional_args: str = "") -> Dict[str, Any]: """ @@ -709,7 +794,14 @@ def clair_vulnerability_scan(image: str, config: str = "/etc/clair/config.yaml", logger.error(f"❌ Clair scan failed for {image}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Falco Runtime Monitoring", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def falco_runtime_monitoring(config_file: str = "/etc/falco/falco.yaml", rules_file: str = "", output_format: str = "json", duration: int = 60, additional_args: str = "") -> Dict[str, Any]: @@ -741,7 +833,14 @@ def falco_runtime_monitoring(config_file: str = "/etc/falco/falco.yaml", logger.error(f"❌ Falco monitoring failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Checkov IAC Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def checkov_iac_scan(directory: str = ".", framework: str = "", check: str = "", skip_check: str = "", output_format: str = "json", additional_args: str = "") -> Dict[str, Any]: @@ -775,7 +874,14 @@ def checkov_iac_scan(directory: str = ".", framework: str = "", check: str = "", logger.error(f"❌ Checkov scan failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Terrascan IAC Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def terrascan_iac_scan(scan_type: str = "all", iac_dir: str = ".", policy_type: str = "", output_format: str = "json", severity: str = "", additional_args: str = "") -> Dict[str, Any]: @@ -813,7 +919,14 @@ def terrascan_iac_scan(scan_type: str = "all", iac_dir: str = ".", # FILE OPERATIONS & PAYLOAD GENERATION # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Create File", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def create_file(filename: str, content: str, binary: bool = False) -> Dict[str, Any]: """ Create a file with specified content on the HexStrike server. @@ -839,7 +952,14 @@ def create_file(filename: str, content: str, binary: bool = False) -> Dict[str, logger.error(f"❌ Failed to create file: {filename}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Modify File", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def modify_file(filename: str, content: str, append: bool = False) -> Dict[str, Any]: """ Modify an existing file on the HexStrike server. @@ -865,7 +985,14 @@ def modify_file(filename: str, content: str, append: bool = False) -> Dict[str, logger.error(f"❌ Failed to modify file: {filename}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Delete File", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def delete_file(filename: str) -> Dict[str, Any]: """ Delete a file or directory on the HexStrike server. @@ -887,7 +1014,14 @@ def delete_file(filename: str) -> Dict[str, Any]: logger.error(f"❌ Failed to delete file: {filename}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="List Files", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def list_files(directory: str = ".") -> Dict[str, Any]: """ List files in a directory on the HexStrike server. @@ -907,7 +1041,14 @@ def list_files(directory: str = ".") -> Dict[str, Any]: logger.error(f"❌ Failed to list files in {directory}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Generate Payload", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def generate_payload(payload_type: str = "buffer", size: int = 1024, pattern: str = "A", filename: str = "") -> Dict[str, Any]: """ Generate large payloads for testing and exploitation. @@ -941,7 +1082,14 @@ def generate_payload(payload_type: str = "buffer", size: int = 1024, pattern: st # PYTHON ENVIRONMENT MANAGEMENT # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Install Python Package", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def install_python_package(package: str, env_name: str = "default") -> Dict[str, Any]: """ Install a Python package in a virtual environment on the HexStrike server. @@ -965,7 +1113,14 @@ def install_python_package(package: str, env_name: str = "default") -> Dict[str, logger.error(f"❌ Failed to install package {package}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Execute Python Script", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def execute_python_script(script: str, env_name: str = "default", filename: str = "") -> Dict[str, Any]: """ Execute a Python script in a virtual environment on the HexStrike server. @@ -997,7 +1152,14 @@ def execute_python_script(script: str, env_name: str = "default", filename: str # ADDITIONAL SECURITY TOOLS FROM ORIGINAL IMPLEMENTATION # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Dirb Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def dirb_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]: """ Execute Dirb for directory brute forcing with enhanced logging. @@ -1023,7 +1185,14 @@ def dirb_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", logger.error(f"❌ Dirb scan failed for {url}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Nikto Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def nikto_scan(target: str, additional_args: str = "") -> Dict[str, Any]: """ Execute Nikto web vulnerability scanner with enhanced logging. @@ -1047,7 +1216,14 @@ def nikto_scan(target: str, additional_args: str = "") -> Dict[str, Any]: logger.error(f"❌ Nikto scan failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Sqlmap Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def sqlmap_scan(url: str, data: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute SQLMap for SQL injection testing with enhanced logging. @@ -1073,7 +1249,14 @@ def sqlmap_scan(url: str, data: str = "", additional_args: str = "") -> Dict[str logger.error(f"❌ SQLMap scan failed for {url}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Metasploit Run", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def metasploit_run(module: str, options: Dict[str, Any] = {}) -> Dict[str, Any]: """ Execute a Metasploit module with enhanced logging. @@ -1097,7 +1280,14 @@ def metasploit_run(module: str, options: Dict[str, Any] = {}) -> Dict[str, Any]: logger.error(f"❌ Metasploit module failed: {module}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Hydra Attack", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def hydra_attack( target: str, service: str, @@ -1139,7 +1329,14 @@ def hydra_attack( logger.error(f"❌ Hydra attack failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="John Crack", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def john_crack( hash_file: str, wordlist: str = "/usr/share/wordlists/rockyou.txt", @@ -1172,7 +1369,14 @@ def john_crack( logger.error(f"❌ John the Ripper failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Wpscan Analyze", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def wpscan_analyze(url: str, additional_args: str = "") -> Dict[str, Any]: """ Execute WPScan for WordPress vulnerability scanning with enhanced logging. @@ -1196,7 +1400,14 @@ def wpscan_analyze(url: str, additional_args: str = "") -> Dict[str, Any]: logger.error(f"❌ WPScan failed for {url}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Enum4linux Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def enum4linux_scan(target: str, additional_args: str = "-a") -> Dict[str, Any]: """ Execute Enum4linux for SMB enumeration with enhanced logging. @@ -1220,7 +1431,14 @@ def enum4linux_scan(target: str, additional_args: str = "-a") -> Dict[str, Any]: logger.error(f"❌ Enum4linux failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Ffuf Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def ffuf_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", mode: str = "directory", match_codes: str = "200,204,301,302,307,401,403", additional_args: str = "") -> Dict[str, Any]: """ Execute FFuf for web fuzzing with enhanced logging. @@ -1250,7 +1468,14 @@ def ffuf_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", logger.error(f"❌ FFuf fuzzing failed for {url}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Netexec Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def netexec_scan(target: str, protocol: str = "smb", username: str = "", password: str = "", hash_value: str = "", module: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute NetExec (formerly CrackMapExec) for network enumeration with enhanced logging. @@ -1284,7 +1509,14 @@ def netexec_scan(target: str, protocol: str = "smb", username: str = "", passwor logger.error(f"❌ NetExec scan failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Amass Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def amass_scan(domain: str, mode: str = "enum", additional_args: str = "") -> Dict[str, Any]: """ Execute Amass for subdomain enumeration with enhanced logging. @@ -1310,7 +1542,14 @@ def amass_scan(domain: str, mode: str = "enum", additional_args: str = "") -> Di logger.error(f"❌ Amass failed for {domain}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Hashcat Crack", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def hashcat_crack(hash_file: str, hash_type: str, attack_mode: str = "0", wordlist: str = "/usr/share/wordlists/rockyou.txt", mask: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute Hashcat for advanced password cracking with enhanced logging. @@ -1342,7 +1581,14 @@ def hashcat_crack(hash_file: str, hash_type: str, attack_mode: str = "0", wordli logger.error(f"❌ Hashcat attack failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Subfinder Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def subfinder_scan(domain: str, silent: bool = True, all_sources: bool = False, additional_args: str = "") -> Dict[str, Any]: """ Execute Subfinder for passive subdomain enumeration with enhanced logging. @@ -1370,7 +1616,14 @@ def subfinder_scan(domain: str, silent: bool = True, all_sources: bool = False, logger.error(f"❌ Subfinder failed for {domain}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Smbmap Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def smbmap_scan(target: str, username: str = "", password: str = "", domain: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute SMBMap for SMB share enumeration with enhanced logging. @@ -1404,7 +1657,14 @@ def smbmap_scan(target: str, username: str = "", password: str = "", domain: str # ENHANCED NETWORK PENETRATION TESTING TOOLS (v6.0) # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Rustscan Fast Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def rustscan_fast_scan(target: str, ports: str = "", ulimit: int = 5000, batch_size: int = 4500, timeout: int = 1500, scripts: bool = False, additional_args: str = "") -> Dict[str, Any]: @@ -1440,7 +1700,14 @@ def rustscan_fast_scan(target: str, ports: str = "", ulimit: int = 5000, logger.error(f"❌ Rustscan failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Masscan High Speed", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def masscan_high_speed(target: str, ports: str = "1-65535", rate: int = 1000, interface: str = "", router_mac: str = "", source_ip: str = "", banners: bool = False, additional_args: str = "") -> Dict[str, Any]: @@ -1478,7 +1745,14 @@ def masscan_high_speed(target: str, ports: str = "1-65535", rate: int = 1000, logger.error(f"❌ Masscan failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Nmap Advanced Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def nmap_advanced_scan(target: str, scan_type: str = "-sS", ports: str = "", timing: str = "T4", nse_scripts: str = "", os_detection: bool = False, version_detection: bool = False, aggressive: bool = False, @@ -1521,7 +1795,14 @@ def nmap_advanced_scan(target: str, scan_type: str = "-sS", ports: str = "", logger.error(f"❌ Advanced Nmap failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Autorecon Comprehensive", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def autorecon_comprehensive(target: str, output_dir: str = "/tmp/autorecon", port_scans: str = "top-100-ports", service_scans: str = "default", heartbeat: int = 60, timeout: int = 300, @@ -1558,7 +1839,14 @@ def autorecon_comprehensive(target: str, output_dir: str = "/tmp/autorecon", logger.error(f"❌ AutoRecon failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Enum4linux NG Advanced", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def enum4linux_ng_advanced(target: str, username: str = "", password: str = "", domain: str = "", shares: bool = True, users: bool = True, groups: bool = True, policy: bool = True, @@ -1599,7 +1887,14 @@ def enum4linux_ng_advanced(target: str, username: str = "", password: str = "", logger.error(f"❌ Enum4linux-ng failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Rpcclient Enumeration", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def rpcclient_enumeration(target: str, username: str = "", password: str = "", domain: str = "", commands: str = "enumdomusers;enumdomgroups;querydominfo", additional_args: str = "") -> Dict[str, Any]: @@ -1633,7 +1928,14 @@ def rpcclient_enumeration(target: str, username: str = "", password: str = "", logger.error(f"❌ rpcclient failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Nbtscan Netbios", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def nbtscan_netbios(target: str, verbose: bool = False, timeout: int = 2, additional_args: str = "") -> Dict[str, Any]: """ @@ -1662,7 +1964,14 @@ def nbtscan_netbios(target: str, verbose: bool = False, timeout: int = 2, logger.error(f"❌ nbtscan failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="ARP Scan Discovery", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def arp_scan_discovery(target: str = "", interface: str = "", local_network: bool = False, timeout: int = 500, retry: int = 3, additional_args: str = "") -> Dict[str, Any]: """ @@ -1695,7 +2004,14 @@ def arp_scan_discovery(target: str = "", interface: str = "", local_network: boo logger.error(f"❌ arp-scan failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Responder Credential Harvest", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def responder_credential_harvest(interface: str = "eth0", analyze: bool = False, wpad: bool = True, force_wpad_auth: bool = False, fingerprint: bool = False, duration: int = 300, @@ -1732,7 +2048,14 @@ def responder_credential_harvest(interface: str = "eth0", analyze: bool = False, logger.error(f"❌ Responder failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Volatility Analyze", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def volatility_analyze(memory_file: str, plugin: str, profile: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute Volatility for memory forensics analysis with enhanced logging. @@ -1760,7 +2083,14 @@ def volatility_analyze(memory_file: str, plugin: str, profile: str = "", additio logger.error(f"❌ Volatility analysis failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Msfvenom Generate", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def msfvenom_generate(payload: str, format_type: str = "", output_file: str = "", encoder: str = "", iterations: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute MSFVenom for payload generation with enhanced logging. @@ -1796,7 +2126,14 @@ def msfvenom_generate(payload: str, format_type: str = "", output_file: str = "" # BINARY ANALYSIS & REVERSE ENGINEERING TOOLS # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="GDB Analyze", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def gdb_analyze(binary: str, commands: str = "", script_file: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute GDB for binary analysis and debugging with enhanced logging. @@ -1824,7 +2161,14 @@ def gdb_analyze(binary: str, commands: str = "", script_file: str = "", addition logger.error(f"❌ GDB analysis failed for {binary}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Radare2 Analyze", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def radare2_analyze(binary: str, commands: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute Radare2 for binary analysis and reverse engineering with enhanced logging. @@ -1850,7 +2194,14 @@ def radare2_analyze(binary: str, commands: str = "", additional_args: str = "") logger.error(f"❌ Radare2 analysis failed for {binary}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Binwalk Analyze", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def binwalk_analyze(file_path: str, extract: bool = False, additional_args: str = "") -> Dict[str, Any]: """ Execute Binwalk for firmware and file analysis with enhanced logging. @@ -1876,7 +2227,14 @@ def binwalk_analyze(file_path: str, extract: bool = False, additional_args: str logger.error(f"❌ Binwalk analysis failed for {file_path}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Ropgadget Search", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def ropgadget_search(binary: str, gadget_type: str = "", additional_args: str = "") -> Dict[str, Any]: """ Search for ROP gadgets in a binary using ROPgadget with enhanced logging. @@ -1902,7 +2260,14 @@ def ropgadget_search(binary: str, gadget_type: str = "", additional_args: str = logger.error(f"❌ ROPgadget search failed for {binary}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Checksec Analyze", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def checksec_analyze(binary: str) -> Dict[str, Any]: """ Check security features of a binary with enhanced logging. @@ -1924,7 +2289,14 @@ def checksec_analyze(binary: str) -> Dict[str, Any]: logger.error(f"❌ Checksec analysis failed for {binary}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="XXD Hexdump", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def xxd_hexdump(file_path: str, offset: str = "0", length: str = "", additional_args: str = "") -> Dict[str, Any]: """ Create a hex dump of a file using xxd with enhanced logging. @@ -1952,7 +2324,14 @@ def xxd_hexdump(file_path: str, offset: str = "0", length: str = "", additional_ logger.error(f"❌ XXD hex dump failed for {file_path}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Strings Extract", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def strings_extract(file_path: str, min_len: int = 4, additional_args: str = "") -> Dict[str, Any]: """ Extract strings from a binary file with enhanced logging. @@ -1978,7 +2357,14 @@ def strings_extract(file_path: str, min_len: int = 4, additional_args: str = "") logger.error(f"❌ Strings extraction failed for {file_path}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Objdump Analyze", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def objdump_analyze(binary: str, disassemble: bool = True, additional_args: str = "") -> Dict[str, Any]: """ Analyze a binary using objdump with enhanced logging. @@ -2008,7 +2394,14 @@ def objdump_analyze(binary: str, disassemble: bool = True, additional_args: str # ENHANCED BINARY ANALYSIS AND EXPLOITATION FRAMEWORK (v6.0) # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Ghidra Analysis", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def ghidra_analysis(binary: str, project_name: str = "hexstrike_analysis", script_file: str = "", analysis_timeout: int = 300, output_format: str = "xml", additional_args: str = "") -> Dict[str, Any]: @@ -2042,7 +2435,14 @@ def ghidra_analysis(binary: str, project_name: str = "hexstrike_analysis", logger.error(f"❌ Ghidra analysis failed for {binary}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Pwntools Exploit", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def pwntools_exploit(script_content: str = "", target_binary: str = "", target_host: str = "", target_port: int = 0, exploit_type: str = "local", additional_args: str = "") -> Dict[str, Any]: @@ -2076,7 +2476,14 @@ def pwntools_exploit(script_content: str = "", target_binary: str = "", logger.error(f"❌ Pwntools exploit failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="One Gadget Search", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def one_gadget_search(libc_path: str, level: int = 1, additional_args: str = "") -> Dict[str, Any]: """ Execute one_gadget to find one-shot RCE gadgets in libc. @@ -2102,7 +2509,14 @@ def one_gadget_search(libc_path: str, level: int = 1, additional_args: str = "") logger.error(f"❌ one_gadget analysis failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Libc Database Lookup", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def libc_database_lookup(action: str = "find", symbols: str = "", libc_id: str = "", additional_args: str = "") -> Dict[str, Any]: """ @@ -2131,7 +2545,14 @@ def libc_database_lookup(action: str = "find", symbols: str = "", logger.error(f"❌ libc-database {action} failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="GDB Peda Debug", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def gdb_peda_debug(binary: str = "", commands: str = "", attach_pid: int = 0, core_file: str = "", additional_args: str = "") -> Dict[str, Any]: """ @@ -2162,7 +2583,14 @@ def gdb_peda_debug(binary: str = "", commands: str = "", attach_pid: int = 0, logger.error(f"❌ GDB-PEDA analysis failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Angr Symbolic Execution", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def angr_symbolic_execution(binary: str, script_content: str = "", find_address: str = "", avoid_addresses: str = "", analysis_type: str = "symbolic", additional_args: str = "") -> Dict[str, Any]: @@ -2196,7 +2624,14 @@ def angr_symbolic_execution(binary: str, script_content: str = "", logger.error(f"❌ angr analysis failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Ropper Gadget Search", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def ropper_gadget_search(binary: str, gadget_type: str = "rop", quality: int = 1, arch: str = "", search_string: str = "", additional_args: str = "") -> Dict[str, Any]: @@ -2230,7 +2665,14 @@ def ropper_gadget_search(binary: str, gadget_type: str = "rop", quality: int = 1 logger.error(f"❌ ropper analysis failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Pwninit Setup", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def pwninit_setup(binary: str, libc: str = "", ld: str = "", template_type: str = "python", additional_args: str = "") -> Dict[str, Any]: """ @@ -2261,7 +2703,14 @@ def pwninit_setup(binary: str, libc: str = "", ld: str = "", logger.error(f"❌ pwninit setup failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Feroxbuster Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def feroxbuster_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", threads: int = 10, additional_args: str = "") -> Dict[str, Any]: """ Execute Feroxbuster for recursive content discovery with enhanced logging. @@ -2289,7 +2738,14 @@ def feroxbuster_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common logger.error(f"❌ Feroxbuster scan failed for {url}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Dotdotpwn Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def dotdotpwn_scan(target: str, module: str = "http", additional_args: str = "") -> Dict[str, Any]: """ Execute DotDotPwn for directory traversal testing with enhanced logging. @@ -2315,7 +2771,14 @@ def dotdotpwn_scan(target: str, module: str = "http", additional_args: str = "") logger.error(f"❌ DotDotPwn scan failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Xsser Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def xsser_scan(url: str, params: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute XSSer for XSS vulnerability testing with enhanced logging. @@ -2341,7 +2804,14 @@ def xsser_scan(url: str, params: str = "", additional_args: str = "") -> Dict[st logger.error(f"❌ XSSer scan failed for {url}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Wfuzz Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def wfuzz_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", additional_args: str = "") -> Dict[str, Any]: """ Execute Wfuzz for web application fuzzing with enhanced logging. @@ -2371,7 +2841,14 @@ def wfuzz_scan(url: str, wordlist: str = "/usr/share/wordlists/dirb/common.txt", # ENHANCED WEB APPLICATION SECURITY TOOLS (v6.0) # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Dirsearch Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def dirsearch_scan(url: str, extensions: str = "php,html,js,txt,xml,json", wordlist: str = "/usr/share/wordlists/dirsearch/common.txt", threads: int = 30, recursive: bool = False, additional_args: str = "") -> Dict[str, Any]: @@ -2405,7 +2882,14 @@ def dirsearch_scan(url: str, extensions: str = "php,html,js,txt,xml,json", logger.error(f"❌ Dirsearch scan failed for {url}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Katana Crawl", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def katana_crawl(url: str, depth: int = 3, js_crawl: bool = True, form_extraction: bool = True, output_format: str = "json", additional_args: str = "") -> Dict[str, Any]: @@ -2439,7 +2923,14 @@ def katana_crawl(url: str, depth: int = 3, js_crawl: bool = True, logger.error(f"❌ Katana crawl failed for {url}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Gau Discovery", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def gau_discovery(domain: str, providers: str = "wayback,commoncrawl,otx,urlscan", include_subs: bool = True, blacklist: str = "png,jpg,gif,jpeg,swf,woff,svg,pdf,css,ico", additional_args: str = "") -> Dict[str, Any]: @@ -2471,7 +2962,14 @@ def gau_discovery(domain: str, providers: str = "wayback,commoncrawl,otx,urlscan logger.error(f"❌ Gau URL discovery failed for {domain}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Waybackurls Discovery", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def waybackurls_discovery(domain: str, get_versions: bool = False, no_subs: bool = False, additional_args: str = "") -> Dict[str, Any]: """ @@ -2500,7 +2998,14 @@ def waybackurls_discovery(domain: str, get_versions: bool = False, logger.error(f"❌ Waybackurls discovery failed for {domain}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Arjun Parameter Discovery", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def arjun_parameter_discovery(url: str, method: str = "GET", wordlist: str = "", delay: int = 0, threads: int = 25, stable: bool = False, additional_args: str = "") -> Dict[str, Any]: @@ -2536,7 +3041,14 @@ def arjun_parameter_discovery(url: str, method: str = "GET", wordlist: str = "", logger.error(f"❌ Arjun parameter discovery failed for {url}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Paramspider Mining", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def paramspider_mining(domain: str, level: int = 2, exclude: str = "png,jpg,gif,jpeg,swf,woff,svg,pdf,css,ico", output: str = "", additional_args: str = "") -> Dict[str, Any]: @@ -2568,7 +3080,14 @@ def paramspider_mining(domain: str, level: int = 2, logger.error(f"❌ ParamSpider mining failed for {domain}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="X8 Parameter Discovery", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def x8_parameter_discovery(url: str, wordlist: str = "/usr/share/wordlists/x8/params.txt", method: str = "GET", body: str = "", headers: str = "", additional_args: str = "") -> Dict[str, Any]: @@ -2602,7 +3121,14 @@ def x8_parameter_discovery(url: str, wordlist: str = "/usr/share/wordlists/x8/pa logger.error(f"❌ x8 parameter discovery failed for {url}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Jaeles Vulnerability Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def jaeles_vulnerability_scan(url: str, signatures: str = "", config: str = "", threads: int = 20, timeout: int = 20, additional_args: str = "") -> Dict[str, Any]: @@ -2636,7 +3162,14 @@ def jaeles_vulnerability_scan(url: str, signatures: str = "", config: str = "", logger.error(f"❌ Jaeles vulnerability scan failed for {url}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Dalfox XSS Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def dalfox_xss_scan(url: str, pipe_mode: bool = False, blind: bool = False, mining_dom: bool = True, mining_dict: bool = True, custom_payload: str = "", additional_args: str = "") -> Dict[str, Any]: @@ -2672,7 +3205,14 @@ def dalfox_xss_scan(url: str, pipe_mode: bool = False, blind: bool = False, logger.error(f"❌ Dalfox XSS scan failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Httpx Probe", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def httpx_probe(target: str, probe: bool = True, tech_detect: bool = False, status_code: bool = False, content_length: bool = False, title: bool = False, web_server: bool = False, threads: int = 50, @@ -2713,7 +3253,14 @@ def httpx_probe(target: str, probe: bool = True, tech_detect: bool = False, logger.error(f"❌ httpx probe failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Anew Data Processing", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def anew_data_processing(input_data: str, output_file: str = "", additional_args: str = "") -> Dict[str, Any]: """ @@ -2740,7 +3287,14 @@ def anew_data_processing(input_data: str, output_file: str = "", logger.error("❌ anew data processing failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Qsreplace Parameter Replacement", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def qsreplace_parameter_replacement(urls: str, replacement: str = "FUZZ", additional_args: str = "") -> Dict[str, Any]: """ @@ -2767,7 +3321,14 @@ def qsreplace_parameter_replacement(urls: str, replacement: str = "FUZZ", logger.error("❌ qsreplace parameter replacement failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Uro URL Filtering", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def uro_url_filtering(urls: str, whitelist: str = "", blacklist: str = "", additional_args: str = "") -> Dict[str, Any]: """ @@ -2800,7 +3361,14 @@ def uro_url_filtering(urls: str, whitelist: str = "", blacklist: str = "", # AI-POWERED PAYLOAD GENERATION (v5.0 ENHANCEMENT) # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="AI Generate Payload", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def ai_generate_payload(attack_type: str, complexity: str = "basic", technology: str = "", url: str = "") -> Dict[str, Any]: """ Generate AI-powered contextual payloads for security testing. @@ -2841,7 +3409,14 @@ def ai_generate_payload(attack_type: str, complexity: str = "basic", technology: return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="AI Test Payload", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def ai_test_payload(payload: str, target_url: str, method: str = "GET") -> Dict[str, Any]: """ Test generated payload against target with AI analysis. @@ -2876,7 +3451,14 @@ def ai_test_payload(payload: str, target_url: str, method: str = "GET") -> Dict[ return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="AI Generate Attack Suite", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def ai_generate_attack_suite(target_url: str, attack_types: str = "xss,sqli,lfi") -> Dict[str, Any]: """ Generate comprehensive attack suite with multiple payload types. @@ -2937,7 +3519,14 @@ def ai_generate_attack_suite(target_url: str, attack_types: str = "xss,sqli,lfi" # ADVANCED API TESTING TOOLS (v5.0 ENHANCEMENT) # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="API Fuzzer", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def api_fuzzer(base_url: str, endpoints: str = "", methods: str = "GET,POST,PUT,DELETE", wordlist: str = "/usr/share/wordlists/api/api-endpoints.txt") -> Dict[str, Any]: """ Advanced API endpoint fuzzing with intelligent parameter discovery. @@ -2973,7 +3562,14 @@ def api_fuzzer(base_url: str, endpoints: str = "", methods: str = "GET,POST,PUT, return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Graphql Scanner", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def graphql_scanner(endpoint: str, introspection: bool = True, query_depth: int = 10, test_mutations: bool = True) -> Dict[str, Any]: """ Advanced GraphQL security scanning and introspection. @@ -3015,7 +3611,14 @@ def graphql_scanner(endpoint: str, introspection: bool = True, query_depth: int return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="JWT Analyzer", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def jwt_analyzer(jwt_token: str, target_url: str = "") -> Dict[str, Any]: """ Advanced JWT token analysis and vulnerability testing. @@ -3054,7 +3657,14 @@ def jwt_analyzer(jwt_token: str, target_url: str = "") -> Dict[str, Any]: return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="API Schema Analyzer", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def api_schema_analyzer(schema_url: str, schema_type: str = "openapi") -> Dict[str, Any]: """ Analyze API schemas and identify potential security issues. @@ -3099,7 +3709,14 @@ def api_schema_analyzer(schema_url: str, schema_type: str = "openapi") -> Dict[s return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Comprehensive API Audit", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def comprehensive_api_audit(base_url: str, schema_url: str = "", jwt_token: str = "", graphql_endpoint: str = "") -> Dict[str, Any]: """ Comprehensive API security audit combining multiple testing techniques. @@ -3197,7 +3814,14 @@ def comprehensive_api_audit(base_url: str, schema_url: str = "", jwt_token: str # ADVANCED CTF TOOLS (v5.0 ENHANCEMENT) # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Volatility3 Analyze", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def volatility3_analyze(memory_file: str, plugin: str, output_file: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute Volatility3 for advanced memory forensics with enhanced logging. @@ -3225,7 +3849,14 @@ def volatility3_analyze(memory_file: str, plugin: str, output_file: str = "", ad logger.error(f"❌ Volatility3 analysis failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Foremost Carving", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def foremost_carving(input_file: str, output_dir: str = "/tmp/foremost_output", file_types: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute Foremost for file carving with enhanced logging. @@ -3253,7 +3884,14 @@ def foremost_carving(input_file: str, output_dir: str = "/tmp/foremost_output", logger.error(f"❌ Foremost carving failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Steghide Analysis", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def steghide_analysis(action: str, cover_file: str, embed_file: str = "", passphrase: str = "", output_file: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute Steghide for steganography analysis with enhanced logging. @@ -3285,7 +3923,14 @@ def steghide_analysis(action: str, cover_file: str, embed_file: str = "", passph logger.error(f"❌ Steghide {action} failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Exiftool Extract", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def exiftool_extract(file_path: str, output_format: str = "", tags: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute ExifTool for metadata extraction with enhanced logging. @@ -3313,7 +3958,14 @@ def exiftool_extract(file_path: str, output_format: str = "", tags: str = "", ad logger.error(f"❌ ExifTool analysis failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Hashpump Attack", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def hashpump_attack(signature: str, data: str, key_length: str, append_data: str, additional_args: str = "") -> Dict[str, Any]: """ Execute HashPump for hash length extension attacks with enhanced logging. @@ -3347,7 +3999,14 @@ def hashpump_attack(signature: str, data: str, key_length: str, append_data: str # BUG BOUNTY RECONNAISSANCE TOOLS (v5.0 ENHANCEMENT) # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Hakrawler Crawl", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def hakrawler_crawl(url: str, depth: int = 2, forms: bool = True, robots: bool = True, sitemap: bool = True, wayback: bool = False, additional_args: str = "") -> Dict[str, Any]: """ Execute Hakrawler for web endpoint discovery with enhanced logging. @@ -3388,7 +4047,14 @@ def hakrawler_crawl(url: str, depth: int = 2, forms: bool = True, robots: bool = logger.error(f"❌ Hakrawler crawling failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Httpx Probe", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def httpx_probe(targets: str = "", target_file: str = "", ports: str = "", methods: str = "GET", status_code: str = "", content_length: bool = False, output_file: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute HTTPx for HTTP probing with enhanced logging. @@ -3424,7 +4090,14 @@ def httpx_probe(targets: str = "", target_file: str = "", ports: str = "", metho logger.error(f"❌ HTTPx probing failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Paramspider Discovery", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def paramspider_discovery(domain: str, exclude: str = "", output_file: str = "", level: int = 2, additional_args: str = "") -> Dict[str, Any]: """ Execute ParamSpider for parameter discovery with enhanced logging. @@ -3458,7 +4131,14 @@ def paramspider_discovery(domain: str, exclude: str = "", output_file: str = "", # ADVANCED WEB SECURITY TOOLS CONTINUED # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Burpsuite Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def burpsuite_scan(project_file: str = "", config_file: str = "", target: str = "", headless: bool = False, scan_type: str = "", scan_config: str = "", output_file: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute Burp Suite with enhanced logging. @@ -3494,7 +4174,14 @@ def burpsuite_scan(project_file: str = "", config_file: str = "", target: str = logger.error(f"❌ Burp Suite scan failed") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="ZAP Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def zap_scan(target: str = "", scan_type: str = "baseline", api_key: str = "", daemon: bool = False, port: str = "8090", host: str = "0.0.0.0", format_type: str = "xml", output_file: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute OWASP ZAP with enhanced logging. @@ -3532,7 +4219,14 @@ def zap_scan(target: str = "", scan_type: str = "baseline", api_key: str = "", d logger.error(f"❌ ZAP scan failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Arjun Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def arjun_scan(url: str, method: str = "GET", data: str = "", headers: str = "", timeout: str = "", output_file: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute Arjun for parameter discovery with enhanced logging. @@ -3566,7 +4260,14 @@ def arjun_scan(url: str, method: str = "GET", data: str = "", headers: str = "", logger.error(f"❌ Arjun failed for {url}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Wafw00f Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def wafw00f_scan(target: str, additional_args: str = "") -> Dict[str, Any]: """ Execute wafw00f to identify and fingerprint WAF products with enhanced logging. @@ -3590,7 +4291,14 @@ def wafw00f_scan(target: str, additional_args: str = "") -> Dict[str, Any]: logger.error(f"❌ Wafw00f failed for {target}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Fierce Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def fierce_scan(domain: str, dns_server: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute fierce for DNS reconnaissance with enhanced logging. @@ -3616,7 +4324,14 @@ def fierce_scan(domain: str, dns_server: str = "", additional_args: str = "") -> logger.error(f"❌ Fierce failed for {domain}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Dnsenum Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def dnsenum_scan(domain: str, dns_server: str = "", wordlist: str = "", additional_args: str = "") -> Dict[str, Any]: """ Execute dnsenum for DNS enumeration with enhanced logging. @@ -3644,7 +4359,14 @@ def dnsenum_scan(domain: str, dns_server: str = "", wordlist: str = "", addition logger.error(f"❌ DNSenum failed for {domain}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Autorecon Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def autorecon_scan( target: str = "", target_file: str = "", @@ -3786,7 +4508,14 @@ def autorecon_scan( # SYSTEM MONITORING & TELEMETRY # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Server Health", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def server_health() -> Dict[str, Any]: """ Check the health status of the HexStrike AI server. @@ -3802,7 +4531,14 @@ def server_health() -> Dict[str, Any]: logger.warning(f"⚠️ Server health check returned: {result.get('status', 'unknown')}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Get Cache Stats", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def get_cache_stats() -> Dict[str, Any]: """ Get cache statistics from the HexStrike AI server. @@ -3816,7 +4552,14 @@ def get_cache_stats() -> Dict[str, Any]: logger.info(f"📊 Cache hit rate: {result.get('hit_rate', 'unknown')}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Clear Cache", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def clear_cache() -> Dict[str, Any]: """ Clear the cache on the HexStrike AI server. @@ -3832,7 +4575,14 @@ def clear_cache() -> Dict[str, Any]: logger.error(f"❌ Failed to clear cache") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Get Telemetry", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def get_telemetry() -> Dict[str, Any]: """ Get system telemetry from the HexStrike AI server. @@ -3850,7 +4600,14 @@ def get_telemetry() -> Dict[str, Any]: # PROCESS MANAGEMENT TOOLS (v5.0 ENHANCEMENT) # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="List Active Processes", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def list_active_processes() -> Dict[str, Any]: """ List all active processes on the HexStrike AI server. @@ -3866,7 +4623,14 @@ def list_active_processes() -> Dict[str, Any]: logger.error("❌ Failed to list processes") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Get Process Status", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def get_process_status(pid: int) -> Dict[str, Any]: """ Get the status of a specific process. @@ -3885,7 +4649,14 @@ def get_process_status(pid: int) -> Dict[str, Any]: logger.error(f"❌ Process {pid} not found or error occurred") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Terminate Process", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def terminate_process(pid: int) -> Dict[str, Any]: """ Terminate a specific running process. @@ -3904,7 +4675,14 @@ def terminate_process(pid: int) -> Dict[str, Any]: logger.error(f"❌ Failed to terminate process {pid}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Pause Process", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def pause_process(pid: int) -> Dict[str, Any]: """ Pause a specific running process. @@ -3923,7 +4701,14 @@ def pause_process(pid: int) -> Dict[str, Any]: logger.error(f"❌ Failed to pause process {pid}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Resume Process", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def resume_process(pid: int) -> Dict[str, Any]: """ Resume a paused process. @@ -3942,7 +4727,14 @@ def resume_process(pid: int) -> Dict[str, Any]: logger.error(f"❌ Failed to resume process {pid}") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Get Process Dashboard", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def get_process_dashboard() -> Dict[str, Any]: """ Get enhanced process dashboard with visual status indicators. @@ -3965,7 +4757,14 @@ def get_process_dashboard() -> Dict[str, Any]: logger.error("❌ Failed to get process dashboard") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Execute Command", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def execute_command(command: str, use_cache: bool = True) -> Dict[str, Any]: """ Execute an arbitrary command on the HexStrike AI server with enhanced logging. @@ -4009,7 +4808,14 @@ def execute_command(command: str, use_cache: bool = True) -> Dict[str, Any]: # ADVANCED VULNERABILITY INTELLIGENCE MCP TOOLS (v6.0 ENHANCEMENT) # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Monitor CVE Feeds", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def monitor_cve_feeds(hours: int = 24, severity_filter: str = "HIGH,CRITICAL", keywords: str = "") -> Dict[str, Any]: """ Monitor CVE databases for new vulnerabilities with AI analysis. @@ -4040,7 +4846,14 @@ def monitor_cve_feeds(hours: int = 24, severity_filter: str = "HIGH,CRITICAL", k return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Generate Exploit From CVE", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def generate_exploit_from_cve(cve_id: str, target_os: str = "", target_arch: str = "x64", exploit_type: str = "poc", evasion_level: str = "none") -> Dict[str, Any]: """ Generate working exploits from CVE information using AI-powered analysis. @@ -4079,7 +4892,14 @@ def generate_exploit_from_cve(cve_id: str, target_os: str = "", target_arch: str return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Discover Attack Chains", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def discover_attack_chains(target_software: str, attack_depth: int = 3, include_zero_days: bool = False) -> Dict[str, Any]: """ Discover multi-stage attack chains for target software with vulnerability correlation. @@ -4113,7 +4933,14 @@ def discover_attack_chains(target_software: str, attack_depth: int = 3, include_ return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Research Zero Day Opportunities", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def research_zero_day_opportunities(target_software: str, analysis_depth: str = "standard", source_code_url: str = "") -> Dict[str, Any]: """ Automated zero-day vulnerability research using AI analysis and pattern recognition. @@ -4150,7 +4977,14 @@ def research_zero_day_opportunities(target_software: str, analysis_depth: str = return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Correlate Threat Intelligence", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def correlate_threat_intelligence(indicators: str, timeframe: str = "30d", sources: str = "all") -> Dict[str, Any]: """ Correlate threat intelligence across multiple sources with advanced analysis. @@ -4196,7 +5030,14 @@ def correlate_threat_intelligence(indicators: str, timeframe: str = "30d", sourc return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Advanced Payload Generation", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def advanced_payload_generation(attack_type: str, target_context: str = "", evasion_level: str = "standard", custom_constraints: str = "") -> Dict[str, Any]: """ Generate advanced payloads with AI-powered evasion techniques and contextual adaptation. @@ -4244,7 +5085,14 @@ def advanced_payload_generation(attack_type: str, target_context: str = "", evas return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Vulnerability Intelligence Dashboard", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def vulnerability_intelligence_dashboard() -> Dict[str, Any]: """ Get a comprehensive vulnerability intelligence dashboard with latest threats and trends. @@ -4298,7 +5146,14 @@ def vulnerability_intelligence_dashboard() -> Dict[str, Any]: "dashboard": dashboard } - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Threat Hunting Assistant", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def threat_hunting_assistant(target_environment: str, threat_indicators: str = "", hunt_focus: str = "general") -> Dict[str, Any]: """ AI-powered threat hunting assistant with vulnerability correlation and attack simulation. @@ -4408,7 +5263,14 @@ def threat_hunting_assistant(target_environment: str, threat_indicators: str = " # ENHANCED VISUAL OUTPUT TOOLS # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Get Live Dashboard", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def get_live_dashboard() -> Dict[str, Any]: """ Get a beautiful live dashboard showing all active processes with enhanced visual formatting. @@ -4424,7 +5286,14 @@ def get_live_dashboard() -> Dict[str, Any]: logger.error("❌ Failed to retrieve live dashboard") return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Create Vulnerability Report", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def create_vulnerability_report(vulnerabilities: str, target: str = "", scan_type: str = "comprehensive") -> Dict[str, Any]: """ Create a beautiful vulnerability report with severity-based styling and visual indicators. @@ -4478,7 +5347,14 @@ def create_vulnerability_report(vulnerabilities: str, target: str = "", scan_typ logger.error(f"❌ Failed to create vulnerability report: {str(e)}") return {"success": False, "error": str(e)} - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Format Tool Output Visual", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def format_tool_output_visual(tool_name: str, output: str, success: bool = True) -> Dict[str, Any]: """ Format tool output with beautiful visual styling, syntax highlighting, and structure. @@ -4507,7 +5383,14 @@ def format_tool_output_visual(tool_name: str, output: str, success: bool = True) return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Create Scan Summary", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def create_scan_summary(target: str, tools_used: str, vulnerabilities_found: int = 0, execution_time: float = 0.0, findings: str = "") -> Dict[str, Any]: """ @@ -4543,7 +5426,14 @@ def create_scan_summary(target: str, tools_used: str, vulnerabilities_found: int return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Display System Metrics", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def display_system_metrics() -> Dict[str, Any]: """ Display current system metrics and performance indicators with visual formatting. @@ -4592,7 +5482,14 @@ def display_system_metrics() -> Dict[str, Any]: # INTELLIGENT DECISION ENGINE TOOLS # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Analyze Target Intelligence", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def analyze_target_intelligence(target: str) -> Dict[str, Any]: """ Analyze target using AI-powered intelligence to create comprehensive profile. @@ -4616,7 +5513,14 @@ def analyze_target_intelligence(target: str) -> Dict[str, Any]: return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Select Optimal Tools AI", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def select_optimal_tools_ai(target: str, objective: str = "comprehensive") -> Dict[str, Any]: """ Use AI to select optimal security tools based on target analysis and testing objective. @@ -4644,7 +5548,14 @@ def select_optimal_tools_ai(target: str, objective: str = "comprehensive") -> Di return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Optimize Tool Parameters AI", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def optimize_tool_parameters_ai(target: str, tool: str, context: str = "{}") -> Dict[str, Any]: """ Use AI to optimize tool parameters based on target profile and context. @@ -4681,7 +5592,14 @@ def optimize_tool_parameters_ai(target: str, tool: str, context: str = "{}") -> return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Create Attack Chain AI", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def create_attack_chain_ai(target: str, objective: str = "comprehensive") -> Dict[str, Any]: """ Create an intelligent attack chain using AI-driven tool sequencing and optimization. @@ -4713,7 +5631,14 @@ def create_attack_chain_ai(target: str, objective: str = "comprehensive") -> Dic return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Intelligent Smart Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def intelligent_smart_scan(target: str, objective: str = "comprehensive", max_tools: int = 5) -> Dict[str, Any]: """ Execute an intelligent scan using AI-driven tool selection and parameter optimization. @@ -4766,7 +5691,14 @@ def intelligent_smart_scan(target: str, objective: str = "comprehensive", max_to return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Detect Technologies AI", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def detect_technologies_ai(target: str) -> Dict[str, Any]: """ Use AI to detect technologies and provide technology-specific testing recommendations. @@ -4798,7 +5730,14 @@ def detect_technologies_ai(target: str) -> Dict[str, Any]: return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="AI Reconnaissance Workflow", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def ai_reconnaissance_workflow(target: str, depth: str = "standard") -> Dict[str, Any]: """ Execute AI-driven reconnaissance workflow with intelligent tool chaining. @@ -4847,7 +5786,14 @@ def ai_reconnaissance_workflow(target: str, depth: str = "standard") -> Dict[str "timestamp": datetime.now().isoformat() } - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="AI Vulnerability Assessment", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def ai_vulnerability_assessment(target: str, focus_areas: str = "all") -> Dict[str, Any]: """ Perform AI-driven vulnerability assessment with intelligent prioritization. @@ -4907,7 +5853,14 @@ def ai_vulnerability_assessment(target: str, focus_areas: str = "all") -> Dict[s # BUG BOUNTY HUNTING SPECIALIZED WORKFLOWS # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Bugbounty Reconnaissance Workflow", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def bugbounty_reconnaissance_workflow(domain: str, scope: str = "", out_of_scope: str = "", program_type: str = "web") -> Dict[str, Any]: """ @@ -4940,7 +5893,14 @@ def bugbounty_reconnaissance_workflow(domain: str, scope: str = "", out_of_scope return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Bugbounty Vulnerability Hunting", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def bugbounty_vulnerability_hunting(domain: str, priority_vulns: str = "rce,sqli,xss,idor,ssrf", bounty_range: str = "unknown") -> Dict[str, Any]: """ @@ -4971,7 +5931,14 @@ def bugbounty_vulnerability_hunting(domain: str, priority_vulns: str = "rce,sqli return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Bugbounty Business Logic Testing", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def bugbounty_business_logic_testing(domain: str, program_type: str = "web") -> Dict[str, Any]: """ Create business logic testing workflow for advanced bug bounty hunting. @@ -5000,7 +5967,14 @@ def bugbounty_business_logic_testing(domain: str, program_type: str = "web") -> return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Bugbounty OSINT Gathering", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def bugbounty_osint_gathering(domain: str) -> Dict[str, Any]: """ Create OSINT (Open Source Intelligence) gathering workflow for bug bounty reconnaissance. @@ -5025,7 +5999,14 @@ def bugbounty_osint_gathering(domain: str) -> Dict[str, Any]: return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Bugbounty File Upload Testing", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def bugbounty_file_upload_testing(target_url: str) -> Dict[str, Any]: """ Create file upload vulnerability testing workflow with bypass techniques. @@ -5050,7 +6031,14 @@ def bugbounty_file_upload_testing(target_url: str) -> Dict[str, Any]: return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Bugbounty Comprehensive Assessment", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def bugbounty_comprehensive_assessment(domain: str, scope: str = "", priority_vulns: str = "rce,sqli,xss,idor,ssrf", include_osint: bool = True, @@ -5088,7 +6076,14 @@ def bugbounty_comprehensive_assessment(domain: str, scope: str = "", return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Bugbounty Authentication Bypass Testing", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def bugbounty_authentication_bypass_testing(target_url: str, auth_type: str = "form") -> Dict[str, Any]: """ Create authentication bypass testing workflow for bug bounty hunting. @@ -5153,7 +6148,14 @@ def bugbounty_authentication_bypass_testing(target_url: str, auth_type: str = "f # ENHANCED HTTP TESTING FRAMEWORK & BROWSER AGENT (BURP SUITE ALTERNATIVE) # ============================================================================ - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="HTTP Framework Test", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def http_framework_test(url: str, method: str = "GET", data: dict = {}, headers: dict = {}, cookies: dict = {}, action: str = "request") -> Dict[str, Any]: """ @@ -5194,7 +6196,14 @@ def http_framework_test(url: str, method: str = "GET", data: dict = {}, return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Browser Agent Inspect", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def browser_agent_inspect(url: str, headless: bool = True, wait_time: int = 5, action: str = "navigate", proxy_port: int = None, active_tests: bool = False) -> Dict[str, Any]: """ @@ -5242,26 +6251,54 @@ def browser_agent_inspect(url: str, headless: bool = True, wait_time: int = 5, return result # ---------------- Additional HTTP Framework Tools (sync with server) ---------------- - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="HTTP Set Rules", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def http_set_rules(rules: list) -> Dict[str, Any]: """Set match/replace rules used to rewrite parts of URL/query/headers/body before sending. Rule format: {'where':'url|query|headers|body','pattern':'regex','replacement':'string'}""" payload = {"action": "set_rules", "rules": rules} return hexstrike_client.safe_post("api/tools/http-framework", payload) - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="HTTP Set Scope", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def http_set_scope(host: str, include_subdomains: bool = True) -> Dict[str, Any]: """Define in-scope host (and optionally subdomains) so out-of-scope requests are skipped.""" payload = {"action": "set_scope", "host": host, "include_subdomains": include_subdomains} return hexstrike_client.safe_post("api/tools/http-framework", payload) - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="HTTP Repeater", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def http_repeater(request_spec: dict) -> Dict[str, Any]: """Send a crafted request (Burp Repeater equivalent). request_spec keys: url, method, headers, cookies, data.""" payload = {"action": "repeater", "request": request_spec} return hexstrike_client.safe_post("api/tools/http-framework", payload) - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="HTTP Intruder", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def http_intruder(url: str, method: str = "GET", location: str = "query", params: list = None, payloads: list = None, base_data: dict = None, max_requests: int = 100) -> Dict[str, Any]: """Simple Intruder (sniper) fuzzing. Iterates payloads over each param individually. @@ -5278,7 +6315,14 @@ def http_intruder(url: str, method: str = "GET", location: str = "query", params } return hexstrike_client.safe_post("api/tools/http-framework", payload) - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Burpsuite Alternative Scan", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def burpsuite_alternative_scan(target: str, scan_type: str = "comprehensive", headless: bool = True, max_depth: int = 3, max_pages: int = 50) -> Dict[str, Any]: @@ -5339,7 +6383,14 @@ def burpsuite_alternative_scan(target: str, scan_type: str = "comprehensive", return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Error Handling Statistics", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def error_handling_statistics() -> Dict[str, Any]: """ Get intelligent error handling system statistics and recent error patterns. @@ -5370,7 +6421,14 @@ def error_handling_statistics() -> Dict[str, Any]: return result - @mcp.tool() + @mcp.tool( + annotations=ToolAnnotations( + title="Test Error Recovery", + readOnlyHint=False, + destructiveHint=True, + openWorldHint=True, + ), + ) def test_error_recovery(tool_name: str, error_type: str = "timeout", target: str = "example.com") -> Dict[str, Any]: """ diff --git a/requirements.txt b/requirements.txt index ff4cedea6..50fa30070 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,7 @@ flask>=2.3.0,<4.0.0 # Web framework for API server (flask import) requests>=2.31.0,<3.0.0 # HTTP library (requests import) psutil>=5.9.0,<6.0.0 # System utilities (psutil import) -fastmcp>=0.2.0,<1.0.0 # MCP framework (from mcp.server.fastmcp import FastMCP) +fastmcp>=2.0.0 # MCP framework (from mcp.server.fastmcp import FastMCP) # ============================================================================ # WEB SCRAPING & AUTOMATION (ACTUALLY USED)