Add support for NAPALM FTP Indexer - #120
drighetto committed Jan 27, 2024
1 parent 814d64a commit 7bb9230
Showing 2 changed files with 77 additions and 53 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -12,3 +12,4 @@ __pycache__/
filetype_dork_result.txt
test.py
dnsdumpster.*
*.tmp
129 changes: 76 additions & 53 deletions wpr.py
Expand Up @@ -33,9 +33,9 @@
from dnsdumpster.DNSDumpsterAPI import DNSDumpsterAPI


USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
MOBILE_APP_STORE_COUNTRY_STORE_CODE = "LU" # Luxembourg
DEFAULT_CALL_TIMEOUT = 60 # 1 minute
DEFAULT_CALL_TIMEOUT = 30
WAPPALYZER_MAX_MONTHS_RESULT_OLD = 6
INTERESTING_FILE_EXTENSIONS = ["pdf", "doc", "docx", "xls", "xlsx", "ppt", "pptx", "pps", "odp", "ods", "odt", "rtf",
"java", "cs", "vb", "py", "rb", "zip", "tar", "gz", "7z", "eml", "msg", "sql", "ini",
@@ -127,8 +127,7 @@ def get_intelx_infos(ip_or_domain, api_key, http_proxy):
}
# First we must do a search
service_url = f"https://2.intelx.io/intelligent/search"
response = requests.post(service_url, data=json.dumps(payload), headers=request_headers,
proxies=web_proxies, verify=(http_proxy is None), timeout=DEFAULT_CALL_TIMEOUT)
response = requests.post(service_url, data=json.dumps(payload), headers=request_headers, proxies=web_proxies, verify=(http_proxy is None), timeout=DEFAULT_CALL_TIMEOUT)
if response.status_code != 200:
infos.append(f"HTTP response code {response.status_code} received for the search!")
return infos
@@ -171,11 +170,9 @@ def extract_infos_from_virus_total_response(http_response):
infos = []
if http_response.status_code != 200:
if http_response.status_code != 204:
infos.append(
f"HTTP response code {http_response.status_code} received!")
infos.append(f"HTTP response code {http_response.status_code} received!")
else:
infos.append(
f"Request rate limit exceeded: Wait one minute and re-run the script!")
infos.append(f"Request rate limit exceeded: Wait one minute and re-run the script!")
else:
results = http_response.json()
# From VT API doc regarding the "response_code" property:
@@ -196,11 +193,9 @@ def extract_infos_from_virus_total_response(http_response):
if "undetected_urls" in results:
urls_undetected_count = len(results["undetected_urls"])
if "detected_downloaded_samples" in results:
samples_detected_download_count = len(
results["detected_downloaded_samples"])
samples_detected_download_count = len(results["detected_downloaded_samples"])
if "undetected_downloaded_samples" in results:
samples_undetected_download_count = len(
results["undetected_downloaded_samples"])
samples_undetected_download_count = len(results["undetected_downloaded_samples"])
infos.append(f"URLs at this IP address that have at least one detection on a URL scan = {urls_detected_count}")
infos.append(f"URLs at this IP address with no detections on a URL scan = {urls_undetected_count}")
infos.append(f"Files that have been downloaded from this IP address with at least one AV detection = {samples_detected_download_count}")
@@ -369,8 +364,7 @@ def get_passive_shared_hosts(ip, http_proxy):
infos = []
# See https://www.threatminer.org/api.php
service_url = f"https://api.threatminer.org/v2/host.php?q={ip}&rt=2"
response = requests.get(service_url, headers={
"User-Agent": USER_AGENT}, proxies=web_proxies, verify=(http_proxy is None), timeout=DEFAULT_CALL_TIMEOUT)
response = requests.get(service_url, headers={"User-Agent": USER_AGENT}, proxies=web_proxies, verify=(http_proxy is None), timeout=DEFAULT_CALL_TIMEOUT)
if response.status_code != 200:
infos.append(f"HTTP response code {response.status_code} received from ThreatMiner API !")
else:
@@ -609,8 +603,7 @@ def get_certificate_transparency_log_subdomains(domain, http_proxy):
service_url = f"https://crt.sh/?q=%.{domain}&output=json"
response = requests.get(service_url, headers={"User-Agent": USER_AGENT}, proxies=web_proxies, verify=(http_proxy is None), timeout=DEFAULT_CALL_TIMEOUT)
if response.status_code != 200:
infos.append(
f"HTTP response code {response.status_code} received!")
infos.append(f"HTTP response code {response.status_code} received!")
return infos
results = response.json()
for entry in results:
@@ -655,13 +648,11 @@ def get_softwareheritage_infos(domain_or_ip, http_proxy):
# Set a long timeout (up to 4 minutes) because the response takes a while to arrive
response = requests.get(service_url, headers={"User-Agent": USER_AGENT}, proxies=web_proxies, verify=(http_proxy is None), timeout=DEFAULT_CALL_TIMEOUT)
if response.status_code != 200:
infos["DATA"].append(
f"HTTP response code {response.status_code} received!")
infos["DATA"].append(f"HTTP response code {response.status_code} received!")
return infos
results = response.json()
remaining_allowed_call_for_current_hour = response.headers["X-RateLimit-Remaining"]
next_call_count_reset = datetime.datetime.fromtimestamp(
int(response.headers["X-RateLimit-Reset"]))
next_call_count_reset = datetime.datetime.fromtimestamp(int(response.headers["X-RateLimit-Reset"]))
infos["LIMIT"] = f"{remaining_allowed_call_for_current_hour} call(s) can still be performed in the current hours (reseted at {next_call_count_reset})."
for entry in results:
infos["DATA"].append(entry["url"])
@@ -718,7 +709,7 @@ def get_mobile_app_infos(domain, http_proxy):
infos["DATA"].append(f"HTTP response code {response.status_code} received!")
return infos
results = response.text
android_bundle_regex = f"id=({domain_infos.suffix}\.{domain_infos.domain}\.[a-z0-9A-Z\.\-_]+)"
android_bundle_regex = f"id=({domain_infos.suffix}\\.{domain_infos.domain}\\.[a-z0-9A-Z\\.\\-_]+)"
bundles = re.findall(android_bundle_regex, results)
for bundle in bundles:
infos["DATA"].append(f"Android app found with PackageId '{bundle}'.")
@@ -742,16 +733,13 @@ def get_dns_dumpster_infos(domain, http_proxy):
if len(results) > 0:
data = results["dns_records"]
for entry in data["dns"]:
infos["DATA"].append(
f"[DNS ]: IP \"{entry['ip']}\" - Domain \"{entry['domain']}\" - ReverseDNS \"{entry['reverse_dns']}\" - AS \"{entry['as']}\"")
infos["DATA"].append(f"[DNS ]: IP \"{entry['ip']}\" - Domain \"{entry['domain']}\" - ReverseDNS \"{entry['reverse_dns']}\" - AS \"{entry['as']}\"")
for entry in data["mx"]:
infos["DATA"].append(
f"[MX ]: IP \"{entry['ip']}\" - Domain \"{entry['domain']}\" - ReverseDNS \"{entry['reverse_dns']}\" - AS \"{entry['as']}\"")
infos["DATA"].append(f"[MX ]: IP \"{entry['ip']}\" - Domain \"{entry['domain']}\" - ReverseDNS \"{entry['reverse_dns']}\" - AS \"{entry['as']}\"")
for entry in data["txt"]:
infos["DATA"].append(f"[TXT ]: {entry}")
for entry in data["host"]:
infos["DATA"].append(
f"[HOST]: IP \"{entry['ip']}\" - Domain \"{entry['domain']}\" - ReverseDNS \"{entry['reverse_dns']}\" - AS \"{entry['as']}\"")
infos["DATA"].append(f"[HOST]: IP \"{entry['ip']}\" - Domain \"{entry['domain']}\" - ReverseDNS \"{entry['reverse_dns']}\" - AS \"{entry['as']}\"")
if results["xls_data"] != None:
infos["XLS"] = base64.b64decode(results["xls_data"])
if results["image_data"] != None:
@@ -898,26 +886,53 @@ def get_leakix_info(field_type, field_value, http_proxy):
return infos


def get_napalm_ftp_indexer_info(domain, http_proxy):
infos = {"DATA": [], "ERROR": None}
# See https://www.searchftps.net/
service_url = f"https://www.searchftps.net/"
expected_response_marker = "showing results"
regex_results_count = r'Showing\s+results\s+\d+\s+to\s+\d+\s+of\s+about\s+(\d+)'
try:
web_proxies = configure_proxy(http_proxy)
req_session = requests.Session()
req_session.headers.update({"User-Agent": USER_AGENT, "Content-Type": "application/x-www-form-urlencoded"})
req_session.proxies.update(web_proxies)
req_session.verify = (http_proxy is None)
form_data = {"action": "result", "args": f"k={domain}&t=and&o=date-desc&s=0"}
response = req_session.post(url=service_url, data=form_data)
if response.status_code != 200:
infos["ERROR"] = f"HTTP response code {response.status_code} received!"
infos["DATA"].clear()
return infos
results = response.text
if expected_response_marker not in results.lower():
with open("debug.tmp", mode="w", encoding="utf-8") as f:
f.write(results)
infos["ERROR"] = f"Non expected response received, marker '{expected_response_marker}' not found, see 'debug.tmp' file generated."
infos["DATA"].clear()
return infos
results_count = re.findall(regex_results_count, results, re.IGNORECASE | re.MULTILINE)
if len(results_count) > 0 and int(results_count[0]) > 0:
infos["DATA"].append(f"{results_count[0]} entries present on the site.")
except Exception as e:
infos["ERROR"] = f"Error during web call: {str(e)}"
infos["DATA"].clear()
return infos
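The count extraction in the new helper relies on the regex_results_count pattern defined above. A minimal sketch of its behaviour, assuming a hypothetical fragment of the searchftps.net result page (the sample string below is an assumption, not real site output):

import re

sample = "Showing results 0 to 50 of about 1337"  # hypothetical response fragment
pattern = r'Showing\s+results\s+\d+\s+to\s+\d+\s+of\s+about\s+(\d+)'
print(re.findall(pattern, sample, re.IGNORECASE | re.MULTILINE))  # ['1337'] -> "1337 entries present on the site."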


if __name__ == "__main__":
requests.packages.urllib3.disable_warnings()
colorama.init()
start_time = time.time()
parser = argparse.ArgumentParser()
required_params = parser.add_argument_group("required arguments")
required_params.add_argument("-d", action="store", dest="domain_name",
help="Domain to analyse (ex: righettod.eu).", required=True)
parser.add_argument("-a", action="store", dest="api_key_file", default=None,
help="Configuration INI file with all API keys (ex: conf.ini).", required=False)
parser.add_argument("-n", action="store", dest="name_server", default=None,
help="Name server to use for the DNS query (ex: 8.8.8.8).", required=False)
parser.add_argument("-p", action="store", dest="http_proxy", default=None,
help="HTTP proxy to use for all HTTP call to differents services (ex: http://88.198.50.103:9080).", required=False)
parser.add_argument("-s", action="store_true", dest="store_filetype_dork_result", default=False,
help="Save the result of the Google/Bing Dork searching for interesting files to the file 'filetype_dork_result.txt'.", required=False)
parser.add_argument("-t", action="store", dest="request_timeout", type=int, default=DEFAULT_CALL_TIMEOUT,
help="Delay in seconds allowed for a HTTP request to reply before to fall in timeout (ex: 20).", required=False)
parser.add_argument("-m", action="store", dest="mobile_app_store_country_code", default=MOBILE_APP_STORE_COUNTRY_STORE_CODE,
help="Country code to define in which store mobile app will be searched (ex: LU).", required=False)
required_params.add_argument("-d", action="store", dest="domain_name", help="Domain to analyse (ex: righettod.eu).", required=True)
parser.add_argument("-a", action="store", dest="api_key_file", default=None, help="Configuration INI file with all API keys (ex: conf.ini).", required=False)
parser.add_argument("-n", action="store", dest="name_server", default=None, help="Name server to use for the DNS query (ex: 8.8.8.8).", required=False)
parser.add_argument("-p", action="store", dest="http_proxy", default=None, help="HTTP proxy to use for all HTTP call to differents services (ex: http://88.198.50.103:9080).", required=False)
parser.add_argument("-s", action="store_true", dest="store_filetype_dork_result", default=False, help="Save the result of the Google/Bing Dork searching for interesting files to the file 'filetype_dork_result.txt'.", required=False)
parser.add_argument("-t", action="store", dest="request_timeout", type=int, default=DEFAULT_CALL_TIMEOUT, help="Delay in seconds allowed for a HTTP request to reply before to fall in timeout (ex: 20).", required=False)
parser.add_argument("-m", action="store", dest="mobile_app_store_country_code", default=MOBILE_APP_STORE_COUNTRY_STORE_CODE, help="Country code to define in which store mobile app will be searched (ex: LU).", required=False)
args = parser.parse_args()
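# Illustrative invocation (an assumption based on the flags above; 'conf.ini' would contain the API keys):
#   python wpr.py -d righettod.eu -a conf.ini -n 8.8.8.8 -t 30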
api_key_config = configparser.ConfigParser()
api_key_config["API_KEYS"] = {}
@@ -969,14 +984,12 @@ def get_leakix_info(field_type, field_value, http_proxy):
print(colored(f"[DNS] Extract the aliases...", "blue", attrs=["bold"]))
cnames = get_cnames(args.domain_name, args.name_server)
print_infos(cnames)
print(colored(
f"[WHOIS] Extract the owner information of the IP addresses...", "blue", attrs=["bold"]))
print(colored(f"[WHOIS] Extract the owner information of the IP addresses...", "blue", attrs=["bold"]))
for ip in ips:
print(colored(f"{ip}", "yellow", attrs=["bold"]))
informations = get_ip_owner(ip, http_proxy_to_use)
print_infos(informations, " ")
print(colored(
f"[SHODAN] Extract the general information of the IP addresses and the domain...", "blue", attrs=["bold"]))
print(colored(f"[SHODAN] Extract the general information of the IP addresses and the domain...", "blue", attrs=["bold"]))
if "shodan" in api_key_config["API_KEYS"]:
api_key = api_key_config["API_KEYS"]["shodan"]
print(colored(f"{args.domain_name}", "yellow", attrs=["bold"]))
@@ -998,8 +1011,7 @@ def get_leakix_info(field_type, field_value, http_proxy):
is_single_ip = len(ips) < 2
for ip in ips:
print(colored(f"{ip}", "yellow", attrs=["bold"]))
informations = get_shodan_cpe_cve_infos(
ip, api_key, http_proxy_to_use)
informations = get_shodan_cpe_cve_infos(ip, api_key, http_proxy_to_use)
print_infos(informations, " ")
# Add tempo due to API limitation (API methods are rate-limited to 1 request by second)
if not is_single_ip:
@@ -1065,8 +1077,7 @@ def get_leakix_info(field_type, field_value, http_proxy):
print_infos(informations, " ")
else:
print(colored(f"Skipped because no API key was specified!", "red", attrs=["bold"]))
print(colored(
f"[BING] Apply Bing Dork for the domain, get the 50 first records (max per page allowed by the API)...", "blue", attrs=["bold"]))
print(colored(f"[BING] Apply Bing Dork for the domain, get the 50 first records (max per page allowed by the API)...", "blue", attrs=["bold"]))
if "azure-cognitive-services-bing-web-search" in api_key_config["API_KEYS"]:
file_types = " OR filetype:".join(INTERESTING_FILE_EXTENSIONS)
dork = f"site:{args.domain_name} AND (filetype:{file_types})"
@@ -1126,8 +1137,7 @@ def get_leakix_info(field_type, field_value, http_proxy):
infos_for_ip = {}
for ip in ips:
infos_for_ip[ip] = get_intelx_infos(ip, api_key, http_proxy_to_use)
infos_for_domain = get_intelx_infos(
args.domain_name, api_key, http_proxy_to_use)
infos_for_domain = get_intelx_infos(args.domain_name, api_key, http_proxy_to_use)
for ip in ips:
print(colored(f"{ip}", "yellow", attrs=["bold"]))
if len(infos_for_ip[ip]) > 0:
@@ -1188,8 +1198,7 @@ def get_leakix_info(field_type, field_value, http_proxy):
else:
print_infos(informations["DATA"], prefix=" ")
else:
print(colored(f"Skipped because no API key was specified!",
"red", attrs=["bold"]))
print(colored(f"Skipped because no API key was specified!", "red", attrs=["bold"]))
print(colored(f"[GOOGLE PLAY + APPLE APP STORE] Verify if the company provide mobile apps on official stores...", "blue", attrs=["bold"]))
print(colored("[i]", "green") + f" Searches were performed into application stores for the country code '{MOBILE_APP_STORE_COUNTRY_STORE_CODE}'.")
print(colored(f"{args.domain_name}", "yellow", attrs=["bold"]))
@@ -1219,6 +1228,20 @@ def get_leakix_info(field_type, field_value, http_proxy):
print(f" https://search.0t.rocks/records?domain={args.domain_name}")
for ip in ips:
print(f" https://search.0t.rocks/records?ips={ip}")
print(colored(f"[NAPALM FTP INDEXER] Verify if entires are present for domain '{args.domain_name}', domain without TLD '{domain_no_tld}' and IPv4 addresses...", "blue", attrs=["bold"]))
print(colored("[i]", "green") + f" Go to https://www.searchftps.net for the details and content.")
print(colored(f"{args.domain_name}", "yellow", attrs=["bold"]))
informations = get_napalm_ftp_indexer_info(args.domain_name, http_proxy_to_use)
print_infos(informations["DATA"], " ")
print(colored(f"{domain_no_tld}", "yellow", attrs=["bold"]))
informations = get_napalm_ftp_indexer_info(domain_no_tld, http_proxy_to_use)
print_infos(informations["DATA"], " ")
for ip in ips:
# Skip IPV6
if ":" not in ip:
print(colored(f"{ip}", "yellow", attrs=["bold"]))
informations = get_napalm_ftp_indexer_info(ip, http_proxy_to_use)
print_infos(informations["DATA"], " ")
# Final processing
delay = round(time.time() - start_time, 2)
print("")
