From ef8b9904c7e7993ce0d95709b42a4c79731b4da2 Mon Sep 17 00:00:00 2001 From: gnomee1337 <41423212+Gnomee1337@users.noreply.github.com> Date: Sat, 19 Oct 2024 07:30:11 +0300 Subject: [PATCH 1/4] refactored WAFAbuser and WAFUtility. TODO: subdomain_gathering --- modules/subdomain_gathering.py | 11 +- modules/utility.py | 142 +++++++++------- waf-abuser.py | 290 +++++++++++++++++---------------- 3 files changed, 237 insertions(+), 206 deletions(-) diff --git a/modules/subdomain_gathering.py b/modules/subdomain_gathering.py index c2ed58c..b410714 100644 --- a/modules/subdomain_gathering.py +++ b/modules/subdomain_gathering.py @@ -2,19 +2,13 @@ import json import logging import os -from itertools import chain - import aiohttp +from itertools import chain from bs4 import BeautifulSoup - from modules.utility import get_top_domains logger = logging.getLogger(__name__) -''' -CREATE EXCEPTION ON DNSDUMPSTER DAY LIMIT -''' - async def dnsdumpster_scraping(domain: str): dnsdumpster_output = [] @@ -115,7 +109,8 @@ async def hackertarget_scraping(domain: str): ) as resp: response_text = await resp.text(encoding='utf-8') if not response_text.find('API count exceeded'): - print('SKIP HackerTarget | Daily Limit Exceeded. (Possible bypass: new IP or use hackertarget.com API Key)') + print( + 'SKIP HackerTarget | Daily Limit Exceeded. (Possible bypass: new IP or use hackertarget.com API Key)') else: # Write TEXT-Response to file with open(os.path.normpath(os.path.join(os.path.realpath(__file__), diff --git a/modules/utility.py b/modules/utility.py index e0d6749..8e75360 100644 --- a/modules/utility.py +++ b/modules/utility.py @@ -2,77 +2,95 @@ import ipaddress import logging import os -from itertools import chain - +import aiofiles import aiohttp import tldextract from html_similarity import similarity +from itertools import chain -logger = logging.getLogger(__name__) -logging.basicConfig() - - -async def get_page_content(get_page: str): - async with aiohttp.ClientSession() as session: - try: - async with session.get(url=f"https://{get_page}", verify_ssl=False, timeout=3 - ) as get_resp: - page_response = await get_resp.text() - return page_response - except aiohttp.ClientConnectorError as cce: - logger.debug('Connection Error | ', str(cce)) - logger.info(f'Skipped | Error with {get_page}') - return 0 - except asyncio.TimeoutError as te: - return 0 +class WAFUtils: + def __init__(self): + self.logger = self._setup_logger() + self.custom_tldextract = self._initialize_tldextract() -# Compare two HTML pages -async def compare_two_pages(original_page: str, check_page: str): - async with aiohttp.ClientSession() as session: - try: - # async with session.get(url=f"https://{original_page}", verify_ssl=False, timeout=3 - # ) as original_resp: - # original_page_response = await original_resp.text() - async with session.get(url=f"http://{check_page}", verify_ssl=False, timeout=3 - ) as check_resp: - check_page_response = await check_resp.text() - # Compare original_page with check_page and return list[tuple(IP,Similarity_Percentage),...] 
- return (check_page, int(similarity(str(original_page), str(check_page_response), k=0.3) * 100)) - except aiohttp.ClientConnectorError as cce: - logger.debug('Connection Error | ', str(cce)) - logger.info(f'Skipped | Error with {check_page}') - return 0 - except asyncio.TimeoutError as te: - return 0 + @staticmethod + def _setup_logger(): + """Sets up the logger for the class.""" + logging.basicConfig() + return logging.getLogger(__name__) + @staticmethod + def _initialize_tldextract(): + """Initializes TLDExtract with a custom cache directory.""" + cache_dir = os.path.normpath(os.path.join(os.path.realpath(__file__), '../../cache/tldextract-cache')) + return tldextract.TLDExtract(cache_dir=cache_dir) -# Read all WAF Ranges from 'PublicWAFs.txt' -async def parse_public_waf_ranges(): - with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), '../data/PublicWAFs.txt'), 'r') as publicWAFs: - next(publicWAFs) - return [ip.strip() for ip in publicWAFs] + async def get_page_content(self, get_page: str) -> str: + """Fetches the content of a webpage.""" + url = f"https://{get_page}" + async with aiohttp.ClientSession() as session: + try: + async with session.get(url=url, ssl=False, timeout=aiohttp.ClientTimeout(total=3) + ) as response: + return await response.text() + except aiohttp.ClientConnectorError as e: + self.logger.debug(f"Connection Error with {get_page}: {e}") + self.logger.info(f"Skipped | Error with {get_page}") + return "" + except asyncio.TimeoutError: + self.logger.info(f"Timeout occurred for {get_page}") + return "" + except Exception as e: + self.logger.error(f"Unexpected error with {get_page}: {e}") + return "" + async def compare_two_pages(self, original_page: str, check_page: str): + """Compares two HTML pages and returns their similarity.""" + url = f"http://{check_page}" + async with aiohttp.ClientSession() as session: + try: + # Fetch the check_page content + async with session.get(url=url, ssl=False, timeout=aiohttp.ClientTimeout(total=3) + ) as check_resp: + check_page_response = await check_resp.text() + # Compare original_page with check_page and return list[tuple(IP,Similarity_Percentage),...] 
+                    similarity_percentage = int(similarity(str(original_page), str(check_page_response), k=0.3) * 100)
+                    return (check_page, similarity_percentage)
+            except aiohttp.ClientConnectorError as e:
+                self.logger.debug(f"Connection Error with {check_page}: {e}")
+                self.logger.info(f"Skipped | Error with {check_page}")
+                return (check_page, 0)
+            except asyncio.TimeoutError:
+                self.logger.info(f"Timeout occurred for {check_page}")
+                return (check_page, 0)
+            except Exception as e:
+                self.logger.error(f"Unexpected error with {check_page}: {e}")
+                return (check_page, 0)
 
-# Check every IP to filter out for WAF appearance
-async def filter_out_waf_ips(ips_to_check: set):
-    waf_ips_with_cidr = await parse_public_waf_ranges()
-    clear_ips = set()
-    all_waf_ips = set(chain.from_iterable(ipaddress.ip_network(waf_ip) for waf_ip in waf_ips_with_cidr))
-    for ip_to_check in ips_to_check:
-        if ipaddress.ip_address(ip_to_check) not in all_waf_ips:
-            clear_ips.add(ip_to_check)
-    return clear_ips
+    async def parse_public_waf_ranges(self) -> list[str]:
+        """Reads WAF ranges from the PublicWAFs.txt file."""
+        file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../data/PublicWAFs.txt')
+        async with aiofiles.open(file_path, mode='r') as public_WAFs:
+            # Skip the header line (as the original code did), then strip and return the remaining entries
+            return [line.strip() for line in (await public_WAFs.readlines())[1:] if line.strip()]
 
+    async def filter_out_waf_ips(self, ips_to_check: set[str]) -> set[str]:
+        """Filters out IPs that are in WAF ranges."""
+        # Parse the WAF IP ranges
+        waf_ips_with_cidr = await self.parse_public_waf_ranges()
+        # Flatten all WAF IPs into a set
+        all_waf_ips = set(chain.from_iterable(ipaddress.ip_network(waf_ip) for waf_ip in waf_ips_with_cidr))
+        # Filter the IPs that are not in WAF ranges
+        return {ip for ip in ips_to_check if ipaddress.ip_address(ip) not in all_waf_ips}
 
-# Extract TLD from each domain
-async def get_top_domains(domains: list[str]):
-    domains = list(filter(None, domains))
-    custom_tldextract = tldextract.TLDExtract(
-        cache_dir=f"{os.path.normpath(os.path.join(os.path.realpath(__file__), '../../cache/tldextract-cache'))}")
-    return [str(
-        custom_tldextract.extract_str(domain).domain
-        + '.' 
- + custom_tldextract.extract_str(domain).suffix - ) - for domain in domains] + async def get_top_domains(self, domains: list[str]) -> list[str]: + """Extracts top-level domains from a list of domains.""" + # Filter out empty or None entries from the list + domains = [domain for domain in domains if domain] + # Extract domain and suffix, and combine them to get the full top-level domain + return [ + f"{extracted.domain}.{extracted.suffix}" + for domain in domains + if (extracted := self.custom_tldextract(domain)) + ] diff --git a/waf-abuser.py b/waf-abuser.py index 6ecf060..79305b4 100644 --- a/waf-abuser.py +++ b/waf-abuser.py @@ -1,163 +1,181 @@ #!/usr/bin/env python3 import argparse +import os import datetime import fileinput import sys import threading import traceback -import os - -from colorama import Fore -from colorama import init as colorama_init - +from colorama import Fore, init as colorama_init from modules.ip_gathering import ip_gathering from modules.subdomain_gathering import subdomain_gathering -from modules.utility import * - - -async def print_banner(): - colorama_init() - banner = f"""{Fore.MAGENTA} - +-----------------------------+ - |╦ ╦╔═╗╔═╗ ╔═╗╔╗ ╦ ╦╔═╗╔═╗╦═╗| - |║║║╠═╣╠╣ ╠═╣╠╩╗║ ║╚═╗║╣ ╠╦╝| - |╚╩╝╩ ╩╚ ╩ ╩╚═╝╚═╝╚═╝╚═╝╩╚═| - +-----------------------------+ -{Fore.RESET}""" - print(banner) - - -async def create_logger(name: str, logger_level: logging): - if logger_level is logging.DEBUG: - logging.basicConfig(stream=sys.stdout, - format='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', - encoding='utf-8', level=logger_level) - else: - logging.basicConfig(stream=sys.stdout, format='{%(filename)s:%(lineno)d} | %(message)s', encoding='utf-8', - level=logger_level) - logger = logging.getLogger(name) - return logger - - -async def arguments(): - parser = argparse.ArgumentParser( - description='WAF-Abuser will search the history for unprotected IPs associated with given domains to bypass the WAF over a direct connection') - required = parser.add_argument_group("Required arguments") - optional = parser.add_argument_group("Optional arguments") - input_group = parser.add_mutually_exclusive_group(required=True) - input_group.add_argument('-d', '--domain', action='store', dest='input_domain', metavar='"domain"', - help='Specify the domain for searches', - ) - input_group.add_argument('-f', '--file', action='store', dest='file_domains', metavar='FILE with domains', - nargs='*', - help='Specify the file with domains for searches', - ) - optional.add_argument('--similarity-rate', action='store', dest='similarity_rate', default=70, metavar='[0-100]', - help='Specify minimum passing percentage for page similarity. Default value: 70', - ) - optional.add_argument('--domains-only', action='store_true', dest='domains_only', - help='Find only domains and subdomains', ) - return parser.parse_args() - - -async def main(): - args = await arguments() - logger = await create_logger(__name__, logging.CRITICAL) - colorama_init() - # Print banner - await print_banner() - # Get domain name from arguments - input_domains = set() - similarity_rate = args.similarity_rate - domains_only_flag = args.domains_only - if args.file_domains: - for line in fileinput.input(files=args.file_domains): - input_domains.add(line.strip()) - elif args.input_domain: - input_domains.add(args.input_domain) - else: - raise ValueError("Improper -d/-f argument") - # Gathering subdomains for input domains - print("1. 
Gathering subdomains") - find_subdomains = set() - find_subdomains.update(await subdomain_gathering(input_domains)) - logger.debug(find_subdomains) - if domains_only_flag: - print(f"{Fore.GREEN}Found {len(find_subdomains)} domains/subdomains:{Fore.RESET}") - for domain in find_subdomains: - print(domain) - print(f"File output: {os.path.normpath(os.path.join(os.path.realpath(__file__), '../cache/'))}") - return 0 - # Gathering IPs for subdomains - print("2. Gathering IPs") - find_ips = set() - find_ips.update(await ip_gathering(find_subdomains)) - logger.debug(find_ips) - # Filtering out WAF-IPs from gathered IPs - print("3. Filtering out WAF IPs") - filtered_out_ips = set() - filtered_out_ips.update(await filter_out_waf_ips(find_ips)) - logger.debug(filtered_out_ips) - # All IPs were from WAF-Ranges - if len(filtered_out_ips) == 0: - print(f"{Fore.GREEN}Found 0 possible non-WAF IPs") - return 0 - else: +from modules.utility import WAFUtils + + +class WAFAbuser: + def __init__(self, logger_level=WAFUtils.logging.CRITICAL): + self.logger = self.create_logger(logger_level) + self.input_domains = set() + self.similarity_rate = 70 + self.domains_only_flag = False + + @staticmethod + def create_logger(logger_level): + log_format = ( + '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s' + if logger_level == WAFUtils.logging.DEBUG + else '{%(filename)s:%(lineno)d} | %(message)s' + ) + WAFUtils.logging.basicConfig( + stream=sys.stdout, + format=log_format, + encoding='utf-8', + level=logger_level + ) + return WAFUtils.logging.getLogger(__name__) + + @staticmethod + async def print_banner(): + colorama_init() + banner = f"""{Fore.MAGENTA} + +-----------------------------+ + |╦ ╦╔═╗╔═╗ ╔═╗╔╗ ╦ ╦╔═╗╔═╗╦═╗| + |║║║╠═╣╠╣ ╠═╣╠╩╗║ ║╚═╗║╣ ╠╦╝| + |╚╩╝╩ ╩╚ ╩ ╩╚═╝╚═╝╚═╝╚═╝╩╚═| + +-----------------------------+ + {Fore.RESET}""" + print(banner) + + async def parse_arguments(self): + parser = argparse.ArgumentParser( + description='WAF-Abuser will search the history for unprotected IPs associated with given domains to bypass the WAF over a direct connection') + required = parser.add_argument_group("Required arguments") + optional = parser.add_argument_group("Optional arguments") + input_group = parser.add_mutually_exclusive_group(required=True) + input_group.add_argument('-d', '--domain', action='store', dest='input_domain', metavar='"domain"', + help='Specify the domain for searches') + input_group.add_argument('-f', '--file', action='store', dest='file_domains', metavar='FILE with domains', + nargs='*', help='Specify the file with domains for searches') + optional.add_argument('--similarity-rate', action='store', dest='similarity_rate', default=self.similarity_rate, + metavar='[0-100]', + help=f'Specify minimum passing percentage for page similarity. Default: {self.similarity_rate}') + optional.add_argument('--domains-only', action='store_true', dest='domains_only', + help='Find only domains and subdomains') + # Parse the arguments + args = parser.parse_args() + # Get similarity rate + self.similarity_rate = args.similarity_rate + # Get domain only flag + self.domains_only_flag = args.domains_only + # Get domain names from arguments + if args.file_domains: + self.input_domains.update(line.strip() for line in fileinput.input(files=args.file_domains)) + elif args.input_domain: + self.input_domains.add(args.input_domain) + else: + raise ValueError("Improper -d/-f argument") + + async def gather_subdomains(self): + print("1. 
Gathering subdomains") + find_subdomains = await subdomain_gathering(self.input_domains) + self.logger.debug(f"Subdomains gathered: {find_subdomains}") + return find_subdomains + + async def gather_ips(self, subdomains): + print("2. Gathering IPs") + find_ips = await ip_gathering(subdomains) + self.logger.debug(find_ips) + return find_ips + + async def filter_waf_ips(self, ips): + print("3. Filtering out WAF IPs") + filtered_out_ips = await WAFUtils.filter_out_waf_ips(ips) + self.logger.debug(filtered_out_ips) + return filtered_out_ips + + async def compare_ips_with_domains(self, filtered_out_ips): print("4. Comparing found IPs with original domain") # Compare input domain content with filtered out IPs content similarity_output = set() - for input_domain in input_domains: - current_domain_content = await get_page_content(input_domain) + for input_domain in self.input_domains: + current_domain_content = await WAFUtils.get_page_content(input_domain) if current_domain_content == 0: - continue - else: - for filtered_ip in filtered_out_ips: - compare_result = await compare_two_pages(original_page=current_domain_content, - check_page=filtered_ip) - # Possible connection error/unavailable page - if compare_result == 0: - continue - # Add if similarity rate > than specified (Default 70%) - elif compare_result[1] > int(similarity_rate): - similarity_output.add(compare_result) - else: - continue - # Output final results - if len(similarity_output) == 0: - print( - f"5. {Fore.YELLOW}Found 0 pages with similarity > {str(similarity_rate)}%{Fore.RESET}" - "\nYou can reduce the similarity percentage [--similarity_rate 70]" - "\nDefault similarity value: 70") - return 0 - else: + continue # Skip if there was an error fetching the domain content + await self.compare_with_filtered_ips(current_domain_content, filtered_out_ips, similarity_output) + return similarity_output + + async def compare_with_filtered_ips(self, current_domain_content, filtered_out_ips, similarity_output): + for filtered_ip in filtered_out_ips: + compare_result = await WAFUtils.compare_two_pages(original_page=current_domain_content, + check_page=filtered_ip) + # Add if similarity rate > than specified (Default 70%) + if compare_result != 0 and compare_result[1] > int(self.similarity_rate): + similarity_output.add(compare_result) + + async def output_results(self, similarity_output): + if not similarity_output: + print(f"5. {Fore.YELLOW}Found 0 pages with similarity > {self.similarity_rate}%{Fore.RESET}") + return + self.print_similarity_header() + self.print_similarity_details(similarity_output) + await self.save_results_to_file(similarity_output) + + def print_similarity_header(self): print(f"5. 
{Fore.GREEN}Found possible IPs:")
+
+    def print_similarity_details(self, similarity_output):
         row_format = "{:>15}" * (len(similarity_output) + 1)
         print(row_format.format("IP", "Similarity"))
         for ip_and_rate in similarity_output:
-            print(row_format.format(ip_and_rate[0], str(ip_and_rate[1]) + '%'))
+            print(row_format.format(ip_and_rate[0], f"{ip_and_rate[1]}%"))
+
+    async def save_results_to_file(self, similarity_output):
         # Verify that 'output' directory exists
-        if not os.path.isdir(os.path.normpath(os.path.dirname(os.path.join(os.path.realpath(__file__), '../output/')))):
-            os.makedirs(os.path.normpath(os.path.dirname(os.path.join(os.path.realpath(__file__), '../output/'))))
-        with open(os.path.normpath(os.path.join(os.path.realpath(__file__),
-                                                f'../output/possible_WAF_bypass_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}.txt')),
-                  'a') as waf_bypass_to_file:
-            waf_bypass_to_file.write(
-                "\n".join(row_format.format(ip_and_rate[0], str(ip_and_rate[1]) + '%') for ip_and_rate in
-                          similarity_output))
+        output_dir = os.path.normpath(os.path.dirname(os.path.join(os.path.realpath(__file__), '../output/')))
+        os.makedirs(output_dir, exist_ok=True)
+        output_file = os.path.join(output_dir,
+                                   f'possible_WAF_bypass_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}.txt')
+        with open(output_file, 'a') as waf_bypass_to_file:
+            lines = [
+                f"{ip_and_rate[0]:>15} {ip_and_rate[1]}%" for ip_and_rate in similarity_output
+            ]
+            waf_bypass_to_file.write("\n".join(lines) + "\n")
+
+    async def run(self):
+        await self.print_banner()
+        await self.parse_arguments()
+        subdomains = await self.gather_subdomains()
+        if self.domains_only_flag:
+            await self.display_subdomains(subdomains)
+            return
+        ips = await self.gather_ips(subdomains)
+        filtered_out_ips = await self.filter_waf_ips(ips)
+        if not filtered_out_ips:
+            print(f"{Fore.GREEN}Found 0 possible non-WAF IPs")
+            return
+        similarity_output = await self.compare_ips_with_domains(filtered_out_ips)
+        await self.output_results(similarity_output)
+
+    async def display_subdomains(self, subdomains):
+        print(f"{Fore.GREEN}Found {len(subdomains)} domains/subdomains:{Fore.RESET}")
+        for domain in subdomains:
+            print(domain)
+        print(f"File output: {os.path.normpath(os.path.join(os.path.realpath(__file__), '../cache/'))}")
 
 
 if __name__ == '__main__':
+    scanner = WAFAbuser()
     try:
-        asyncio.run(main())
-    except KeyboardInterrupt:
-        pass
-    except SystemExit:
-        raise
-    except:
+        WAFUtils.asyncio.run(scanner.run())
+    except (KeyboardInterrupt, SystemExit):
+        pass  # Graceful exit on user interrupt or system exit
+    except Exception:
         traceback.print_exc()
     finally:
         # Reference: http://stackoverflow.com/questions/1635080/terminate-a-multi-thread-python-program
+        # Exit the program gracefully based on active threads
+        exit_code = getattr(os, "_exitcode", 0)
         if threading.active_count() > 1:
-            os._exit(getattr(os, "_exitcode", 0))
+            os._exit(exit_code)
         else:
-            sys.exit(getattr(os, "_exitcode", 0))
+            sys.exit(exit_code)

From 6c1e63cd9db0fcad493ecf007f0b9efb51424eec Mon Sep 17 00:00:00 2001
From: gnomee1337 <41423212+Gnomee1337@users.noreply.github.com>
Date: Sat, 19 Oct 2024 18:48:20 +0300
Subject: [PATCH 2/4] refactored whole project

---
 modules/ip_gathering.py        | 148 +++++-----
 modules/subdomain_gathering.py | 521 +++++++++++++++++++++------------
 requirements.txt               |   3 +-
 waf-abuser.py                  |  20 +-
 4 files changed, 418 insertions(+), 274 deletions(-)

diff --git a/modules/ip_gathering.py b/modules/ip_gathering.py
index a20d6c7..a5c96c3 100644
--- a/modules/ip_gathering.py
+++ 
b/modules/ip_gathering.py @@ -1,84 +1,88 @@ import datetime -import logging import os import re - import aiohttp import dns.resolver +import aiofiles from bs4 import BeautifulSoup -logger = logging.getLogger(__name__) +class IPGatherer: + def __init__(self): + self.log_dir = os.path.normpath(os.path.join(os.path.realpath(__file__), '../../cache/viewdnsinfo_req_logs/')) + os.makedirs(self.log_dir, exist_ok=True) + self.all_ips = set() + + async def gather_ips(self, domains: set): + for domain in domains: + domain_ips = await self._ip_history_viewdnsinfo(domain) + if domain_ips: + domain_ips = await self._remove_original_ips(domain, domain_ips) + await self._write_domain_related_ips_to_file(domain, domain_ips) + self.all_ips.update(domain_ips) + await self._write_all_possible_ips_to_file() + return sorted(self.all_ips) -async def ip_history_viewdnsinfo(domain: str): - viewdnsinfo_ips_output = set() - # Verify that 'viewdnsinfo_req_logs' directory exists - if not os.path.isdir(os.path.normpath( - os.path.dirname(os.path.join(os.path.realpath(__file__), '../../cache/viewdnsinfo_req_logs/')))): - os.makedirs( - os.path.normpath( - os.path.dirname(os.path.join(os.path.realpath(__file__), '../../cache/viewdnsinfo_req_logs/')))) - async with aiohttp.ClientSession() as session: - # GET-Request for each domain - async with session.get(f'https://viewdns.info/iphistory/?domain={domain}', - timeout=3 - ) as resp: - response_text = await resp.text() - if not response_text.find("403 Forbidden - Naughty!"): - print('SKIP Viewdnsinfo | Daily Limit Exceeded. (Possible bypass: new IP or use viewdns.info API Key)') - return -403 - else: - # Write HTML-Response to file - with open(os.path.normpath(os.path.join(os.path.realpath(__file__), - f'../../cache/viewdnsinfo_req_logs/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_HTML.txt')), - 'a') as get_request_file: - get_request_file.write(response_text) + async def _ip_history_viewdnsinfo(self, domain: str): + viewdnsinfo_ips_output = set() + async with aiohttp.ClientSession() as session: + async with session.get(f'https://viewdns.info/iphistory/?domain={domain}', timeout=3) as resp: + response_text = await resp.text() + if "403 Forbidden - Naughty!" in response_text: + print( + 'SKIP Viewdnsinfo | Daily Limit Exceeded. (Possible bypass: new IP or use viewdns.info API Key)') + return -403 + # Write HTML response to file + await self._write_html_response(domain, response_text) + # Extract IPs from HTML response soup = BeautifulSoup(response_text.encode('utf-8'), 'html.parser') - rb = soup.find_all('table', {'border': '1'}) - # Find all IPs in HTML-Response - ip_pattern = re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}') - viewdnsinfo_ips_output.update(ip_pattern.findall(str(rb))) - # Write only IPs to file - with open(os.path.normpath(os.path.join(os.path.realpath(__file__), - f'../../cache/viewdnsinfo_req_logs/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_ips.txt')), - 'a') as domains_only_file: - domains_only_file.write( - "\n".join(str(viewdnsinfo_out_ips) for viewdnsinfo_out_ips in viewdnsinfo_ips_output)) - return list(viewdnsinfo_ips_output) + tables = soup.find_all('table', {'border': '1'}) + # Extract text from all tables + table_text = ' '.join(table.get_text() for table in tables) + # Improved regex for IP address extraction + ip_pattern = re.compile( + r'\b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' + r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' + r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' 
+ r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b' + ) + # Search in the extracted text + viewdnsinfo_ips_output.update(ip_pattern.findall(table_text)) + # Write only extracted IPs to file + await self._write_extracted_ips_to_file(domain, viewdnsinfo_ips_output) + return viewdnsinfo_ips_output + + async def _remove_original_ips(self, domain: str, all_domain_ips: set): + try: + # Resolve the original IPs for the given domain + original_ips = dns.resolver.resolve(domain, 'A') + for ip in original_ips: + # Use the .to_text() method to get the string representation of the IP + all_domain_ips.discard(ip.to_text()) + except dns.exception.DNSException: + pass # Handle DNS resolution errors silently + return all_domain_ips + + async def _write_html_response(self, domain: str, response_text: str): + file_path = os.path.join(self.log_dir, + f'{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_HTML.txt') + async with aiofiles.open(file_path, 'w') as file: + await file.write(response_text) + + async def _write_extracted_ips_to_file(self, domain: str, viewdnsinfo_ips_output: set): + file_path = os.path.join(self.log_dir, + f'{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_ips.txt') + async with aiofiles.open(file_path, 'w') as file: + await file.write("\n".join(str(ip) for ip in viewdnsinfo_ips_output)) + async def _write_domain_related_ips_to_file(self, domain: str, domain_ips: set): + file_path = os.path.normpath(os.path.join(os.path.realpath(__file__), + f'../../cache/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_IPs.txt')) + async with aiofiles.open(file_path, 'w') as file: + await file.write("\n".join(sorted(domain_ips))) -async def ip_gathering(domains: set): - all_ips = set() - for domain in domains: - all_domain_ips = set() - # Find all possible IPs for each domain - all_domain_ips.update(await ip_history_viewdnsinfo(domain)) - # Not found any IPs - if len(all_domain_ips) == 0: - continue - else: - # Remove original domain IP from list - try: - domain_original_ips = dns.resolver.resolve(domain, 'A') - for ip in domain_original_ips: - all_domain_ips.discard(str(ip)) - except dns.exception.DNSException as e: - # print(e) - continue - # Write to file all possible ips for domain - with open(os.path.normpath(os.path.join(os.path.realpath(__file__), - f'../../cache/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_IPs.txt')), - 'a') as all_subdomains_ips_file: - all_subdomains_ips_file.write( - "\n".join(str(ip_in_ips_for_domain) for ip_in_ips_for_domain in sorted(all_domain_ips))) - # Add all ips to 'all_ips' - all_ips.update(all_domain_ips) - # Clear set() for next ips gathering - all_domain_ips.clear() - # Write to file combination of ALL ips for every given domain as input - with open(os.path.normpath(os.path.join(os.path.realpath(__file__), - f'../../cache/ALL_DOMAINS_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_IPs.txt')), - 'a') as ips_for_all_domains: - ips_for_all_domains.write( - "\n".join(str(ip_in_all) for ip_in_all in sorted(all_ips))) - return sorted(all_ips) + async def _write_all_possible_ips_to_file(self): + file_path = os.path.normpath(os.path.join(os.path.realpath(__file__), + f'../../cache/ALL_DOMAINS_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_IPs.txt')) + async with aiofiles.open(file_path, 'w') as file: + await file.write("\n".join(str(ip) for ip in sorted(self.all_ips))) diff --git a/modules/subdomain_gathering.py b/modules/subdomain_gathering.py index b410714..4def53c 100644 --- 
a/modules/subdomain_gathering.py +++ b/modules/subdomain_gathering.py @@ -1,200 +1,337 @@ +import asyncio import datetime import json -import logging import os +import aiofiles import aiohttp from itertools import chain from bs4 import BeautifulSoup -from modules.utility import get_top_domains - -logger = logging.getLogger(__name__) - - -async def dnsdumpster_scraping(domain: str): - dnsdumpster_output = [] - CSRFtoken = '' - # Verify that 'dnsdumpster_req_logs' directory exists - if not os.path.isdir(os.path.normpath( - os.path.dirname(os.path.join(os.path.realpath(__file__), '../../cache/dnsdumpster_req_logs/')))): - os.makedirs( - os.path.normpath( - os.path.dirname(os.path.join(os.path.realpath(__file__), '../../cache/dnsdumpster_req_logs/')))) - async with aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar()) as session: - # GET-Request for each domain to receive unique CSRFToken - async with session.get('https://dnsdumpster.com') as resp: - cookies = session.cookie_jar.filter_cookies('https://dnsdumpster.com') - CSRFtoken = str(cookies.get('csrftoken')).split('Set-Cookie: csrftoken=')[1] - # POST-Request for each domain - async with session.post('https://dnsdumpster.com', - data={'csrfmiddlewaretoken': CSRFtoken, - 'targetip': domain, - 'user': 'free'}, - headers={'Host': 'dnsdumpster.com', - 'Pragma': 'no-cache', - 'Cache-Control': 'no-cache', - 'Upgrade-Insecure-Requests': '1', - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36', - 'Origin': 'https://dnsdumpster.com', - 'Content-Type': 'application/x-www-form-urlencoded', - 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', - 'Referer': 'https://dnsdumpster.com/', - 'Accept-Language': 'en-US,en;q=0.9,nl;q=0.8', - 'Cookie': f'csrftoken={CSRFtoken}'} - ) as resp: - response_text = await resp.text() - # Write HTML-Response to file - with open(os.path.normpath(os.path.join(os.path.realpath(__file__), - f'../../cache/dnsdumpster_req_logs/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_HTML.txt')), - 'a') as post_request_file: - post_request_file.write(response_text) - soup = BeautifulSoup(response_text.encode('utf-8'), 'html.parser') +from modules.utility import WAFUtils +from abc import ABC, abstractmethod + + +class BaseScraper(ABC): + """Abstract base class for all scrapers.""" + + def __init__(self, domain: str): + self.domain = domain + self.cache_dir = os.path.normpath(os.path.join(os.path.realpath(__file__), '../../cache')) + os.makedirs(self.cache_dir, exist_ok=True) + + @abstractmethod + async def scrape(self): + """Method to perform the actual scraping logic, to be implemented by subclasses.""" + pass + + async def _write_to_file(self, content: str, file_name: str): + """Asynchronously write content to a file using aiofiles.""" + file_path = os.path.join(self.cache_dir, file_name) + async with aiofiles.open(file_path, 'a') as file: + await file.write(content) + + +class JsonScraper(BaseScraper): + """Abstract class for scrapers that expect JSON responses.""" + + async def fetch_json(self, url: str): + async with aiohttp.ClientSession() as session: + async with session.get(url, headers={'Accept': 'application/json'}) as resp: + return await resp.json(encoding='utf-8') + + +class CrtShScraper(JsonScraper): + """Scraper for crt.sh""" + + def __init__(self, domain: str): + super().__init__(domain) + self.log_dir = os.path.join(self.cache_dir, 'crtsh_req_logs') + 
os.makedirs(self.log_dir, exist_ok=True) + + async def scrape(self): + """Main scraping method for crt.sh""" + crtsh_output = [] + # Fetch JSON data from crt.sh + response_json = await self._fetch_crtsh_data() + # Write the JSON response to a file + await self._write_json_response(response_json) + # Extract and filter domains from the response + crtsh_output_filtered = self._extract_and_filter_domains(response_json) + # Write the filtered domains to a file + await self._write_domains_to_file(crtsh_output_filtered) + return list(crtsh_output_filtered) + + async def _fetch_crtsh_data(self): + """Fetch JSON data from crt.sh""" + url = f'https://crt.sh/?q={self.domain}&output=json' + async with aiohttp.ClientSession() as session: + async with session.get(url) as resp: + return await resp.json() + + async def _write_json_response(self, response_json): + """Write the raw JSON response to a file""" + file_path = os.path.join( + self.log_dir, f'{self.domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}.json' + ) + async with aiofiles.open(file_path, 'w') as json_file: + await json_file.write(json.dumps(response_json, sort_keys=True, indent=4)) + + def _extract_and_filter_domains(self, response_json): + """Extract and filter domains from the JSON response""" + crtsh_output = [ + record['name_value'].split('\n') for record in response_json + ] + # Flatten list and filter out wildcard domains + crtsh_output_flatten = set(chain.from_iterable(crtsh_output)) + crtsh_output_filtered = {domain for domain in crtsh_output_flatten if not domain.startswith('*.')} + return crtsh_output_filtered + + async def _write_domains_to_file(self, crtsh_output_filtered): + """Write the filtered domains to a file""" + file_path = os.path.join( + self.log_dir, f'{self.domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_domains.txt' + ) + async with aiofiles.open(file_path, 'w') as domains_file: + await domains_file.write("\n".join(sorted(crtsh_output_filtered))) + + +class DnsDumpsterScraper(BaseScraper): + """Scraper for DnsDumpster.com""" + + DNSDUMPSTER_URL = 'https://dnsdumpster.com' + + def __init__(self, domain: str): + super().__init__(domain) + self.log_dir = os.path.join(self.cache_dir, 'dnsdumpster_req_logs') + os.makedirs(self.log_dir, exist_ok=True) + + async def scrape(self): + csrf_token = await self._get_csrf_token() + # POST request to dnsdumpster.com with CSRF token and domain + response_text = await self._post_domain_data(csrf_token) + # Write the full HTML response to a file + await self._write_html_response(response_text) + # Parse HTML response and extract domains + dnsdumpster_output = self._extract_domains(response_text) + # Write extracted domains to a file + await self._write_domains_to_file(dnsdumpster_output) + return dnsdumpster_output + + async def _get_csrf_token(self): + """Get CSRF token from dnsdumpster.com.""" + async with aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar()) as session: + async with session.get(self.DNSDUMPSTER_URL) as response: + cookies = session.cookie_jar.filter_cookies(self.DNSDUMPSTER_URL) + csrf_token = str(cookies.get('csrftoken')).split('Set-Cookie: csrftoken=')[1] + return csrf_token + + async def _post_domain_data(self, csrf_token: str): + """Send POST request to dnsdumpster.com with domain data.""" + async with aiohttp.ClientSession() as session: + async with session.post( + self.DNSDUMPSTER_URL, + data={ + 'csrfmiddlewaretoken': csrf_token, + 'targetip': self.domain, + 'user': 'free' + }, + headers={ + 'Host': 'dnsdumpster.com', 
+ 'Pragma': 'no-cache', + 'Cache-Control': 'no-cache', + 'Upgrade-Insecure-Requests': '1', + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36', + 'Origin': self.DNSDUMPSTER_URL, + 'Content-Type': 'application/x-www-form-urlencoded', + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', + 'Referer': self.DNSDUMPSTER_URL, + 'Accept-Language': 'en-US,en;q=0.9,nl;q=0.8', + 'Cookie': f'csrftoken={csrf_token}' + } + ) as resp: + return await resp.text() + + def _extract_domains(self, response_text: str): + """Extract domain names from the HTML response using BeautifulSoup.""" + soup = BeautifulSoup(response_text, 'html.parser') rb = soup.find_all('td', {'class': 'col-md-4'}) - # Find all domains in HTML-Response - for found_domain in rb: - dnsdumpster_output.append( - found_domain.text.replace('\n', '').split('HTTP')[0].replace('. ', '').lstrip('1234567890 ').rstrip( - '.')) + domains = [ + found_domain.text.replace('\n', '').split('HTTP')[0].replace('. ', '').lstrip('1234567890 ').rstrip('.') + for found_domain in rb + ] + return domains + + async def _write_html_response(self, response_text: str): + """Write HTML response to a file asynchronously.""" + file_path = os.path.join(self.log_dir, + f'{self.domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_HTML.txt') + async with aiofiles.open(file_path, 'w') as post_request_file: + await post_request_file.write(response_text) + + async def _write_domains_to_file(self, dnsdumpster_output): + """Write only the extracted domains to a file asynchronously.""" + file_path = os.path.join(self.log_dir, + f'{self.domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_domains.txt') + async with aiofiles.open(file_path, 'w') as domains_only_file: + await domains_only_file.write("\n".join(sorted(dnsdumpster_output))) + + +class CertSpotterScraper(BaseScraper): + """Scraper for CertSpotter API.""" + + def __init__(self, domain: str): + super().__init__(domain) + self.log_dir = os.path.join(self.cache_dir, 'certspotter_req_logs') + os.makedirs(self.log_dir, exist_ok=True) + + async def scrape(self): + """Main scraping method.""" + certspotter_output = set() + async with aiohttp.ClientSession() as session: + response_json = await self._fetch_certspotter_data(session) + # Run file writing and domain extraction concurrently + certspotter_output, _ = await asyncio.gather( + self._extract_domains(response_json), + self._write_json_response(response_json) + ) # Write only domains to file - with open(os.path.normpath(os.path.join(os.path.realpath(__file__), - f'../../cache/dnsdumpster_req_logs/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_domains.txt')), - 'a') as domains_only_file: - domains_only_file.write( - "\n".join(str(dnsdumpster_out_domain) for dnsdumpster_out_domain in dnsdumpster_output)) - return dnsdumpster_output - - -async def certspotter_scraping(domain: str): - certspotter_output = set() - # Verify that 'certspotter_req_logs' directory exists - if not os.path.isdir(os.path.normpath( - os.path.dirname(os.path.join(os.path.realpath(__file__), '../../cache/certspotter_req_logs/')))): - os.makedirs( - os.path.normpath( - os.path.dirname(os.path.join(os.path.realpath(__file__), '../../cache/certspotter_req_logs/')))) - async with aiohttp.ClientSession() as session: - # Get-Request for each domain with JSON-Response - async with 
session.get(f'https://api.certspotter.com/v1/issuances?domain={domain}&expand=dns_names', - headers={'Accept': 'application/json'} - ) as resp: - response_text = await resp.json(encoding='utf-8') - # Write JSON-Response to file - with open(os.path.normpath(os.path.join(os.path.realpath(__file__), - f'../../cache/certspotter_req_logs/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}.json')), - 'a') as json_request_file: - json.dump(response_text, json_request_file, sort_keys=True, indent=4) - # Get all domains from JSON-Response - for dict_in_resp in response_text: - for list_in_dict_resp in dict_in_resp['dns_names']: - certspotter_output.add(list_in_dict_resp.lstrip('*.')) - # Write only domains to file and remove wildcards - with open(os.path.normpath(os.path.join(os.path.realpath(__file__), - f'../../cache/certspotter_req_logs/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_domains.txt')), - 'a') as domains_only_file: - domains_only_file.write( - "\n".join(str(certspotter_out_domain) for certspotter_out_domain in certspotter_output)) - return list(certspotter_output) - - -async def hackertarget_scraping(domain: str): - hackertarget_output = set() - # Verify that 'hackertarget_req_logs' directory exists - if not os.path.isdir(os.path.normpath( - os.path.dirname(os.path.join(os.path.realpath(__file__), '../../cache/hackertarget_req_logs/')))): - os.makedirs( - os.path.normpath( - os.path.dirname(os.path.join(os.path.realpath(__file__), '../../cache/hackertarget_req_logs/')))) - async with aiohttp.ClientSession() as session: - # Get-Request for each domain with TEXT-Response - async with session.get(f'https://api.hackertarget.com/hostsearch/?q={domain}', - ) as resp: - response_text = await resp.text(encoding='utf-8') - if not response_text.find('API count exceeded'): - print( - 'SKIP HackerTarget | Daily Limit Exceeded. 
(Possible bypass: new IP or use hackertarget.com API Key)') - else: - # Write TEXT-Response to file - with open(os.path.normpath(os.path.join(os.path.realpath(__file__), - f'../../cache/hackertarget_req_logs/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_TEXT.txt')), - 'a') as text_request_file: - text_request_file.write(str(response_text)) - # Get all domains from TEXT-Response - for line in response_text.split(): - hackertarget_output.add(line.split(sep=",")[0]) - # Write only domains to file - with open(os.path.normpath(os.path.join(os.path.realpath(__file__), - f'../../cache/hackertarget_req_logs/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_domains.txt')), - 'a') as domains_only_file: - domains_only_file.write( - "\n".join(str(hackertarget_out_domain) for hackertarget_out_domain in hackertarget_output)) - return list(hackertarget_output) - - -async def crtsh_scraping(domain: str): - crtsh_output = list() - # Verify that 'crtsh_req_logs' directory exists - if not os.path.isdir( - os.path.normpath(os.path.dirname(os.path.join(os.path.realpath(__file__), '../../cache/crtsh_req_logs/')))): - os.makedirs( - os.path.normpath(os.path.dirname(os.path.join(os.path.realpath(__file__), '../../cache/crtsh_req_logs/')))) - async with aiohttp.ClientSession() as session: - # Get-Request for each domain with JSON-Response - async with session.get(f'https://crt.sh/?q={domain}&output=json', - headers={'Accept': 'application/json'} - ) as resp: - response_text = await resp.json(encoding='utf-8') - # Write JSON-Response to file - with open(os.path.normpath(os.path.join(os.path.realpath(__file__), - f'../../cache/crtsh_req_logs/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}.json') - ), - 'a') as json_request_file: - json.dump(response_text, json_request_file, sort_keys=True, indent=4) - # Get all domains from JSON-Response - for list_in_resp in response_text: - crtsh_output.append(list_in_resp['name_value'].split('\n')) - # Flatten list(dict(),list(),str,...) 
to set() for only unique values
-            crtsh_output_flatten = set(chain.from_iterable(crtsh_output))
-            # Filter out wildcard domains
-            crtsh_output_flatten = {filter_domain for filter_domain in crtsh_output_flatten if
-                                    str(filter_domain).find('*.')}
-            # Write only domains to file
-            with open(os.path.normpath(os.path.join(os.path.realpath(__file__),
-                                                    f'../../cache/crtsh_req_logs/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_domains.txt')),
-                      'a') as domains_only_file:
-                domains_only_file.write(
-                    "\n".join(str(crtsh_out_domain) for crtsh_out_domain in crtsh_output_flatten))
-            return list(crtsh_output_flatten)
-
-
-async def subdomain_gathering(domains: set):
-    all_domains_and_subdomains = set()
-    for domain in domains:
-        all_subdomains_set = set()
-        # Find all possible subdomain/domain for each domain
-        all_subdomains_set.update(await dnsdumpster_scraping(domain))
-        all_subdomains_set.update(await certspotter_scraping(domain))
-        all_subdomains_set.update(await hackertarget_scraping(domain))
-        all_subdomains_set.update(await crtsh_scraping(domain))
-        # Add own domain
-        all_subdomains_set.add(domain)
-        # Add TLD
-        all_subdomains_set.update(await get_top_domains([domain]))
-        if len(all_subdomains_set) == 0:
-            continue
-        else:
-            # Write to file all possible subdomains for domain
-            with open(os.path.normpath(os.path.join(os.path.realpath(__file__),
-                                                    f'../../cache/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_subdomains.txt')),
-                      'a') as all_subdomains:
-                all_subdomains.write(
-                    "\n".join(str(subdomain_in_all) for subdomain_in_all in sorted(all_subdomains_set)))
-            # Add all subdomains to 'all_domains_and_subdomains'
-            all_domains_and_subdomains.update(all_subdomains_set)
-            # Clear set() for next domain gathering
-            all_subdomains_set.clear()
-    # Write to file combination of ALL domains/subdomains for every given domain as input
-    with open(os.path.normpath(os.path.join(os.path.realpath(__file__),
-                                            f'../../cache/ALL_DOMAINS_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}.txt')),
-              'a') as all_domains:
-        all_domains.write(
-            "\n".join(str(domain_in_all) for domain_in_all in sorted(all_domains_and_subdomains)))
-    return sorted(all_domains_and_subdomains)
+        await self._write_domains_to_file(certspotter_output)
+        return list(certspotter_output)
+
+    async def _fetch_certspotter_data(self, session):
+        """Send GET request to CertSpotter API and retrieve JSON response."""
+        async with session.get(
+                f'https://api.certspotter.com/v1/issuances?domain={self.domain}&expand=dns_names',
+                headers={'Accept': 'application/json'}
+        ) as resp:
+            return await resp.json(encoding='utf-8')
+
+    async def _write_json_response(self, response_json):
+        """Write JSON response to a file asynchronously."""
+        file_path = os.path.join(
+            self.log_dir, f'{self.domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}.json'
+        )
+        async with aiofiles.open(file_path, 'w') as json_request_file:
+            await json_request_file.write(json.dumps(response_json, sort_keys=True, indent=4))
+
+    async def _write_domains_to_file(self, certspotter_output):
+        """Write extracted domains (no wildcards) to a file asynchronously."""
+        file_path = os.path.join(
+            self.log_dir, f'{self.domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_domains.txt'
+        )
+        async with aiofiles.open(file_path, 'w') as domains_only_file:
+            await domains_only_file.write("\n".join(sorted(certspotter_output)))
+
+    async def _extract_domains(self, response_json):
+        """Extract domain names from JSON response and 
clean up wildcard entries.""" + certspotter_output = set() + for cert_data in response_json: + for dns_name in cert_data['dns_names']: + certspotter_output.add(dns_name.lstrip('*.')) + return certspotter_output + + +class HackerTargetScraper(BaseScraper): + """Scraper for HackerTarget API.""" + + def __init__(self, domain: str): + super().__init__(domain) + self.log_dir = os.path.join(self.cache_dir, 'hackertarget_req_logs') + os.makedirs(self.log_dir, exist_ok=True) + + async def scrape(self): + """Main scraping method.""" + hackertarget_output = set() + async with aiohttp.ClientSession() as session: + response_text = await self._fetch_hackertarget_data(session) + if 'API count exceeded' in response_text: + print('SKIP HackerTarget | Daily Limit Exceeded. (Possible bypass: new IP or use hackertarget.com API Key)') + return list(hackertarget_output) + # Run file writing and domain extraction concurrently + hackertarget_output, _ = await asyncio.gather( + self._extract_domains(response_text), + self._write_text_response(response_text) + ) + # Write extracted domains to file + await self._write_domains_to_file(hackertarget_output) + return list(hackertarget_output) + + async def _fetch_hackertarget_data(self, session): + """Send GET request to HackerTarget API and retrieve text response.""" + async with session.get(f'https://api.hackertarget.com/hostsearch/?q={self.domain}') as resp: + return await resp.text(encoding='utf-8') + + async def _write_text_response(self, response_text): + """Write text response to a file asynchronously.""" + file_path = os.path.join( + self.log_dir, f'{self.domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_TEXT.txt' + ) + async with aiofiles.open(file_path, 'w') as text_request_file: + await text_request_file.write(response_text) + + async def _write_domains_to_file(self, hackertarget_output): + """Write extracted domains to a file asynchronously.""" + file_path = os.path.join( + self.log_dir, f'{self.domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_domains.txt' + ) + async with aiofiles.open(file_path, 'w') as domains_only_file: + await domains_only_file.write("\n".join(sorted(hackertarget_output))) + + async def _extract_domains(self, response_text): + """Extract domain names from the text response.""" + hackertarget_output = set() + for line in response_text.splitlines(): + if "," in line: + domain = line.split(",")[0] + hackertarget_output.add(domain) + return hackertarget_output + + +class SubdomainGatherer: + """Class to gather subdomains using multiple scrapers.""" + + def __init__(self, domains: set): + self.domains = domains + self.all_subdomains = set() + + async def gather_subdomains(self): + for domain in self.domains: + domain_subdomains = set() + domain_subdomains.update(await self.scrape_domain(domain)) + await self._write_domain_subdomains_to_file(domain, domain_subdomains) + + # Add domain and top-level domain (TLD) + domain_subdomains.add(domain) + domain_subdomains.update(await WAFUtils.get_top_domains([domain])) + + # Add subdomains to overall set + self.all_subdomains.update(domain_subdomains) + + # Write all domains/subdomains to a final file + await self._write_all_subdomains_to_file() + return sorted(self.all_subdomains) + + async def scrape_domain(self, domain: str): + """Method to scrape multiple sources for a given domain.""" + subdomains = set() + scrapers = [ + # Add other scrapers here (DnsDumpsterScraper, CertSpotterScraper, etc.) 
+ CrtShScraper(domain), + DnsDumpsterScraper(domain), + CertSpotterScraper(domain), + HackerTargetScraper(domain), + ] + for scraper in scrapers: + subdomains.update(await scraper.scrape()) + return subdomains + + async def _write_domain_subdomains_to_file(self, domain: str, subdomains: set): + file_path = os.path.normpath(os.path.join(os.path.realpath(__file__), + f'../../cache/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_subdomains.txt')) + async with aiofiles.open(file_path, 'a') as file: + await file.write("\n".join(sorted(subdomains))) + + async def _write_all_subdomains_to_file(self): + file_path = os.path.normpath(os.path.join(os.path.realpath(__file__), + f'../../cache/ALL_DOMAINS_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}.txt')) + async with aiofiles.open(file_path, 'a') as file: + await file.write("\n".join(sorted(self.all_subdomains))) diff --git a/requirements.txt b/requirements.txt index 16d09af..59c0307 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ +aiofiles==24.1.0 aiohttp==3.9.2 beautifulsoup4==4.12.3 colorama==0.4.6 -dnspython==2.5.0 +dnspython==2.7.0 html_similarity==0.3.3 tldextract==5.1.1 diff --git a/waf-abuser.py b/waf-abuser.py index 79305b4..33fb074 100644 --- a/waf-abuser.py +++ b/waf-abuser.py @@ -6,14 +6,16 @@ import sys import threading import traceback +import logging +import asyncio from colorama import Fore, init as colorama_init -from modules.ip_gathering import ip_gathering -from modules.subdomain_gathering import subdomain_gathering from modules.utility import WAFUtils +from modules.subdomain_gathering import SubdomainGatherer +from modules.ip_gathering import IPGatherer class WAFAbuser: - def __init__(self, logger_level=WAFUtils.logging.CRITICAL): + def __init__(self, logger_level=logging.CRITICAL): self.logger = self.create_logger(logger_level) self.input_domains = set() self.similarity_rate = 70 @@ -23,16 +25,16 @@ def __init__(self, logger_level=WAFUtils.logging.CRITICAL): def create_logger(logger_level): log_format = ( '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s' - if logger_level == WAFUtils.logging.DEBUG + if logger_level == logging.DEBUG else '{%(filename)s:%(lineno)d} | %(message)s' ) - WAFUtils.logging.basicConfig( + logging.basicConfig( stream=sys.stdout, format=log_format, encoding='utf-8', level=logger_level ) - return WAFUtils.logging.getLogger(__name__) + return logging.getLogger(__name__) @staticmethod async def print_banner(): @@ -77,13 +79,13 @@ async def parse_arguments(self): async def gather_subdomains(self): print("1. Gathering subdomains") - find_subdomains = await subdomain_gathering(self.input_domains) + find_subdomains = await SubdomainGatherer(self.input_domains).gather_subdomains() self.logger.debug(f"Subdomains gathered: {find_subdomains}") return find_subdomains async def gather_ips(self, subdomains): print("2. 
Gathering IPs") - find_ips = await ip_gathering(subdomains) + find_ips = await IPGatherer.gather_ips(subdomains) self.logger.debug(find_ips) return find_ips @@ -166,7 +168,7 @@ async def display_subdomains(self, subdomains): if __name__ == '__main__': scanner = WAFAbuser() try: - WAFUtils.asyncio.run(scanner.run()) + asyncio.run(scanner.run()) except (KeyboardInterrupt, SystemExit): pass # Graceful exit on user interrupt or system exit except Exception: From 23f481a7b4cb9afb1f0b0030de874c24570c3a9d Mon Sep 17 00:00:00 2001 From: gnomee1337 <41423212+Gnomee1337@users.noreply.github.com> Date: Sun, 20 Oct 2024 01:06:32 +0300 Subject: [PATCH 3/4] fix viewdnsinfo issue with finding table --- modules/ip_gathering.py | 18 +++++++++++------- modules/subdomain_gathering.py | 18 +++++++++--------- waf-abuser.py | 16 ++++++++++------ 3 files changed, 30 insertions(+), 22 deletions(-) diff --git a/modules/ip_gathering.py b/modules/ip_gathering.py index a5c96c3..4e59117 100644 --- a/modules/ip_gathering.py +++ b/modules/ip_gathering.py @@ -26,7 +26,7 @@ async def gather_ips(self, domains: set): async def _ip_history_viewdnsinfo(self, domain: str): viewdnsinfo_ips_output = set() async with aiohttp.ClientSession() as session: - async with session.get(f'https://viewdns.info/iphistory/?domain={domain}', timeout=3) as resp: + async with session.get(f'https://viewdns.info/iphistory/?domain={domain}', timeout=30) as resp: response_text = await resp.text() if "403 Forbidden - Naughty!" in response_text: print( @@ -34,11 +34,8 @@ async def _ip_history_viewdnsinfo(self, domain: str): return -403 # Write HTML response to file await self._write_html_response(domain, response_text) - # Extract IPs from HTML response + # Setup soup soup = BeautifulSoup(response_text.encode('utf-8'), 'html.parser') - tables = soup.find_all('table', {'border': '1'}) - # Extract text from all tables - table_text = ' '.join(table.get_text() for table in tables) # Improved regex for IP address extraction ip_pattern = re.compile( r'\b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' @@ -46,8 +43,15 @@ async def _ip_history_viewdnsinfo(self, domain: str): r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' 
r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b' ) - # Search in the extracted text - viewdnsinfo_ips_output.update(ip_pattern.findall(table_text)) + # Find the table containing the IP addresses + tables = soup.find_all('table', {'border': '1'}) + for table in tables: # Iterate over each table found + # Iterate through all elements in the table + for td in table.find_all('td'): + text = td.get_text(strip=True) + # Check if the text matches the IP pattern + if ip_pattern.match(text): + viewdnsinfo_ips_output.add(text) # Write only extracted IPs to file await self._write_extracted_ips_to_file(domain, viewdnsinfo_ips_output) return viewdnsinfo_ips_output diff --git a/modules/subdomain_gathering.py b/modules/subdomain_gathering.py index 4def53c..d3ae7cb 100644 --- a/modules/subdomain_gathering.py +++ b/modules/subdomain_gathering.py @@ -6,9 +6,10 @@ import aiohttp from itertools import chain from bs4 import BeautifulSoup -from modules.utility import WAFUtils from abc import ABC, abstractmethod +from modules.utility import WAFUtils + class BaseScraper(ABC): """Abstract base class for all scrapers.""" @@ -294,18 +295,17 @@ def __init__(self, domains: set): self.all_subdomains = set() async def gather_subdomains(self): + waf_utils = WAFUtils() for domain in self.domains: domain_subdomains = set() domain_subdomains.update(await self.scrape_domain(domain)) await self._write_domain_subdomains_to_file(domain, domain_subdomains) - - # Add domain and top-level domain (TLD) + # Add domain itself domain_subdomains.add(domain) - domain_subdomains.update(await WAFUtils.get_top_domains([domain])) - + # And add top-level domain (TLD) + domain_subdomains.update(await waf_utils.get_top_domains([domain])) # Add subdomains to overall set self.all_subdomains.update(domain_subdomains) - # Write all domains/subdomains to a final file await self._write_all_subdomains_to_file() return sorted(self.all_subdomains) @@ -316,9 +316,9 @@ async def scrape_domain(self, domain: str): scrapers = [ # Add other scrapers here (DnsDumpsterScraper, CertSpotterScraper, etc.) CrtShScraper(domain), - DnsDumpsterScraper(domain), - CertSpotterScraper(domain), - HackerTargetScraper(domain), + # DnsDumpsterScraper(domain), + # CertSpotterScraper(domain), + # HackerTargetScraper(domain), ] for scraper in scrapers: subdomains.update(await scraper.scrape()) diff --git a/waf-abuser.py b/waf-abuser.py index 33fb074..931e182 100644 --- a/waf-abuser.py +++ b/waf-abuser.py @@ -9,6 +9,7 @@ import logging import asyncio from colorama import Fore, init as colorama_init + from modules.utility import WAFUtils from modules.subdomain_gathering import SubdomainGatherer from modules.ip_gathering import IPGatherer @@ -18,8 +19,11 @@ class WAFAbuser: def __init__(self, logger_level=logging.CRITICAL): self.logger = self.create_logger(logger_level) self.input_domains = set() - self.similarity_rate = 70 + self.similarity_rate = 0 self.domains_only_flag = False + self.ip_gatherer = IPGatherer() + self.waf_utils = WAFUtils() + self.subdomain_gatherer = SubdomainGatherer(self.input_domains) @staticmethod def create_logger(logger_level): @@ -85,13 +89,13 @@ async def gather_subdomains(self): async def gather_ips(self, subdomains): print("2. Gathering IPs") - find_ips = await IPGatherer.gather_ips(subdomains) + find_ips = await self.ip_gatherer.gather_ips(subdomains) self.logger.debug(find_ips) return find_ips async def filter_waf_ips(self, ips): print("3. 
Filtering out WAF IPs") - filtered_out_ips = await WAFUtils.filter_out_waf_ips(ips) + filtered_out_ips = await self.waf_utils.filter_out_waf_ips(ips) self.logger.debug(filtered_out_ips) return filtered_out_ips @@ -100,7 +104,7 @@ async def compare_ips_with_domains(self, filtered_out_ips): # Compare input domain content with filtered out IPs content similarity_output = set() for input_domain in self.input_domains: - current_domain_content = await WAFUtils.get_page_content(input_domain) + current_domain_content = await self.waf_utils.get_page_content(input_domain) if current_domain_content == 0: continue # Skip if there was an error fetching the domain content await self.compare_with_filtered_ips(current_domain_content, filtered_out_ips, similarity_output) @@ -108,8 +112,8 @@ async def compare_ips_with_domains(self, filtered_out_ips): async def compare_with_filtered_ips(self, current_domain_content, filtered_out_ips, similarity_output): for filtered_ip in filtered_out_ips: - compare_result = await WAFUtils.compare_two_pages(original_page=current_domain_content, - check_page=filtered_ip) + compare_result = await self.waf_utils.compare_two_pages(original_page=current_domain_content, + check_page=filtered_ip) # Add if similarity rate > than specified (Default 70%) if compare_result != 0 and compare_result[1] > int(self.similarity_rate): similarity_output.add(compare_result) From d14d88968fafbf2049dc585e4e39fdceb5be3c84 Mon Sep 17 00:00:00 2001 From: gnomee1337 <41423212+Gnomee1337@users.noreply.github.com> Date: Sun, 20 Oct 2024 03:24:46 +0300 Subject: [PATCH 4/4] fixed issue with filename sanitation. Added retries to crt.sh for possible long response. --- modules/ip_gathering.py | 21 ++++++++++++-- modules/subdomain_gathering.py | 50 +++++++++++++++++++++++++++------- waf-abuser.py | 2 +- 3 files changed, 59 insertions(+), 14 deletions(-) diff --git a/modules/ip_gathering.py b/modules/ip_gathering.py index 4e59117..2c59465 100644 --- a/modules/ip_gathering.py +++ b/modules/ip_gathering.py @@ -68,20 +68,26 @@ async def _remove_original_ips(self, domain: str, all_domain_ips: set): return all_domain_ips async def _write_html_response(self, domain: str, response_text: str): + # Use this sanitized domain when generating filenames + sanitized_domain = self.sanitize_filename(domain) file_path = os.path.join(self.log_dir, - f'{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_HTML.txt') + f'{sanitized_domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_HTML.txt') async with aiofiles.open(file_path, 'w') as file: await file.write(response_text) async def _write_extracted_ips_to_file(self, domain: str, viewdnsinfo_ips_output: set): + # Use this sanitized domain when generating filenames + sanitized_domain = self.sanitize_filename(domain) file_path = os.path.join(self.log_dir, - f'{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_ips.txt') + f'{sanitized_domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_ips.txt') async with aiofiles.open(file_path, 'w') as file: await file.write("\n".join(str(ip) for ip in viewdnsinfo_ips_output)) async def _write_domain_related_ips_to_file(self, domain: str, domain_ips: set): + # Use this sanitized domain when generating filenames + sanitized_domain = self.sanitize_filename(domain) file_path = os.path.normpath(os.path.join(os.path.realpath(__file__), - f'../../cache/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_IPs.txt')) + 
f'../../cache/{sanitized_domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_IPs.txt')) async with aiofiles.open(file_path, 'w') as file: await file.write("\n".join(sorted(domain_ips))) @@ -90,3 +96,12 @@ async def _write_all_possible_ips_to_file(self): f'../../cache/ALL_DOMAINS_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_IPs.txt')) async with aiofiles.open(file_path, 'w') as file: await file.write("\n".join(str(ip) for ip in sorted(self.all_ips))) + + def sanitize_filename(self, domain: str) -> str: + # Remove non-alphanumeric characters (keep dots for domain separation) + sanitized_domain = re.sub(r'[^A-Za-z0-9.-]+', '', domain) + # Optionally limit the length of the domain name in the filename + max_length = 50 + if len(sanitized_domain) > max_length: + sanitized_domain = sanitized_domain[:max_length] + return sanitized_domain diff --git a/modules/subdomain_gathering.py b/modules/subdomain_gathering.py index d3ae7cb..a4f7fa2 100644 --- a/modules/subdomain_gathering.py +++ b/modules/subdomain_gathering.py @@ -2,6 +2,8 @@ import datetime import json import os +import re + import aiofiles import aiohttp from itertools import chain @@ -61,12 +63,32 @@ async def scrape(self): await self._write_domains_to_file(crtsh_output_filtered) return list(crtsh_output_filtered) - async def _fetch_crtsh_data(self): + async def _fetch_crtsh_data(self, retries=3, delay=5): """Fetch JSON data from crt.sh""" url = f'https://crt.sh/?q={self.domain}&output=json' - async with aiohttp.ClientSession() as session: - async with session.get(url) as resp: - return await resp.json() + for attempt in range(retries): + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, timeout=30) as resp: + if resp.status != 200: + print(f"Error: crt.sh Received status code {resp.status} on attempt {attempt + 1}") + continue + # Check if the response is in JSON format + content_type = resp.headers.get('Content-Type', '').lower() + if 'application/json' in content_type: + return await resp.json() + else: + # If not JSON, treat it as text (likely an HTML error page) + text_response = await resp.text() + print(f"crt.sh Unexpected content type: {content_type}") + # Print part of the response for debugging + print("crt.sh Response content: {text_response[:500]}") + return None + except (aiohttp.ClientError, asyncio.TimeoutError) as e: + print(f"crt.sh request error on attempt {attempt + 1}: {e}") + await asyncio.sleep(delay) + print("All crt.sh attempts failed.") + return None async def _write_json_response(self, response_json): """Write the raw JSON response to a file""" @@ -78,6 +100,10 @@ async def _write_json_response(self, response_json): def _extract_and_filter_domains(self, response_json): """Extract and filter domains from the JSON response""" + # Check if response_json is None before attempting to process it + if response_json is None: + print("Error: No valid data returned from crt.sh") + return [] crtsh_output = [ record['name_value'].split('\n') for record in response_json ] @@ -202,7 +228,8 @@ async def _fetch_certspotter_data(self, session): """Send GET request to CertSpotter API and retrieve JSON response.""" async with session.get( f'https://api.certspotter.com/v1/issuances?domain={self.domain}&expand=dns_names', - headers={'Accept': 'application/json'} + headers={'Accept': 'application/json'}, + timeout=30 ) as resp: return await resp.json(encoding='utf-8') @@ -212,7 +239,10 @@ async def _write_json_response(self, response_json): self.log_dir, 
f'{self.domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}.json' ) async with aiofiles.open(file_path, 'w') as json_request_file: - await json.dump(response_json, json_request_file, sort_keys=True, indent=4) + # Create the JSON string synchronously + json_string = json.dumps(response_json, sort_keys=True, indent=4) + # Write the JSON content to the file asynchronously + await json_request_file.write(json_string) async def _write_domains_to_file(self, certspotter_output): """Write extracted domains (no wildcards) to a file asynchronously.""" @@ -258,7 +288,7 @@ async def scrape(self): async def _fetch_hackertarget_data(self, session): """Send GET request to HackerTarget API and retrieve text response.""" - async with session.get(f'https://api.hackertarget.com/hostsearch/?q={self.domain}') as resp: + async with session.get(f'https://api.hackertarget.com/hostsearch/?q={self.domain}', timeout=30) as resp: return await resp.text(encoding='utf-8') async def _write_text_response(self, response_text): @@ -316,9 +346,9 @@ async def scrape_domain(self, domain: str): scrapers = [ # Add other scrapers here (DnsDumpsterScraper, CertSpotterScraper, etc.) CrtShScraper(domain), - # DnsDumpsterScraper(domain), - # CertSpotterScraper(domain), - # HackerTargetScraper(domain), + DnsDumpsterScraper(domain), + CertSpotterScraper(domain), + HackerTargetScraper(domain), ] for scraper in scrapers: subdomains.update(await scraper.scrape()) diff --git a/waf-abuser.py b/waf-abuser.py index 931e182..3eb05d3 100644 --- a/waf-abuser.py +++ b/waf-abuser.py @@ -19,7 +19,7 @@ class WAFAbuser: def __init__(self, logger_level=logging.CRITICAL): self.logger = self.create_logger(logger_level) self.input_domains = set() - self.similarity_rate = 0 + self.similarity_rate = 70 self.domains_only_flag = False self.ip_gatherer = IPGatherer() self.waf_utils = WAFUtils()
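A standalone sketch of the retry pattern that PATCH 4/4 adds to CrtShScraper._fetch_crtsh_data: a bounded number of attempts, a fixed pause between them, and None as the "no data" result that _extract_and_filter_domains now guards against. The helper name, the 30-second timeout, and the example domain below are assumptions made for this sketch only; its error handling is deliberately simpler than the patched method (every failed attempt is retried rather than returning early), so treat it as an illustration, not a drop-in replacement.

import asyncio
import aiohttp


async def fetch_json_with_retries(url: str, retries: int = 3, delay: int = 5):
    """Return parsed JSON from url, or None once every attempt has failed."""
    for attempt in range(1, retries + 1):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                    content_type = resp.headers.get('Content-Type', '').lower()
                    if resp.status == 200 and 'application/json' in content_type:
                        return await resp.json()
                    # Non-200 or non-JSON (e.g. an HTML error page): log a short excerpt and retry
                    excerpt = (await resp.text())[:200]
                    print(f"Attempt {attempt}: status {resp.status}, content type {content_type!r}: {excerpt}")
        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
            print(f"Attempt {attempt} failed: {exc}")
        await asyncio.sleep(delay)  # pause before the next attempt
    return None  # callers treat None as "no usable data"


if __name__ == '__main__':
    # Illustrative domain only
    data = asyncio.run(fetch_json_with_retries('https://crt.sh/?q=example.com&output=json'))
    print('records:', 0 if data is None else len(data))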