search.py
import argparse
import json
import os
from time import sleep

import requests
parser = argparse.ArgumentParser(description="MalwareBazaar Advanced Search")
parser.add_argument("-s", "--search", type=str, help="Search string of space-separated key:value filters (e.g. \"tag:<tag> file_type:<type>\")")
parser.add_argument("-l", "--limit", default=1000, type=int, help="Upper limit on the number of hashes to pull per search (default: 1000, max: 1000)")
parser.add_argument("--get-file", type=str, help="Download the sample with this SHA256 hash")
parser.add_argument("--download-all", action="store_true", help="Download all matching files from a search into a directory called 'samples'")
args = parser.parse_args()
search_string = args.search
limit = args.limit
get_file = args.get_file
download_all = args.download_all
api_url = "https://mb-api.abuse.ch/api/v1/"
def download_hash(file_hash):
    """Download a sample by SHA256 hash; saved as <hash>_pw_infected.zip (a password-protected ZIP)."""
    request_data = {
        "query": "get_file",
        "sha256_hash": str(file_hash).lower()
    }
    download_request = requests.post(url=api_url, data=request_data)
    with open(str(file_hash) + "_pw_infected.zip", "wb") as f:
        f.write(download_request.content)
def convert_search_string(search_string):
    """Convert a space-separated "key:value" search string into a list of API query payloads."""
    key_conversion = {
        "tag": "get_taginfo",
        "file_type": "get_file_type",
        "signature": "get_siginfo",
        "clamav": "get_clamavinfo",
        #"yara": "get_yarainfo",  # not working
        "serial_number": "get_certificate",
        #"issuer_cn": "get_issuer",  # not supported as it commonly includes spaces
        "imphash": "get_imphash",
        "tlsh": "get_tlsh",
        "telfhash": "get_telfhash",
        "gimphash": "get_gimphash",
        "dhash_icon": "get_dhash_icon"
    }
    filters = search_string.split(" ")
    filters_converted = []
    for search_filter in filters:
        split_filter = search_filter.split(":")
        k = split_filter[0]
        try:
            q = key_conversion[k]
        except KeyError:
            print(f"[!] {k} not a valid search operator")
            return []
        v = split_filter[1]
        kv = {
            'query': q,
            k: v,
            'limit': limit
        }
        filters_converted.append(kv)
    return filters_converted
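# For illustration (the <tag>/<sig> values are placeholders, not real samples): a search
# string like "tag:<tag> signature:<sig>" becomes one payload per filter, e.g.
#   [{"query": "get_taginfo", "tag": "<tag>", "limit": 1000},
#    {"query": "get_siginfo", "signature": "<sig>", "limit": 1000}]
# Each payload is POSTed to the API separately by search_mb() below.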
def search_mb(filters_converted):
    """Run each converted filter against the MalwareBazaar API and collect the result sets."""
    success_list = []
    failure_list = []
    for search_filter in filters_converted:
        print(f"[+] Parsing {search_filter}")
        mb_request = requests.post(url=api_url, data=search_filter)
        try:
            mb_response = json.loads(mb_request.text)
        except json.decoder.JSONDecodeError:
            print("[!] JSON Failed To Load")
            print(mb_request.text)
            failure_list.append(search_filter)
            continue
        return_status = str(mb_response['query_status'])
        if return_status != "ok":
            failure_list.append(search_filter)
            print("[!] Search Failed")
            print(mb_request.text)
            continue
        data = mb_response["data"]
        success_list.append(data)
        sleep(1)
    return success_list
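# Sketch of the structure handed to parse_results(), inferred from the fields used there:
# one result set per filter, each a list of dicts from the API of which only
# "sha256_hash" is read, e.g.
#   [[{"sha256_hash": "...", ...}, ...], [{"sha256_hash": "...", ...}, ...]]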
def parse_results(data):
    """Keep only hashes that appear in every result set, print them, and optionally download them."""
    hashes = []
    for result_set in data:
        for result in result_set:
            file_hash = str(result["sha256_hash"])
            hashes.append(file_hash)
    matches = set()
    for file_hash in hashes:
        if hashes.count(file_hash) == len(data):  # number of sightings == number of searches
            matches.add(file_hash)
    if len(matches) == 0:
        print("[+] No Matches Found")
    else:
        print(f"[+] Found {len(matches)} matches")
        if download_all:
            print("[+] Downloading Files to samples/")
    for file_hash in matches:
        print(f"[+] SHA256: {file_hash}")
        if download_all:
            try:
                os.mkdir("samples")
            except FileExistsError:
                pass
            os.chdir("samples")
            download_hash(file_hash)
            os.chdir("../")
    if matches and download_all:
        print("[+] Finished Downloading Files")
def main():
    if get_file and search_string:
        print("[!] Cannot use --get-file and --search at the same time")
        return
    if get_file:
        download_hash(get_file)
    if search_string:
        print(f"[+] Searching {search_string}")
        filters_converted = convert_search_string(search_string)
        if not filters_converted:
            return
        data = search_mb(filters_converted)
        parse_results(data)

if __name__ == "__main__":
    main()
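# Example invocations (illustrative placeholder values; flag names match the argparse definitions above):
#   python search.py -s "tag:<tag> file_type:<type>" -l 100
#   python search.py -s "signature:<sig>" --download-all
#   python search.py --get-file <sha256>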