-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathapkminer.py
executable file
·194 lines (152 loc) · 4.86 KB
/
apkminer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
#!/usr/bin/env python
import sys
import time
import types
import pprint
import signal
import logging
import argparse
import traceback
import cStringIO
import Queue # just for exceptions
import multiprocessing as mp
import os
from os import listdir
from os.path import isfile, join
from androguard.core.bytecodes import apk
from androguard.core.bytecodes import dvm
from androguard.core.analysis import analysis
import analyzers
def get_files_in_dir(dir_path):
return [f for f in listdir(dir_path) if isfile(join(dir_path, f))]
def logger_runner(log_file, res_queue, end_event):
print "started logger"
fd = open(log_file, "a")
while not end_event.is_set():
try:
log_data = res_queue.get(True, 1)
except Queue.Empty:
continue
fd.write(log_data)
fd.flush()
fd.close()
def output_processor(output_queue, end_event, analyzer_out_func):
data = []
while not end_event.is_set():
try:
data.append(output_queue.get(True, 1))
except Queue.Empty:
continue
analyzer_out_func(data)
def runner(func, args, queue, res_queue, output_data):
try:
func(args, queue, res_queue, output_data)
except:
raise Exception("".join(traceback.format_exception(*sys.exc_info())))
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
def main():
parser = argparse.ArgumentParser(description='analyzer of APKs')
parser.add_argument("-i", "--in_dir", type=str,
help="directory of apk files to analyze", default=None)
parser.add_argument("-o", "--log_file", type=str,
help="log file to write to", default="OUTPUT.log")
parser.add_argument("-c", "--cores", type=int,
help="force a number of cores to use")
parser.add_argument("-a", "--analyzer", type=str,
help="Select the analyzer you want to use.", default="elf_files")
parser.add_argument("-l", "--list_analyzers", action="store_true",
help="List the possible analyzers")
args = parser.parse_args()
publics = (name for name in dir(analyzers) if not name.startswith('_'))
# dynamically get all analyzers in the directory
analyzer_funcs = {}
selected_output_func = None
selected_stream_func = None
for name in publics:
obj = getattr(analyzers, name)
if hasattr(obj, "analyze"):
analyzer_funcs[name] = obj
if args.list_analyzers:
print "Analyzers:"
for func_name, func in analyzer_funcs.iteritems():
print " %s" % func_name
return
if not args.in_dir:
print "Please provide a input directory with -i"
return
selected_analyzer = None
for func_name, obj in analyzer_funcs.iteritems():
if func_name == args.analyzer:
selected_analyzer = obj
if hasattr(obj, "output_results"):
selected_output_func = getattr(obj,"output_results")
elif hasattr(obj, "stream_results"):
selected_stream_func = getattr(obj,"stream_results")
break
if not selected_analyzer:
print "You selected a bad analyzer [%s]" % args.analyzer
print "Analyzers:"
for func_name, func in analyzer_funcs.iteritems():
print " %s" % func_name
return
if args.cores:
cores = args.cores
else:
cores = mp.cpu_count()
print "Starting '%s' analyzer with %d cores, log file: %s" % (selected_analyzer.__name__, cores, args.log_file)
apk_files = get_files_in_dir(args.in_dir)
# Enable for debugging info.
# mp.log_to_stderr(logging.DEBUG)
manager = mp.Manager()
pool = mp.Pool(cores + 2, init_worker)
apk_queue = manager.Queue()
# for logging
res_queue = manager.Queue()
# for data output
output_data = manager.Queue()
end_event = manager.Event()
# if we have a small count of APK files, limit our worker count
apk_count = len(apk_files)
if apk_count < cores:
cores = apk_count
for apk in apk_files:
apk_queue.put(apk)
try:
# TODO: make the runner handle multiple arg lists?
log_result = pool.apply_async(logger_runner, (args.log_file, res_queue, end_event))
if selected_output_func:
print "started output output_processor"
output_res = pool.apply_async(output_processor, (output_data, end_event, selected_output_func))
elif selected_stream_func:
print "started streaming output processor"
output_res = pool.apply_async(selected_stream_func, (output_data, end_event))
worker_results = []
for i in xrange(0, cores):
worker_results.append(pool.apply_async(runner, (selected_analyzer.analyze, args, apk_queue, res_queue, output_data)))
pool.close()
while len(worker_results) > 0:
for i, res in enumerate(worker_results):
if res.ready():
result = res.get()
if not res.successful():
print "one of the workers failed"
worker_results = []
break
else:
worker_results.pop(i)
time.sleep(1)
print "completed all work"
end_event.set()
pool.join()
# get the exception if the output func fails.
if selected_output_func or selected_stream_func:
output_res.get()
pool.terminate()
pool.join()
except KeyboardInterrupt:
print "Exiting!"
pool.terminate()
pool.join()
if __name__ == '__main__':
main()