From 7bb92306ea4d994e8e24693d87f22b97e7ddd50d Mon Sep 17 00:00:00 2001 From: drighetto Date: Sat, 27 Jan 2024 18:50:37 +0100 Subject: [PATCH] Add support for NAPALM FTP Indexer - #120 --- .gitignore | 1 + wpr.py | 129 +++++++++++++++++++++++++++++++---------------------- 2 files changed, 77 insertions(+), 53 deletions(-) diff --git a/.gitignore b/.gitignore index 60ffb6b..7f58c27 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ __pycache__/ filetype_dork_result.txt test.py dnsdumpster.* +*.tmp \ No newline at end of file diff --git a/wpr.py b/wpr.py index d248fd8..f0f2765 100644 --- a/wpr.py +++ b/wpr.py @@ -33,9 +33,9 @@ from dnsdumpster.DNSDumpsterAPI import DNSDumpsterAPI -USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36" +USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" MOBILE_APP_STORE_COUNTRY_STORE_CODE = "LU" # Luxembourg -DEFAULT_CALL_TIMEOUT = 60 # 1 minute +DEFAULT_CALL_TIMEOUT = 30 WAPPALYZER_MAX_MONTHS_RESULT_OLD = 6 INTERESTING_FILE_EXTENSIONS = ["pdf", "doc", "docx", "xls", "xlsx", "ppt", "pptx", "pps", "odp", "ods", "odt", "rtf", "java", "cs", "vb", "py", "rb", "zip", "tar", "gz", "7z", "eml", "msg", "sql", "ini", @@ -127,8 +127,7 @@ def get_intelx_infos(ip_or_domain, api_key, http_proxy): } # First we must do a search service_url = f"https://2.intelx.io/intelligent/search" - response = requests.post(service_url, data=json.dumps(payload), headers=request_headers, - proxies=web_proxies, verify=(http_proxy is None), timeout=DEFAULT_CALL_TIMEOUT) + response = requests.post(service_url, data=json.dumps(payload), headers=request_headers, proxies=web_proxies, verify=(http_proxy is None), timeout=DEFAULT_CALL_TIMEOUT) if response.status_code != 200: infos.append(f"HTTP response code {response.status_code} received for the search!") return infos @@ -171,11 +170,9 @@ def extract_infos_from_virus_total_response(http_response): infos = [] if http_response.status_code != 200: if http_response.status_code != 204: - infos.append( - f"HTTP response code {http_response.status_code} received!") + infos.append(f"HTTP response code {http_response.status_code} received!") else: - infos.append( - f"Request rate limit exceeded: Wait one minute and re-run the script!") + infos.append(f"Request rate limit exceeded: Wait one minute and re-run the script!") else: results = http_response.json() # From VT API doc regarding the "response_code" property: @@ -196,11 +193,9 @@ def extract_infos_from_virus_total_response(http_response): if "undetected_urls" in results: urls_undetected_count = len(results["undetected_urls"]) if "detected_downloaded_samples" in results: - samples_detected_download_count = len( - results["detected_downloaded_samples"]) + samples_detected_download_count = len(results["detected_downloaded_samples"]) if "undetected_downloaded_samples" in results: - samples_undetected_download_count = len( - results["undetected_downloaded_samples"]) + samples_undetected_download_count = len(results["undetected_downloaded_samples"]) infos.append(f"URLs at this IP address that have at least one detection on a URL scan = {urls_detected_count}") infos.append(f"URLs at this IP address with no detections on a URL scan = {urls_undetected_count}") infos.append(f"Files that have been downloaded from this IP address with at least one AV detection = {samples_detected_download_count}") @@ -369,8 +364,7 @@ def get_passive_shared_hosts(ip, http_proxy): infos = [] # See https://www.threatminer.org/api.php service_url = f"https://api.threatminer.org/v2/host.php?q={ip}&rt=2" - response = requests.get(service_url, headers={ - "User-Agent": USER_AGENT}, proxies=web_proxies, verify=(http_proxy is None), timeout=DEFAULT_CALL_TIMEOUT) + response = requests.get(service_url, headers={"User-Agent": USER_AGENT}, proxies=web_proxies, verify=(http_proxy is None), timeout=DEFAULT_CALL_TIMEOUT) if response.status_code != 200: infos.append(f"HTTP response code {response.status_code} received from ThreatMiner API !") else: @@ -609,8 +603,7 @@ def get_certificate_transparency_log_subdomains(domain, http_proxy): service_url = f"https://crt.sh/?q=%.{domain}&output=json" response = requests.get(service_url, headers={"User-Agent": USER_AGENT}, proxies=web_proxies, verify=(http_proxy is None), timeout=DEFAULT_CALL_TIMEOUT) if response.status_code != 200: - infos.append( - f"HTTP response code {response.status_code} received!") + infos.append(f"HTTP response code {response.status_code} received!") return infos results = response.json() for entry in results: @@ -655,13 +648,11 @@ def get_softwareheritage_infos(domain_or_ip, http_proxy): # Set a long timeout (up to 4 minutes) because the response take a while to reply response = requests.get(service_url, headers={"User-Agent": USER_AGENT}, proxies=web_proxies, verify=(http_proxy is None), timeout=DEFAULT_CALL_TIMEOUT) if response.status_code != 200: - infos["DATA"].append( - f"HTTP response code {response.status_code} received!") + infos["DATA"].append(f"HTTP response code {response.status_code} received!") return infos results = response.json() remaining_allowed_call_for_current_hour = response.headers["X-RateLimit-Remaining"] - next_call_count_reset = datetime.datetime.fromtimestamp( - int(response.headers["X-RateLimit-Reset"])) + next_call_count_reset = datetime.datetime.fromtimestamp(int(response.headers["X-RateLimit-Reset"])) infos["LIMIT"] = f"{remaining_allowed_call_for_current_hour} call(s) can still be performed in the current hours (reseted at {next_call_count_reset})." for entry in results: infos["DATA"].append(entry["url"]) @@ -718,7 +709,7 @@ def get_mobile_app_infos(domain, http_proxy): infos["DATA"].append(f"HTTP response code {response.status_code} received!") return infos results = response.text - android_bundle_regex = f"id=({domain_infos.suffix}\.{domain_infos.domain}\.[a-z0-9A-Z\.\-_]+)" + android_bundle_regex = f"id=({domain_infos.suffix}\\.{domain_infos.domain}\\.[a-z0-9A-Z\\.\\-_]+)" bundles = re.findall(android_bundle_regex, results) for bundle in bundles: infos["DATA"].append(f"Android app found with PackageId '{bundle}'.") @@ -742,16 +733,13 @@ def get_dns_dumpster_infos(domain, http_proxy): if len(results) > 0: data = results["dns_records"] for entry in data["dns"]: - infos["DATA"].append( - f"[DNS ]: IP \"{entry['ip']}\" - Domain \"{entry['domain']}\" - ReverseDNS \"{entry['reverse_dns']}\" - AS \"{entry['as']}\"") + infos["DATA"].append(f"[DNS ]: IP \"{entry['ip']}\" - Domain \"{entry['domain']}\" - ReverseDNS \"{entry['reverse_dns']}\" - AS \"{entry['as']}\"") for entry in data["mx"]: - infos["DATA"].append( - f"[MX ]: IP \"{entry['ip']}\" - Domain \"{entry['domain']}\" - ReverseDNS \"{entry['reverse_dns']}\" - AS \"{entry['as']}\"") + infos["DATA"].append(f"[MX ]: IP \"{entry['ip']}\" - Domain \"{entry['domain']}\" - ReverseDNS \"{entry['reverse_dns']}\" - AS \"{entry['as']}\"") for entry in data["txt"]: infos["DATA"].append(f"[TXT ]: {entry}") for entry in data["host"]: - infos["DATA"].append( - f"[HOST]: IP \"{entry['ip']}\" - Domain \"{entry['domain']}\" - ReverseDNS \"{entry['reverse_dns']}\" - AS \"{entry['as']}\"") + infos["DATA"].append(f"[HOST]: IP \"{entry['ip']}\" - Domain \"{entry['domain']}\" - ReverseDNS \"{entry['reverse_dns']}\" - AS \"{entry['as']}\"") if results["xls_data"] != None: infos["XLS"] = base64.b64decode(results["xls_data"]) if results["image_data"] != None: @@ -898,26 +886,53 @@ def get_leakix_info(field_type, field_value, http_proxy): return infos +def get_napalm_ftp_indexer_info(domain, http_proxy): + infos = {"DATA": [], "ERROR": None} + # See https://www.searchftps.net/ + service_url = f"https://www.searchftps.net/" + expected_response_marker = "showing results" + regex_results_count = r'Showing\s+results\s+\d+\s+to\s+\d+\s+of\s+about\s+(\d+)' + try: + web_proxies = configure_proxy(http_proxy) + req_session = requests.Session() + req_session.headers.update({"User-Agent": USER_AGENT, "Content-Type": "application/x-www-form-urlencoded"}) + req_session.proxies.update(web_proxies) + req_session.verify = (http_proxy is None) + form_data = {"action": "result", "args": f"k={domain}&t=and&o=date-desc&s=0"} + response = req_session.post(url=service_url, data=form_data) + if response.status_code != 200: + infos["ERROR"] = f"HTTP response code {response.status_code} received!" + infos["DATA"].clear() + return infos + results = response.text + if expected_response_marker not in results.lower(): + with open("debug.tmp", mode="w", encoding="utf-8") as f: + f.write(results) + infos["ERROR"] = f"Non expected response received, marker '{expected_response_marker}' not found, see 'debug.tmp' file generated." + infos["DATA"].clear() + return infos + results_count = re.findall(regex_results_count, results, re.IGNORECASE | re.MULTILINE) + if len(results_count) > 0 and int(results_count[0]) > 0: + infos["DATA"].append(f"{results_count[0]} entries present on the site.") + except Exception as e: + infos["ERROR"] = f"Error during web call: {str(e)}" + infos["DATA"].clear() + return infos + + if __name__ == "__main__": requests.packages.urllib3.disable_warnings() colorama.init() start_time = time.time() parser = argparse.ArgumentParser() required_params = parser.add_argument_group("required arguments") - required_params.add_argument("-d", action="store", dest="domain_name", - help="Domain to analyse (ex: righettod.eu).", required=True) - parser.add_argument("-a", action="store", dest="api_key_file", default=None, - help="Configuration INI file with all API keys (ex: conf.ini).", required=False) - parser.add_argument("-n", action="store", dest="name_server", default=None, - help="Name server to use for the DNS query (ex: 8.8.8.8).", required=False) - parser.add_argument("-p", action="store", dest="http_proxy", default=None, - help="HTTP proxy to use for all HTTP call to differents services (ex: http://88.198.50.103:9080).", required=False) - parser.add_argument("-s", action="store_true", dest="store_filetype_dork_result", default=False, - help="Save the result of the Google/Bing Dork searching for interesting files to the file 'filetype_dork_result.txt'.", required=False) - parser.add_argument("-t", action="store", dest="request_timeout", type=int, default=DEFAULT_CALL_TIMEOUT, - help="Delay in seconds allowed for a HTTP request to reply before to fall in timeout (ex: 20).", required=False) - parser.add_argument("-m", action="store", dest="mobile_app_store_country_code", default=MOBILE_APP_STORE_COUNTRY_STORE_CODE, - help="Country code to define in which store mobile app will be searched (ex: LU).", required=False) + required_params.add_argument("-d", action="store", dest="domain_name", help="Domain to analyse (ex: righettod.eu).", required=True) + parser.add_argument("-a", action="store", dest="api_key_file", default=None, help="Configuration INI file with all API keys (ex: conf.ini).", required=False) + parser.add_argument("-n", action="store", dest="name_server", default=None, help="Name server to use for the DNS query (ex: 8.8.8.8).", required=False) + parser.add_argument("-p", action="store", dest="http_proxy", default=None, help="HTTP proxy to use for all HTTP call to differents services (ex: http://88.198.50.103:9080).", required=False) + parser.add_argument("-s", action="store_true", dest="store_filetype_dork_result", default=False, help="Save the result of the Google/Bing Dork searching for interesting files to the file 'filetype_dork_result.txt'.", required=False) + parser.add_argument("-t", action="store", dest="request_timeout", type=int, default=DEFAULT_CALL_TIMEOUT, help="Delay in seconds allowed for a HTTP request to reply before to fall in timeout (ex: 20).", required=False) + parser.add_argument("-m", action="store", dest="mobile_app_store_country_code", default=MOBILE_APP_STORE_COUNTRY_STORE_CODE, help="Country code to define in which store mobile app will be searched (ex: LU).", required=False) args = parser.parse_args() api_key_config = configparser.ConfigParser() api_key_config["API_KEYS"] = {} @@ -969,14 +984,12 @@ def get_leakix_info(field_type, field_value, http_proxy): print(colored(f"[DNS] Extract the aliases...", "blue", attrs=["bold"])) cnames = get_cnames(args.domain_name, args.name_server) print_infos(cnames) - print(colored( - f"[WHOIS] Extract the owner information of the IP addresses...", "blue", attrs=["bold"])) + print(colored(f"[WHOIS] Extract the owner information of the IP addresses...", "blue", attrs=["bold"])) for ip in ips: print(colored(f"{ip}", "yellow", attrs=["bold"])) informations = get_ip_owner(ip, http_proxy_to_use) print_infos(informations, " ") - print(colored( - f"[SHODAN] Extract the general information of the IP addresses and the domain...", "blue", attrs=["bold"])) + print(colored(f"[SHODAN] Extract the general information of the IP addresses and the domain...", "blue", attrs=["bold"])) if "shodan" in api_key_config["API_KEYS"]: api_key = api_key_config["API_KEYS"]["shodan"] print(colored(f"{args.domain_name}", "yellow", attrs=["bold"])) @@ -998,8 +1011,7 @@ def get_leakix_info(field_type, field_value, http_proxy): is_single_ip = len(ips) < 2 for ip in ips: print(colored(f"{ip}", "yellow", attrs=["bold"])) - informations = get_shodan_cpe_cve_infos( - ip, api_key, http_proxy_to_use) + informations = get_shodan_cpe_cve_infos(ip, api_key, http_proxy_to_use) print_infos(informations, " ") # Add tempo due to API limitation (API methods are rate-limited to 1 request by second) if not is_single_ip: @@ -1065,8 +1077,7 @@ def get_leakix_info(field_type, field_value, http_proxy): print_infos(informations, " ") else: print(colored(f"Skipped because no API key was specified!", "red", attrs=["bold"])) - print(colored( - f"[BING] Apply Bing Dork for the domain, get the 50 first records (max per page allowed by the API)...", "blue", attrs=["bold"])) + print(colored(f"[BING] Apply Bing Dork for the domain, get the 50 first records (max per page allowed by the API)...", "blue", attrs=["bold"])) if "azure-cognitive-services-bing-web-search" in api_key_config["API_KEYS"]: file_types = " OR filetype:".join(INTERESTING_FILE_EXTENSIONS) dork = f"site:{args.domain_name} AND (filetype:{file_types})" @@ -1126,8 +1137,7 @@ def get_leakix_info(field_type, field_value, http_proxy): infos_for_ip = {} for ip in ips: infos_for_ip[ip] = get_intelx_infos(ip, api_key, http_proxy_to_use) - infos_for_domain = get_intelx_infos( - args.domain_name, api_key, http_proxy_to_use) + infos_for_domain = get_intelx_infos(args.domain_name, api_key, http_proxy_to_use) for ip in ips: print(colored(f"{ip}", "yellow", attrs=["bold"])) if len(infos_for_ip[ip]) > 0: @@ -1188,8 +1198,7 @@ def get_leakix_info(field_type, field_value, http_proxy): else: print_infos(informations["DATA"], prefix=" ") else: - print(colored(f"Skipped because no API key was specified!", - "red", attrs=["bold"])) + print(colored(f"Skipped because no API key was specified!", "red", attrs=["bold"])) print(colored(f"[GOOGLE PLAY + APPLE APP STORE] Verify if the company provide mobile apps on official stores...", "blue", attrs=["bold"])) print(colored("[i]", "green") + f" Searches were performed into application stores for the country code '{MOBILE_APP_STORE_COUNTRY_STORE_CODE}'.") print(colored(f"{args.domain_name}", "yellow", attrs=["bold"])) @@ -1219,6 +1228,20 @@ def get_leakix_info(field_type, field_value, http_proxy): print(f" https://search.0t.rocks/records?domain={args.domain_name}") for ip in ips: print(f" https://search.0t.rocks/records?ips={ip}") + print(colored(f"[NAPALM FTP INDEXER] Verify if entires are present for domain '{args.domain_name}', domain without TLD '{domain_no_tld}' and IPv4 addresses...", "blue", attrs=["bold"])) + print(colored("[i]", "green") + f" Go to https://www.searchftps.net for the details and content.") + print(colored(f"{args.domain_name}", "yellow", attrs=["bold"])) + informations = get_napalm_ftp_indexer_info(args.domain_name, http_proxy_to_use) + print_infos(informations["DATA"], " ") + print(colored(f"{domain_no_tld}", "yellow", attrs=["bold"])) + informations = get_napalm_ftp_indexer_info(domain_no_tld, http_proxy_to_use) + print_infos(informations["DATA"], " ") + for ip in ips: + # Skip IPV6 + if ":" not in ip: + print(colored(f"{ip}", "yellow", attrs=["bold"])) + informations = get_napalm_ftp_indexer_info(ip, http_proxy_to_use) + print_infos(informations["DATA"], " ") # Final processing delay = round(time.time() - start_time, 2) print("")