-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathActiveMQ_RCE_Vulnerability_Checker.py
91 lines (73 loc) · 3.07 KB
/
ActiveMQ_RCE_Vulnerability_Checker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# 作者: VulnExpo
# 日期: 2023-10-26
import socket
import socks
import re
from distutils.version import StrictVersion
import argparse
import threading
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
def extract_ip_port_from_url(url):
    """Split a target such as ``http://10.0.0.1:61616/`` into ``(host, port)``.

    Accepts ``host:port`` with an optional ``scheme://`` prefix and an
    optional trailing path. Surrounding whitespace is ignored.

    Args:
        url: Target string, e.g. ``"10.0.0.1:61616"`` or
            ``"http://10.0.0.1:61616/admin"``.

    Returns:
        ``(host, port)`` with ``port`` as an ``int`` in 1..65535, or
        ``(None, None)`` when the target cannot be parsed (a message is
        printed in that case).
    """
    target = url.strip()

    # Drop any scheme prefix (http://, https://, tcp://, ...).
    if "://" in target:
        target = target.split("://", 1)[1]

    # Drop a trailing slash or path so "host:61616/" still parses.
    target = target.split("/", 1)[0]

    # Exactly one colon is required: the connection code uses AF_INET, so
    # bracketed/IPv6 forms with several colons are not supported here.
    if target.count(":") == 1:
        host, port_text = target.split(":")
        # isdecimal() guarantees int() cannot raise on port_text.
        if host and port_text.isdecimal():
            port = int(port_text)
            if 0 < port <= 65535:
                return host, port

    print(f"无法解析 URL:{url}")
    return None, None
# First patched release of each 5.x branch for CVE-2023-46604.
_FIXED_RELEASES = {
    (5, 18): (5, 18, 3),
    (5, 17): (5, 17, 6),
    (5, 16): (5, 16, 7),
}
# Every branch older than 5.16 was fixed in 5.15.16.
_LEGACY_FIXED_RELEASE = (5, 15, 16)

# Matched against the raw bytes: the broker greeting is binary, so no text
# decoding is attempted. DOTALL lets the gap between the key and its value
# contain any byte, including 0x0a.
_PROVIDER_VERSION_RE = re.compile(rb'ProviderVersion.*?(\d+(?:\.\d+)+)', re.DOTALL)


def _parse_version(version_str):
    """Turn a dotted version like "5.17" into a tuple padded to three parts."""
    parts = [int(piece) for piece in version_str.split('.')]
    parts.extend([0] * (3 - len(parts)))
    return tuple(parts)


def _is_vulnerable_version(version):
    """Return True when the version tuple precedes its branch's patched release."""
    fixed = _FIXED_RELEASES.get(version[:2], _LEGACY_FIXED_RELEASE)
    return version < fixed


def check_for_vulnerability(ip, port, proxies={}, success_file=None):
    """Read the broker greeting from ``ip:port`` and record vulnerable versions.

    Connects over TCP, reads up to 1024 bytes, extracts the value that
    follows ``ProviderVersion`` and compares it with the first patched
    release of its branch (5.18.3, 5.17.6, 5.16.7, or 5.15.16 for older
    branches). Versions are compared as integer tuples rather than with
    ``distutils.version.StrictVersion``, which is deprecated and no longer
    available in Python 3.12+.

    Args:
        ip: Host name or IPv4 address to connect to.
        port: TCP port number.
        proxies: Accepted for interface compatibility; not used and never
            mutated by this function.
        success_file: Path of the file vulnerable targets are appended to.
            When ``None`` the finding is printed instead.

    Returns:
        None. Errors are printed and swallowed so one bad target does not
        stop the scan of the remaining ones.
    """
    try:
        # The context manager closes the socket on every path, including a
        # failed connect.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Set the timeout before connecting so it bounds connect() too.
            sock.settimeout(10)
            sock.connect((ip, port))
            response_data = sock.recv(1024)
    except Exception as e:
        # Deliberate best-effort boundary: report and move on to the next target.
        print(f"发生异常:{e}")
        return

    version_match = _PROVIDER_VERSION_RE.search(response_data)
    if not version_match:
        print(f'在 {ip}:{port} 的响应中找不到 ActiveMQ 提供程序版本')
        return

    # The pattern only captures ASCII digits and dots, so this cannot fail.
    version_str = version_match.group(1).decode('ascii')
    if not _is_vulnerable_version(_parse_version(version_str)):
        return

    # Build the record once and emit it with a single write so records from
    # concurrent scanner threads are less likely to interleave.
    record = (
        "++++++++++++++++++\n"
        f"目标URL: {ip}:{port}\n"
        f"漏洞版本: {version_str}\n\n"
    )
    if success_file is None:
        print(record, end='')
        return

    try:
        # Explicit UTF-8 so the non-ASCII labels can be written regardless
        # of the platform's default encoding.
        with open(success_file, 'a', encoding='utf-8') as s_file:
            s_file.write(record)
    except OSError as e:
        print(f"发生异常:{e}")
