Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 148 additions & 75 deletions minimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import array

import xmltodict
from itertools import combinations

from threading import Thread
from functools import partial
Expand Down Expand Up @@ -70,50 +71,100 @@ def removeHeader(self, req, target_header):
print("DEBUG: Header, target_Header", header, target_header)
new_headers.append(header)
return self._helpers.buildHttpMessage(new_headers, req[req_info.getBodyOffset():])

def _remove_header_field(self, req, target_header, target_field):
req_info = self._helpers.analyzeRequest(req)
new_headers = []
headers = req_info.getHeaders()
for header in headers:
if header.lower().startswith(target_header.lower()):
header_parts = header.split(': ', 1)
if len(header_parts) == 2:
header_name, header_value = header_parts
if target_header.lower() == 'cookie':
fields = [f for f in header_value.split('; ') if not f.strip().startswith(target_field + '=')]
if fields:
new_headers.append(header_name + ': ' + '; '.join(fields))
continue
new_headers.append(header)
return self._helpers.buildHttpMessage(new_headers, req[req_info.getBodyOffset():])

def _minimize_cookies(self, etalon, invariants, initial_req):
request_info = self._helpers.analyzeRequest(initial_req)

cookie_params = [p for p in request_info.getParameters() if p.getType() == IParameter.PARAM_COOKIE]
if not cookie_params:
return initial_req

original_cookie_header = [h for h in request_info.getHeaders() if h.lower().startswith('cookie:')]
if not original_cookie_header:
return initial_req

cookie_fields = [c.strip() for c in original_cookie_header[0].split(':', 1)[1].split(';')]

for i in range(1, len(cookie_fields) + 1):
for combo in combinations(cookie_fields, i):
new_cookie_value = '; '.join(combo)

headers = [h for h in request_info.getHeaders() if not h.lower().startswith('cookie:')]
if new_cookie_value:
headers.append("Cookie: " + new_cookie_value)

new_req = self._helpers.buildHttpMessage(headers, initial_req[request_info.getBodyOffset():])

resp = self._cb.makeHttpRequest(self._httpServ, new_req).getResponse()
if self.compare(etalon, resp, invariants):
print("Found a working cookie set:", combo)
return new_req

print("No minimal cookie set found. Falling back to the last working request.")
return initial_req

def _minimize(self, replace):
try:
self._fix_classloader_problems()
seen_json = seen_xml = False
request_info = self._helpers.analyzeRequest(self._request)
current_req = self._request.getRequest()

etalon = self._cb.makeHttpRequest(self._httpServ, current_req).getResponse()
etalon2 = self._cb.makeHttpRequest(self._httpServ, current_req).getResponse()
invariants = set(self._helpers.analyzeResponseVariations([etalon, etalon2]).getInvariantAttributes())
invariants -= IGNORED_INVARIANTS
print("Request invariants", invariants)
for param in request_info.getParameters():

temp_req = copy.copy(current_req)

for param in list(request_info.getParameters()):
param_type = param.getType()
if param_type in [IParameter.PARAM_URL, IParameter.PARAM_BODY, IParameter.PARAM_COOKIE]:
if param_type in [IParameter.PARAM_URL, IParameter.PARAM_BODY]:
print("Trying", param_type, param.getName(), param.getValue())
req = self._helpers.removeParameter(current_req, param)
resp = self._cb.makeHttpRequest(self._httpServ, req).getResponse()
req_without_param = self._helpers.removeParameter(temp_req, param)
resp = self._cb.makeHttpRequest(self._httpServ, req_without_param).getResponse()
if self.compare(etalon, resp, invariants):
print("excluded:", param.getType(), param.getName(), param.getValue())
current_req = self._fix_cookies(req)
else:
if param_type == IParameter.PARAM_JSON:
seen_json = True
elif param_type == IParameter.PARAM_XML:
seen_xml = True
else:
print("Unsupported type:", param.getType())

# minimize headers
# do not remove GET/POST and Host header -> skip first 2 elements
for header in request_info.getHeaders()[2:]:
req = self.removeHeader(current_req, header)
resp = self._cb.makeHttpRequest(self._httpServ, req).getResponse()
if self.compare(etalon, resp, invariants):
print("excluded: Header ", header)
current_req = self._fix_cookies(req)
print("excluded:", param.getName())
temp_req = self._fix_cookies(req_without_param)

request_info = self._helpers.analyzeRequest(temp_req)
headers_to_check = [h for h in request_info.getHeaders()[2:] if not h.lower().startswith('cookie:')]
for header in headers_to_check:
req_without_header = self.removeHeader(temp_req, header)
resp = self._cb.makeHttpRequest(self._httpServ, req_without_header).getResponse()
if self.compare(etalon, resp, invariants):
print("excluded: Header ", header)
temp_req = self._fix_cookies(req_without_header)

current_req = self._minimize_cookies(etalon, invariants, temp_req)
request_info = self._helpers.analyzeRequest(current_req)

seen_json = (request_info.getContentType() == IRequestInfo.CONTENT_TYPE_JSON or seen_json)
seen_xml = (request_info.getContentType() == IRequestInfo.CONTENT_TYPE_XML or seen_xml)
seen_json = (request_info.getContentType() == IRequestInfo.CONTENT_TYPE_JSON or any(p.getType() == IParameter.PARAM_JSON for p in request_info.getParameters()))
seen_xml = (request_info.getContentType() == IRequestInfo.CONTENT_TYPE_XML or any(p.getType() == IParameter.PARAM_XML for p in request_info.getParameters()))

