-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain_netmonitor.py
executable file
·163 lines (126 loc) · 4.56 KB
/
main_netmonitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from contextlib import contextmanager
from selenium import webdriver
from selenium.common.exceptions import JavascriptException
from selenium.webdriver.firefox.firefox_profile import FirefoxProfile
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from urls_to_skip import urls_to_skip
import json
import logging
import os
import time
import subprocess
from urllib.parse import urlparse
from urls import URLS
logging.basicConfig(level=logging.INFO)
print("Available URLs:")
for index, url in enumerate(URLS):
print(f"{index}: {url}")
while True:
chosen_index = input("\nPlease enter the index of the URL: ")
try:
chosen_index = int(chosen_index)
except:
print("\nPlease enter a valid integer index.")
continue
if chosen_index >= len(URLS):
print("\nPlease choose a valid index from the list")
continue
url = URLS[chosen_index]
print(f"\nYou chose: {url}")
break
logging.info("Starting firefox session.")
root = os.path.dirname(__file__)
parsed_url = urlparse(url)
hostname = ".".join(parsed_url.hostname.split(".")[:-1])
cookies_file = f"cookies/{hostname}.json"
is_tunnel_enabled = False
def start_ssh_tunnel():
try:
command = ["ssh", "call", "-fN", "-D", "1080"]
subprocess.run(command, check=True)
logging.info("SSH command executed successfully.")
except subprocess.CalledProcessError as e:
logging.info(f"An error occurred while executing the SSH command: {e}")
except Exception as e:
logging.info(f"Unexpected error: {e}")
@contextmanager
def selenium_driver():
options=Options()
if is_tunnel_enabled:
firefox_profile = FirefoxProfile("/home/bikmetle/.mozilla/firefox/699rashk.default-release")
else:
firefox_profile = FirefoxProfile("/home/bikmetle/.mozilla/firefox/dlvoc3tb.default")
options.add_argument("--devtools")
firefox_profile.set_preference("devtools.toolbox.selectedTool", "netmonitor")
firefox_profile.set_preference("devtools.netmonitor.persistlog", True)
options.profile = firefox_profile
geckodriver_path = "/usr/local/bin/geckodriver"
driver_service = Service(executable_path=geckodriver_path)
driver = webdriver.Firefox(
service=driver_service,
options=options,
)
try:
logging.info("Start new firefox session.")
yield driver
finally:
driver.quit()
logging.info("Stop the firefox session.")
def saveCookies(driver):
cookies = driver.get_cookies()
with open(cookies_file, 'w') as file:
json.dump(cookies, file)
logging.info('New Cookies saved successfully')
def loadCookies():
dir_name, file_name = cookies_file.split("/")
if file_name in os.listdir(dir_name):
with open(cookies_file, 'r') as file:
cookies = json.load(file)
for cookie in cookies:
parsed_url = urlparse(driver.current_url)
domain = ".".join(parsed_url.hostname.split(".")[-2:])
cookie['domain']="."+domain
driver.add_cookie(cookie)
else:
logging.info('No cookies file found')
driver.refresh() # Refresh Browser after login
def get_har_data(attempt=0):
if attempt > 10:
raise
attempt += 1
time.sleep(attempt*2)
try:
har_data = driver.execute_async_script(
"HAR.triggerExport().then(arguments[0]);"
)
except JavascriptException:
return get_har_data(attempt)
return har_data
def save_har_data(har_data):
entry_count = 0
for entry in har_data['entries']:
entry_count += 1
if any(url in entry['request']['url'] for url in urls_to_skip):
continue
file = f"har_data/{project_name}/{entry['startedDateTime']}.json"
with open(file, 'w', encoding='utf-8') as f:
json.dump(entry, f, ensure_ascii=False, indent=4)
logging.info(f"{len(har_data['entries'])} requests saved")
with selenium_driver() as driver:
if is_tunnel_enabled:
start_ssh_tunnel()
driver.get(url)
loadCookies()
driver.get(url)
project_name = input("Enter the project name to save or type `exit`: ")
saveCookies(driver)
if project_name != 'exit':
try:
project_dir = f"har_data/{project_name}"
os.mkdir(project_dir)
except OSError as error:
logging.info(f"Failed to create folder: {error}")
driver.install_addon("har-export-trigger.zip", temporary=True)
har_data = get_har_data()
save_har_data(har_data)