def scan_targets(urls, proxies={}, success_file=None):
    """Check every target in ``urls`` one after another.

    Each entry is stripped of surrounding whitespace and parsed with
    ``extract_ip_port_from_url``; entries that do not parse are skipped,
    the rest are handed to ``check_for_vulnerability``.

    Args:
        urls: Iterable of target strings.
        proxies: Passed through unchanged to ``check_for_vulnerability``.
        success_file: Passed through unchanged to ``check_for_vulnerability``.
    """
    for raw_target in urls:
        host, port_number = extract_ip_port_from_url(raw_target.strip())
        if host is None or port_number is None:
            continue
        check_for_vulnerability(host, port_number, proxies, success_file)
def multi_threaded_scan(urls, proxies={}, success_file=None, num_threads=4):
    """Scan ``urls`` with several worker threads and wait for all of them.

    The targets are split round-robin (``urls[i::n]``) so every worker gets
    a disjoint share, and each worker runs ``scan_targets`` on its share.

    Args:
        urls: Sequence of target strings (must support ``len`` and slicing).
        proxies: Passed through unchanged to ``scan_targets``.
        success_file: Passed through unchanged to ``scan_targets``.
        num_threads: Requested number of workers. Values below 1 are
            treated as 1, and no more workers than targets are started.
    """
    if not urls:
        return

    # A request of 0 or fewer threads used to scan nothing without any
    # warning; more threads than targets only produced idle workers.
    worker_count = max(1, min(num_threads, len(urls)))

    workers = [
        threading.Thread(
            target=scan_targets,
            args=(urls[offset::worker_count], proxies, success_file),
        )
        for offset in range(worker_count)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
if __name__ == '__main__':
    # Command-line entry point: scan a single target (-u) or every target
    # listed in a file (-f, default url.txt) and append hits to
    # success_targets.txt.
    parser = argparse.ArgumentParser(description="Apache ActiveMQ (版本 < 5.18.3) 漏洞版本检测")
    parser.add_argument("-u", "--url", help="目标URL")
    parser.add_argument("-f", "--file", default="url.txt", help="目标URL列表,默认为url.txt")
    parser.add_argument("-t", "--threads", type=int, default=4, help="线程数,默认为4")
    args = parser.parse_args()

    if args.threads < 1:
        parser.error("线程数必须大于等于 1")

    if args.url:
        urls = [args.url]
    else:
        # --file always has a value because of its default, so the real
        # failure mode is the file being missing or unreadable.
        try:
            with open(args.file, 'r') as file:
                # Skip blank lines so they are not reported as bad targets.
                urls = [line.strip() for line in file if line.strip()]
        except OSError as e:
            print(f"无法读取文件 {args.file}:{e}")
            print("请使用 -u 指定要扫描的目标URL或使用默认文件 url.txt。")
            raise SystemExit(1)

    if not urls:
        print("请使用 -u 指定要扫描的目标URL或使用默认文件 url.txt。")
        raise SystemExit(1)

    proxies = {}
    success_file = 'success_targets.txt'
    multi_threaded_scan(urls, proxies, success_file, args.threads)
    print(f"扫描完成,成功的目标已保存到 {success_file} 文件中。")