if seen_json or seen_xml:
body_offset = request_info.getBodyOffset()
headers = self._request.getRequest()[:body_offset].tostring()
body = self._request.getRequest()[body_offset:].tostring()
headers = self._helpers.bytesToString(current_req[:body_offset])
body = self._helpers.bytesToString(current_req[body_offset:])

if seen_json:
print('Minimizing json...')
dumpmethod = partial(json.dumps, indent=4)
Expand All @@ -122,70 +173,93 @@ def _minimize(self, replace):
print('Minimizing XML...')
dumpmethod = partial(xmltodict.unparse, pretty=True)
loadmethod = xmltodict.parse
# The minimization routine for both xml and json is the same,
# the only difference is with load and dump functions
def check(body):
if len(body) == 0 and not seen_json:
# XML with and no root node is invalid

def check(body_data):
if isinstance(body_data, dict) and not body_data and not seen_json:
return False
body = str(dumpmethod(body))
req = fix_content_type(headers, body)
resp = self._cb.makeHttpRequest(self._httpServ, req).getResponse()
if self.compare(etalon, resp, invariants):
print("Not changed: " + body)
return True
else:
print("Changed: " + body)

try:
serialized_body = dumpmethod(body_data)
req = fix_content_type(headers, serialized_body)
resp = self._cb.makeHttpRequest(self._httpServ, req).getResponse()
return self.compare(etalon, resp, invariants)
except Exception as e:
print("Error during check:", e)
return False
body = loadmethod(body)
body = bf_search(body, check)
current_req = fix_content_type(headers, str(dumpmethod(body)))

body_data = loadmethod(body)
body_data = bf_search(body_data, check)
current_req = fix_content_type(headers, dumpmethod(body_data))

if replace:
self._request.setRequest(current_req)
else:
self._cb.sendToRepeater(
self._httpServ.getHost(),
self._httpServ.getPort(),
self._httpServ.getProtocol() == 'https',
current_req,
"minimized"
self._httpServ.getHost(),
self._httpServ.getPort(),
self._httpServ.getProtocol() == 'https',
current_req,
"minimized"
)
except:
print traceback.format_exc()

def bf_search(body, check_func):
print('Starting to minimize', body)
if isinstance(body, dict):
to_test = body.items()
assemble = lambda l : dict(l)
to_test = list(body.items())
elif type(body) == list:
to_test = zip(range(len(body)), body)
assemble = lambda l: list(zip(*sorted(l))[1] if len(l) else [])
#1. Test all sub-elements
tested = []
while len(to_test):
current = to_test.pop()
print('Trying to eliminate', current)
if not check_func(assemble(to_test+tested)):
tested.append(current)
#2. Recurse into remainig sub_items
to_test = tested
to_test = list(zip(range(len(body)), body))

tested = []
while len(to_test):
key, value = to_test.pop()
if isinstance(value,list) or isinstance(value, dict):
def check_func_rec(body):
return check_func(assemble(to_test + tested + [(key, body)]))
value = bf_search(value, check_func_rec)
tested.append((key, value))
return assemble(tested)
current_key, current_value = to_test.pop()

test_body = copy.deepcopy(body)
if isinstance(body, dict):
del test_body[current_key]
elif isinstance(body, list):
del test_body[current_key]

if check_func(test_body):
print('Successfully eliminated', current_key)
body = test_body
to_test = [(k, v) for k, v in to_test if (k != current_key if isinstance(body, dict) else k != current_key)]
else:
print('Could not eliminate', current_key)
tested.append((current_key, current_value))

if isinstance(body, dict):
for key in list(body.keys()):
value = body[key]
if isinstance(value, (list, dict)):
def check_func_rec(new_value):
test_body = copy.deepcopy(body)
test_body[key] = new_value
return check_func(test_body)
body[key] = bf_search(value, check_func_rec)
elif isinstance(body, list):
for i in range(len(body)):
value = body[i]
if isinstance(value, (list, dict)):
def check_func_rec(new_value):
test_body = copy.deepcopy(body)
test_body[i] = new_value
return check_func(test_body)
body[i] = bf_search(value, check_func_rec)

return body

def fix_content_type(headers, body):
headers = headers.split('\r\n')
for i in range(len(headers)):
if headers[i].lower().startswith('content-length'):
headers[i] = 'Content-Length: ' + str(len(body))
return array.array('b', '\r\n'.join(headers) + body)
new_headers = []
body_bytes = body.encode('utf-8')
for header in headers:
if header.lower().startswith('content-length'):
new_headers.append('Content-Length: ' + str(len(body_bytes)))
else:
new_headers.append(header)
return array.array('b', '\r\n'.join(new_headers) + '\r\n\r\n' + body_bytes)

class BurpExtender(IBurpExtender, IContextMenuFactory):
def registerExtenderCallbacks(self, callbacks):
Expand All @@ -201,13 +275,12 @@ def createMenuItems(self, invocation):
Minimizer(self._callbacks, invocation.getSelectedMessages()).minimize,
True
)
),
),
JMenuItem(
"Minimize in a new tab",
actionPerformed=partial(
Minimizer(self._callbacks, invocation.getSelectedMessages()).minimize,
False
)
),
]

),
]