Merge pull request #2 from Gnomee1337/code_refactor
Refactored into a class system
Showing 5 changed files with 696 additions and 468 deletions.
@@ -1,84 +1,107 @@
 import datetime
+import logging
 import os
 import re

 import aiohttp
 import dns.resolver
+import aiofiles
 from bs4 import BeautifulSoup

+logger = logging.getLogger(__name__)
+
+
+class IPGatherer:
+    def __init__(self):
+        self.log_dir = os.path.normpath(
+            os.path.join(os.path.realpath(__file__), '../../cache/viewdnsinfo_req_logs/'))
+        os.makedirs(self.log_dir, exist_ok=True)
+        self.all_ips = set()
+
+    async def gather_ips(self, domains: set):
+        for domain in domains:
+            domain_ips = await self._ip_history_viewdnsinfo(domain)
+            # _ip_history_viewdnsinfo returns the sentinel -403 when the service
+            # rate-limits; -403 is truthy, so it must be filtered explicitly
+            if domain_ips and domain_ips != -403:
+                domain_ips = await self._remove_original_ips(domain, domain_ips)
+                await self._write_domain_related_ips_to_file(domain, domain_ips)
+                self.all_ips.update(domain_ips)
+        await self._write_all_possible_ips_to_file()
+        return sorted(self.all_ips)
+
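A minimal sketch of driving the refactored class (the module name ip_gatherer and the entry point are assumptions; the diff does not show how the repository invokes IPGatherer):

import asyncio

from ip_gatherer import IPGatherer  # hypothetical module name for the file in this diff


async def main():
    # gather_ips takes a set of domains and returns a sorted list of historical
    # IPs, with each domain's current A records already discarded
    ips = await IPGatherer().gather_ips({'example.com', 'example.org'})
    print('\n'.join(ips))


asyncio.run(main())

The standalone ip_history_viewdnsinfo is replaced by the private method _ip_history_viewdnsinfo; the diff interleaves the removed and added versions: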
-async def ip_history_viewdnsinfo(domain: str):
-    viewdnsinfo_ips_output = set()
-    # Verify that 'viewdnsinfo_req_logs' directory exists
-    if not os.path.isdir(os.path.normpath(
-            os.path.dirname(os.path.join(os.path.realpath(__file__), '../../cache/viewdnsinfo_req_logs/')))):
-        os.makedirs(os.path.normpath(
-            os.path.dirname(os.path.join(os.path.realpath(__file__), '../../cache/viewdnsinfo_req_logs/'))))
-    async with aiohttp.ClientSession() as session:
-        # GET-Request for each domain
-        async with session.get(f'https://viewdns.info/iphistory/?domain={domain}', timeout=3) as resp:
-            response_text = await resp.text()
-            if not response_text.find("403 Forbidden - Naughty!"):
-                print('SKIP Viewdnsinfo | Daily Limit Exceeded. (Possible bypass: new IP or use viewdns.info API Key)')
-                return -403
-            else:
-                # Write HTML-Response to file
-                with open(os.path.normpath(os.path.join(os.path.realpath(__file__),
-                        f'../../cache/viewdnsinfo_req_logs/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_HTML.txt')),
-                        'a') as get_request_file:
-                    get_request_file.write(response_text)
+    async def _ip_history_viewdnsinfo(self, domain: str):
+        viewdnsinfo_ips_output = set()
+        async with aiohttp.ClientSession() as session:
+            async with session.get(f'https://viewdns.info/iphistory/?domain={domain}', timeout=30) as resp:
+                response_text = await resp.text()
+        if "403 Forbidden - Naughty!" in response_text:
+            print('SKIP Viewdnsinfo | Daily Limit Exceeded. (Possible bypass: new IP or use viewdns.info API Key)')
+            return -403
+        # Write HTML response to file
+        await self._write_html_response(domain, response_text)
         # Setup soup
         soup = BeautifulSoup(response_text.encode('utf-8'), 'html.parser')
-        rb = soup.find_all('table', {'border': '1'})
-        # Find all IPs in HTML-Response
-        ip_pattern = re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}')
-        viewdnsinfo_ips_output.update(ip_pattern.findall(str(rb)))
-        # Write only IPs to file
-        with open(os.path.normpath(os.path.join(os.path.realpath(__file__),
-                f'../../cache/viewdnsinfo_req_logs/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_ips.txt')),
-                'a') as domains_only_file:
-            domains_only_file.write(
-                "\n".join(str(viewdnsinfo_out_ips) for viewdnsinfo_out_ips in viewdnsinfo_ips_output))
-        return list(viewdnsinfo_ips_output)
+        # Improved regex for IP address extraction
+        ip_pattern = re.compile(
+            r'\b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.'
+            r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.'
+            r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.'
+            r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
+        )
+        # Find the tables containing the IP addresses
+        tables = soup.find_all('table', {'border': '1'})
+        for table in tables:  # Iterate over each table found
+            # Iterate through all <td> elements in the table
+            for td in table.find_all('td'):
+                text = td.get_text(strip=True)
+                # Check if the text matches the IP pattern
+                if ip_pattern.match(text):
+                    viewdnsinfo_ips_output.add(text)
+        # Write only extracted IPs to file
+        await self._write_extracted_ips_to_file(domain, viewdnsinfo_ips_output)
+        return viewdnsinfo_ips_output
+
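Two fixes are worth noting in the rewritten method. The old check `if not response_text.find("403 Forbidden - Naughty!")` only detected the marker at index 0, because str.find returns -1 (truthy) when the substring is absent; the new `in` test is correct. And the octet-validated regex no longer accepts impossible addresses. A standalone comparison of the two patterns (not part of the commit, just a check):

import re

strict_ip = re.compile(
    r'\b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.'
    r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.'
    r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.'
    r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
)
naive_ip = re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}')

for candidate in ('203.0.113.7', '999.999.999.999'):
    # strict rejects octets above 255; naive accepts any 1-3 digit run
    print(candidate, bool(strict_ip.match(candidate)), bool(naive_ip.match(candidate)))
# 203.0.113.7 True True
# 999.999.999.999 False True

The diff continues with the new helper methods: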
+    async def _remove_original_ips(self, domain: str, all_domain_ips: set):
+        try:
+            # Resolve the current A records for the given domain
+            original_ips = dns.resolver.resolve(domain, 'A')
+            for ip in original_ips:
+                # Use .to_text() to get the string representation of the IP
+                all_domain_ips.discard(ip.to_text())
+        except dns.exception.DNSException:
+            pass  # Handle DNS resolution errors silently
+        return all_domain_ips
+
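One caveat: dns.resolver.resolve is a synchronous call, so it blocks the event loop even though it sits inside a coroutine. dnspython 2.x ships an async resolver; a runnable sketch of the same logic on top of it (an alternative, not what the commit uses):

import asyncio

import dns.asyncresolver
import dns.exception


async def remove_original_ips(domain: str, candidate_ips: set) -> set:
    # Same logic as the method above, but the lookup yields to the event loop
    try:
        answer = await dns.asyncresolver.resolve(domain, 'A')
        for ip in answer:
            candidate_ips.discard(ip.to_text())
    except dns.exception.DNSException:
        pass  # keep the candidate set unchanged on resolution failure
    return candidate_ips


print(asyncio.run(remove_original_ips('example.com', {'93.184.216.34', '203.0.113.7'})))

Back in the diff, the file-writing helpers are new in this commit: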
+    async def _write_html_response(self, domain: str, response_text: str):
+        # Use the sanitized domain when generating filenames
+        sanitized_domain = self.sanitize_filename(domain)
+        file_path = os.path.join(
+            self.log_dir,
+            f'{sanitized_domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_HTML.txt')
+        async with aiofiles.open(file_path, 'w') as file:
+            await file.write(response_text)
+
+    async def _write_extracted_ips_to_file(self, domain: str, viewdnsinfo_ips_output: set):
+        # Use the sanitized domain when generating filenames
+        sanitized_domain = self.sanitize_filename(domain)
+        file_path = os.path.join(
+            self.log_dir,
+            f'{sanitized_domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_only_ips.txt')
+        async with aiofiles.open(file_path, 'w') as file:
+            await file.write("\n".join(str(ip) for ip in viewdnsinfo_ips_output))
+
+    async def _write_domain_related_ips_to_file(self, domain: str, domain_ips: set):
+        # Use the sanitized domain when generating filenames
+        sanitized_domain = self.sanitize_filename(domain)
+        # Note: writes to the cache root via __file__ rather than self.log_dir
+        file_path = os.path.normpath(os.path.join(
+            os.path.realpath(__file__),
+            f'../../cache/{sanitized_domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_IPs.txt'))
+        async with aiofiles.open(file_path, 'w') as file:
+            await file.write("\n".join(sorted(domain_ips)))
+
+    async def _write_all_possible_ips_to_file(self):
+        file_path = os.path.normpath(os.path.join(
+            os.path.realpath(__file__),
+            f'../../cache/ALL_DOMAINS_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_IPs.txt'))
+        async with aiofiles.open(file_path, 'w') as file:
+            await file.write("\n".join(str(ip) for ip in sorted(self.all_ips)))
+
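All four writers rebuild the same '<name>_<dd-mm-YYYY_HHhMMmSSs>_<suffix>.txt' pattern by hand. A hypothetical consolidation (not in the commit) would keep the format in one place:

import datetime
import os


def timestamped_path(base_dir: str, stem: str, suffix: str) -> str:
    # Single source of truth for the log filename layout used above
    timestamp = datetime.datetime.now().strftime('%d-%m-%Y_%Hh%Mm%Ss')
    return os.path.join(base_dir, f'{stem}_{timestamp}_{suffix}.txt')


# e.g. inside _write_html_response: timestamped_path(self.log_dir, sanitized_domain, 'HTML')
print(timestamped_path('cache/viewdnsinfo_req_logs', 'example.com', 'only_ips'))

The commit removes the old procedural driver, ip_gathering, which the class now supersedes: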
-async def ip_gathering(domains: set):
-    all_ips = set()
-    for domain in domains:
-        all_domain_ips = set()
-        # Find all possible IPs for each domain
-        all_domain_ips.update(await ip_history_viewdnsinfo(domain))
-        # Not found any IPs
-        if len(all_domain_ips) == 0:
-            continue
-        else:
-            # Remove original domain IP from list
-            try:
-                domain_original_ips = dns.resolver.resolve(domain, 'A')
-                for ip in domain_original_ips:
-                    all_domain_ips.discard(str(ip))
-            except dns.exception.DNSException as e:
-                # print(e)
-                continue
-            # Write to file all possible ips for domain
-            with open(os.path.normpath(os.path.join(os.path.realpath(__file__),
-                    f'../../cache/{domain}_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_IPs.txt')),
-                    'a') as all_subdomains_ips_file:
-                all_subdomains_ips_file.write(
-                    "\n".join(str(ip_in_ips_for_domain) for ip_in_ips_for_domain in sorted(all_domain_ips)))
-            # Add all ips to 'all_ips'
-            all_ips.update(all_domain_ips)
-            # Clear set() for next ips gathering
-            all_domain_ips.clear()
-    # Write to file combination of ALL ips for every given domain as input
-    with open(os.path.normpath(os.path.join(os.path.realpath(__file__),
-            f'../../cache/ALL_DOMAINS_{datetime.datetime.now().strftime("%d-%m-%Y_%Hh%Mm%Ss")}_IPs.txt')),
-            'a') as ips_for_all_domains:
-        ips_for_all_domains.write(
-            "\n".join(str(ip_in_all) for ip_in_all in sorted(all_ips)))
-    return sorted(all_ips)
+    def sanitize_filename(self, domain: str) -> str:
+        # Strip characters unsafe in filenames (keep dots and hyphens, which
+        # are legitimate in domain names)
+        sanitized_domain = re.sub(r'[^A-Za-z0-9.-]+', '', domain)
+        # Limit the length of the domain name in the filename
+        max_length = 50
+        if len(sanitized_domain) > max_length:
+            sanitized_domain = sanitized_domain[:max_length]
+        return sanitized_domain
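A quick standalone check of the sanitizer's behavior (the method's logic inlined for the demo; only re is needed):

import re


def sanitize_filename(domain: str) -> str:
    # Same regex and length cap as the method above
    return re.sub(r'[^A-Za-z0-9.-]+', '', domain)[:50]


print(sanitize_filename('sub.example.com'))        # sub.example.com
print(sanitize_filename('evil/../../etc/passwd'))  # evil....etcpasswd (path separators removed)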