diff --git a/MANIFEST.in b/MANIFEST.in index a9e1a081..1ad2a0de 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,5 +1,6 @@ prune media prune test prune docs +prune examples prune .github prune */__pycache__ diff --git a/docs/source/conf.py b/docs/source/conf.py index 6326b8fe..adc39e57 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -14,7 +14,7 @@ project = 'libdebug' copyright = '2024, Gabriele Digregorio, Roberto Alessandro Bertolini, Francesco Panebianco' author = 'JinBlack, Io_no, MrIndeciso, Frank01001' -release = '0.5.3' +release = '0.5.4' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/docs/source/logging.rst b/docs/source/logging.rst index 4714dbbe..88b0bce9 100644 --- a/docs/source/logging.rst +++ b/docs/source/logging.rst @@ -29,13 +29,39 @@ The best of both worlds ----------------------- The `dbg` option is the combination of the `pipe` and `debugger` options. It displays all logs related to the debugging operations performed on the process by libdebug, as well as interactions with the process pipe: bytes received and bytes sent. -Temporary Logging ------------------ +Change logger levels at runtime +------------------------------- +Logger levels can be changed at runtime using the `libcontext` module. The following example shows how to change the logger levels at runtime. + +.. code-block:: python + + from libdebug import libcontext + libcontext.pipe_logger = 'DEBUG' + libcontext.debugger_logger = 'DEBUG' + libcontext.general_logger = 'DEBUG' + +The `general_logger` refers to the logger used for the general logs, different from the `pipe` and `debugger` logs. Logger levels can be temporarily enabled at runtime using a `with` statement, as shown in the following example. .. code-block:: python from libdebug import libcontext - with libcontext.tmp(pipe_logger='INFO', debugger_logger='DEBUG'): - r.sendline(b'gimme the flag') \ No newline at end of file + with libcontext.tmp(pipe_logger='SILENT', debugger_logger='DEBUG'): + r.sendline(b'gimme the flag') + +In this example, the `pipe_logger` is set to `SILENT`, and the `debugger_logger` is set to `DEBUG`. The logger levels are restored to their previous values when the `with` block is exited. + +The supported logger levels are the following: + +- ``pipe_logger``: ``DEBUG``, ``SILENT`` +- ``debugger_logger``: ``DEBUG``, ``SILENT`` +- ``general_logger``: ``DEBUG``, ``INFO``, ``WARNING``, ``SILENT`` + +The default logger levels are: + +- ``pipe_logger``: ``SILENT`` +- ``debugger_logger``: ``SILENT`` +- ``general_logger``: ``DEBUG`` + +The `DEBUG` level is the most verbose, including all logs. The `INFO` level includes all logs except for the `DEBUG` logs. The `WARNING` level includes only the `WARNING` logs. The `SILENT` level disables all logs. \ No newline at end of file diff --git a/examples/bof_detection/bof b/examples/bof_detection/bof new file mode 100755 index 00000000..5506074a Binary files /dev/null and b/examples/bof_detection/bof differ diff --git a/examples/bof_detection/bof.c b/examples/bof_detection/bof.c new file mode 100644 index 00000000..f7354067 --- /dev/null +++ b/examples/bof_detection/bof.c @@ -0,0 +1,25 @@ +// +// This file is part of libdebug Python library (https://github.com/libdebug/libdebug). +// Copyright (c) 2024 Francesco Panebianco. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for details. +// +// Compile with: +// gcc bof.c -o bof -fno-stack-protector -std=c99 + +#include + +int main(int argc, char** argv) { + char buffer[32]; + + // Setvbuf is used to disable buffering + setvbuf(stdin, NULL, _IONBF, 0); + setvbuf(stdout, NULL, _IONBF, 0); + + printf("libdebug 4 exploitation testing bench\n"); + + printf("Enter a string: "); + // This is a vulnerable function, it does not check the size of the input + gets(buffer); + + return 0; +} diff --git a/examples/bof_detection/detect_bof.py b/examples/bof_detection/detect_bof.py new file mode 100644 index 00000000..ef38779c --- /dev/null +++ b/examples/bof_detection/detect_bof.py @@ -0,0 +1,349 @@ +# +# This file is part of libdebug Python library (https://github.com/libdebug/libdebug). +# Copyright (c) 2024 Francesco Panebianco. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for details. +# +from libdebug import debugger, libcontext +from libdebug.utils.debugging_utils import resolve_address_in_maps +import iced_x86 as iced +import argparse +import os +import magic + +libcontext.sym_lvl = 5 + +########################################### +# -------- Linux Terminal Colors -------- # +########################################### +LT_COLOR_RED = "31" +LT_COLOR_GREEN = "32" +LT_COLOR_YELLOW = "33" +LT_COLOR_BLUE = "34" +LT_COLOR_MAGENTA = "35" +LT_COLOR_CYAN = "36" +LT_COLOR_WHITE = "37" +LT_COLOR_DEFAULT = "39" + + +####################################### +# -------- Utility Functions -------- # +####################################### + +def p64(in_bytes): + return in_bytes.to_bytes(8, byteorder='little') + +def p32(in_bytes): + return in_bytes.to_bytes(4, byteorder='little') + +def p16(in_bytes): + return in_bytes.to_bytes(2, byteorder='little') + +def p8(in_bytes): + return in_bytes.to_bytes(1, byteorder='little') + +def u64(in_bytes): + return int.from_bytes(in_bytes, byteorder='little') + +def u32(in_bytes): + return int.from_bytes(in_bytes, byteorder='little') + +def u16(in_bytes): + return int.from_bytes(in_bytes, byteorder='little') + +def u8(in_bytes): + return int.from_bytes(in_bytes, byteorder='little') + +def print_color(message, color, end='\n'): + print(f"\033[{color}m{message}\033[0m", end=end) + +has_crashed = False +rip_overwritten = False +rbp_overwritten = False +canary_overwritten = False +fortify_failed = False + +# Constants +MAX_AMD64_INSTRUCTION_LENGTH = 15 +MAX_TEST_LEN = 128 + +########################################## +# ----------- Initialization ----------- # +########################################## + +# Initialize a formatter +formatter = iced.Formatter(iced.FormatterSyntax.INTEL) + +# Assumption: Input is given from stdin instead of a file or a command line argument +# Different scenarios can easily be handled by changing the input source + +# Create the parser +parser = argparse.ArgumentParser(description='Find vulnerabilities in an AMD64 Linux ELF') + +# Add the --maxlen argument +parser.add_argument('--maxlen', type=int, help='maximum length of the input to test') + +# Add the positional argument for the file name +parser.add_argument('filename', type=str, help='the path to the file to process') + +# Parse the arguments +args = parser.parse_args() + +# Print the arguments +print(f'File name: {args.filename}') +if args.maxlen is not None: + print(f'Max length: {args.maxlen}') + +# Get the ELF file +ELF_PATH = args.filename +MAX_TEST_LEN = args.maxlen if args.maxlen is not None else MAX_TEST_LEN + +# Check if the ELF file exists and is valid +if not os.path.exists(ELF_PATH): + print(f"File {ELF_PATH} does not exist.") + exit(1) +elif not os.access(ELF_PATH, os.R_OK): + print(f"File {ELF_PATH} is not readable.") + exit(1) +elif 'ELF 64-bit LSB pie executable, x86-64' not in magic.from_file(ELF_PATH): + print(f"File {ELF_PATH} is not a 64-bit ELF file.") + exit(1) + +######################################### +# ------ Step 1 - "Fuzz" the ELF ------ # +######################################### + +for test_padding_len in range(0, MAX_TEST_LEN, 4): + print(f"[+] Testing payload length {test_padding_len}...") + + d = debugger(ELF_PATH) + + test_payload = b'A' * test_padding_len + + pipe = d.run() + + # Break on check stack canary and fortify fail + check_stack_fail_br = d.breakpoint('__stack_chk_fail', file='libc.so.6') + check_fortify_fail_br = d.breakpoint('__fortify_fail', file='libc.so.6') + + # Catch SIGSEGV and SIGABRT + sig1_hdlr = d.catch_signal(signal='SIGSEGV') + sig2_hdlr = d.catch_signal(signal='SIGABRT') + + d.cont() + + pipe.sendline(test_payload) + + d.wait() + + if sig1_hdlr.hit_on(d): + print(">> Crashed with payload: ", test_payload) + print_color(f">> Received signal: SIGSEGV", color=LT_COLOR_RED) + has_crashed = True + elif sig2_hdlr.hit_on(d): + print(">> Crashed with payload: ", test_payload) + print_color(f">> Received signal: SIGABRT", color=LT_COLOR_RED) + has_crashed = True + elif check_stack_fail_br.hit_on(d): + print(">> Crashed with payload: ", test_payload) + print_color(f">> Stack Canary check failed", color=LT_COLOR_RED) + has_crashed = True + canary_overwritten = True + elif check_fortify_fail_br.hit_on(d): + print(">> Crashed with payload: ", test_payload) + print_color(f">> Fortify check failed", color=LT_COLOR_RED) + has_crashed = True + fortify_failed = True + + if has_crashed: + print_color(f"[+] Crash detected with payload length {test_padding_len}", color=LT_COLOR_YELLOW) + print_color('[+] Post-mortem analysis initiated', color=LT_COLOR_YELLOW) + + curr_rip = d.regs.rip + + # Check for RIP overwrite + if '4141' in hex(d.regs.rip): + print_color("--> RIP is overwritten with AAAA <--", color=LT_COLOR_RED) + rip_overwritten = True + else: + print(f"RIP is at {hex(curr_rip)}") + print("Disassembling the instruction at RIP...") + + # Dump instruction at rip + window_from_rip = d.memory[curr_rip, MAX_AMD64_INSTRUCTION_LENGTH, 'absolute'] + + # Disassemble the instruction (ignoring bytes that are not part of the instruction) + decoder = iced.Decoder(64, window_from_rip) + decoder.ip = curr_rip + + instruction = decoder.decode() + + # Get the instruction bytes and convert to hex bytes separated by spaces + instruction_bytes = window_from_rip[:instruction.len] + instruction_bytes_str = " ".join(f"{b:02X}" for b in instruction_bytes) + + # If the current rip corresponds to a known symbol, print the symbol + try: + symbol = resolve_address_in_maps(curr_rip, d.maps()) + + if not symbol.startswith("0x"): + print_color(f"<{symbol}> ", color=LT_COLOR_CYAN, end="") + except ValueError: + pass + + # Decode and print each instruction + asm = formatter.format(instruction) + print_color(f"{hex(instruction.ip)}: {instruction_bytes_str.ljust(2*4, ' ')} | {asm}", color=LT_COLOR_CYAN) + + # Check for RBP overwrite + if not rip_overwritten and '4141' in hex(d.regs.rbp): + print_color("--> RBP is overwritten with AAAA <--", color=LT_COLOR_RED) + print_color("Stack pivot detected", color=LT_COLOR_RED) + rbp_overwritten = True + else: + print(f"RBP is at {hex(d.regs.rbp)}") + + # Shut up the warnings + libcontext.general_logger = 'SILENT' + + # Stack trace + print_color('\nStack trace:', color=LT_COLOR_RED) + d.print_backtrace() + + d.kill() + d.terminate() + print() + + # We found the input to inspect, but let's see if we find also a way to control RIP + if has_crashed and rip_overwritten: + break + +if not has_crashed: + print_color("[+] No crash detected. Exiting", color=LT_COLOR_GREEN) + exit(0) + +if canary_overwritten: + print_color("[+] Stack canary overwritten", color=LT_COLOR_YELLOW) + print_color("[+] It is possible that an exploit is feasible given a leak of the canary", color=LT_COLOR_YELLOW) + exit(0) + +if fortify_failed: + print_color("[+] Fortify check failed", color=LT_COLOR_YELLOW) + print_color("[+] This mitigation can prevent a lot of exploits, but some workarounds are possible", color=LT_COLOR_YELLOW) + exit(0) + +if not rip_overwritten: + print_color("[+] RIP is not overwritten", color=LT_COLOR_YELLOW) + + if rbp_overwritten: + print_color("[+] However, the analyzer was able to detect a stack pivot", color=LT_COLOR_YELLOW) + print_color("[+] Given a leak, we could use the stack pivot take control of the execution", color=LT_COLOR_YELLOW) + + exit(0) + +# If we reach this point, we have a crash and RIP is overwritten +# We can now proceed to the part where we check which part of the input is used to overwrite RIP + +print('\n-----------------------------------\n\n') + +print("[+] A payload length that overwrites RIP has been found.") +print("[+] Starting taint analysis to find the setup.") + +######################################### +# ------ Step 2 - Taint Analysis ------ # +######################################### + +has_found_setup = False +taint_offset = -1 + +for taint_start_index in range(0, test_padding_len + 8, 8): + taint_end_index = taint_start_index + 8 + + TAINT = p64(0xdeadc0de) + + print(f"[+] Analyzing taint from {taint_start_index} to {taint_end_index}...") + + test_payload = b'A' * taint_start_index + TAINT + b'A' * (test_padding_len - taint_end_index) + + print(f">> Testing payload: {test_payload}") + + d = debugger('bof') + + pipe = d.run() + + sigsegv_checker = d.catch_signal(signal='SIGSEGV') + + d.cont() + + pipe.sendline(test_payload) + + d.wait() + + if sigsegv_checker.hit_on(d): + # Search registers for traces of the taint + print_color("Received SIGSEGV", color=LT_COLOR_RED) + + # Dump registers + print_color("Registers:", color=LT_COLOR_RED) + print_color(f"RAX: {hex(d.regs.rax)}", color=LT_COLOR_RED) + print_color(f"RBX: {hex(d.regs.rbx)}", color=LT_COLOR_RED) + print_color(f"RCX: {hex(d.regs.rcx)}", color=LT_COLOR_RED) + print_color(f"RDX: {hex(d.regs.rdx)}", color=LT_COLOR_RED) + print_color(f"RDI: {hex(d.regs.rdi)}", color=LT_COLOR_RED) + print_color(f"RSI: {hex(d.regs.rsi)}", color=LT_COLOR_RED) + print_color(f"RBP: {hex(d.regs.rbp)}", color=LT_COLOR_RED) + print_color(f"RSP: {hex(d.regs.rsp)}", color=LT_COLOR_RED) + print_color(f"RIP: {hex(d.regs.rip)}", color=LT_COLOR_RED) + + # Say if any of the registers contain the taint + if d.regs.rax == u64(TAINT): + print("RAX contains the taint.") + if d.regs.rbx == u64(TAINT): + print("RBX contains the taint.") + if d.regs.rcx == u64(TAINT): + print("RCX contains the taint.") + if d.regs.rdx == u64(TAINT): + print("RDX contains the taint.") + if d.regs.rdi == u64(TAINT): + print("RDI contains the taint.") + if d.regs.rsi == u64(TAINT): + print("RSI contains the taint.") + if d.regs.rbp == u64(TAINT): + print("RBP contains the taint.") + if d.regs.rsp == u64(TAINT): + print("RSP contains the taint.") + if d.regs.rip == u64(TAINT): + print("RIP contains the taint.") + has_found_setup = True + taint_offset = taint_start_index + break + + # Searching memory for the taint + print("Searching memory for the taint...") + + for map in d.maps(): + # Skip non-writable maps (e.g., vsyscall) + if 'w' not in map.permissions: + continue + + pid = d.threads[0].process_id + + with open(f'/proc/{pid}/mem', 'rb', 0) as mem: + mem.seek(map.start) + memory_content = mem.read(map.end - map.start) + + if TAINT in memory_content: + print_color(f">> Taint found in {map.backing_file} at address {hex(map.start + memory_content.index(TAINT))}", color=LT_COLOR_RED) + elif d.dead: + print(f">> Program exited with code {d.exit_code} and signal {d.exit_signal}") + + d.kill() + d.terminate() + +if not has_found_setup: + print_color("[+] No setup found. Exiting...", color=LT_COLOR_GREEN) + exit(0) + +print_color(f"[+] Found setup at offset {taint_offset}", color=LT_COLOR_CYAN) +print_color("[+] This vulnerability can be exploited on systems with Intel CET disabled", color=LT_COLOR_CYAN) + +# Future expansion: proxy the output to check for useful leaks (e.g, libc addresses, stack addresses, etc.) \ No newline at end of file diff --git a/examples/bof_detection/usage.txt b/examples/bof_detection/usage.txt new file mode 100644 index 00000000..f4ebbc75 --- /dev/null +++ b/examples/bof_detection/usage.txt @@ -0,0 +1,2 @@ +To run this use case, write +$ python detect_bof.py --maxlen 128 bof \ No newline at end of file diff --git a/examples/codecov/coverage.py b/examples/codecov/coverage.py new file mode 100644 index 00000000..fb4ee600 --- /dev/null +++ b/examples/codecov/coverage.py @@ -0,0 +1,284 @@ +# +# This file is part of libdebug Python library (https://github.com/libdebug/libdebug). +# Copyright (c) 2024 Roberto Alessandro Bertolini. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for details. +# + +# The following example demonstrates how libdebug can be used, in conjunction with +# other libraries, to efficiently perform unit testing and code coverage analysis on +# a compiled executable. + +import base64 +import capstone +import libdebug +import pwn + + +# We use pwntools to load the binary and extract the function we want to test +# This can be performed manually using pyelftools or similar libraries +main = pwn.ELF("./main", checksec=False) +function_name = "long_from_base64_decimal_str" +function = main.functions[function_name] +function_asm = main.read(function.address, function.size) + + +# This function uses the Capstone disassembler to find all branches in a specific function +# It does so by looking for all conditional jumps (jcc) and computing the target address +# It is not perfect, as some binaries may use register-based jumps, or other more complex +# control flow structures, but it works well for most cases +def detect_function_branches(function_asm: str) -> list: + """Find all branches in a function.""" + md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64) + md.detail = True + branches = set() + for i in md.disasm(function_asm, function.address): + if i.mnemonic.startswith("j") and i.mnemonic != "jmp": + try: + branches.add((i.address, i.operands[0].imm, i.address + i.size)) + except: + print(f"Failed to parse branch at {hex(i.address)}") + + return branches + + +branches = detect_function_branches(function_asm) + + +# This function sets up a debugger instance, as provided by libdebug, and sets up +# breakpoints on all branches detected in the function. It returns the debugger, +# along with the pipe to the target process and a list of breakpoints set. +def setup_debugger(): + """Setup the debugger.""" + def empty_callback(_, __): pass + + debugger = libdebug.debugger("main") + pipe = debugger.run() + breakpoints = [] + for b in branches: + bp_conditional = debugger.breakpoint(b[0], callback=empty_callback) + bp_target = debugger.breakpoint(b[1], callback=empty_callback) + bp_non_hit = debugger.breakpoint(b[2], callback=empty_callback) + breakpoints.append((bp_conditional, bp_target, bp_non_hit)) + + return (debugger, pipe, breakpoints) + + +# We use a global variable to store the coverage information +# This is a dictionary where the key is the branch address and the value is a set +# containing two elements: whether the conditional branch was hit and whether the +# target branch was not hit. This allows us to calculate the coverage of the function +# by counting the number of branches that were hit. +coverage = {branch[1]: set() for branch in branches} + + +# This function is used to register the coverage information for a specific set of breakpoints +# It iterates over all breakpoints and checks whether the conditional branch was hit and whether +# the target branch was not hit. It then updates the coverage dictionary accordingly. +def register_coverage(bps): + for bp in bps: + # Check that we hit a conditional jump instruction + if bp[0].hit_count > 0: + # Check that the took the jump + coverage[bp[1].address].add(bp[1].hit_count > 0) + # Check that we did not take the jump + coverage[bp[1].address].add(bp[2].hit_count == 0) + +# This function calculates the coverage of the function by counting the number of branches +# that were hit. It does so by summing the number of branches that were hit and dividing +# by the total number of branches. +def calculate_coverage(): + covered = sum(len(coverage[branch]) for branch in coverage) + total = len(coverage) * 2 + return covered / total + + +def test_correct_input(): + number = base64.b64encode(b"1234567890") + debugger, pipe, breakpoints = setup_debugger() + debugger.cont() + pipe.recvline() + pipe.sendline(number) + debugger.wait() + register_coverage(breakpoints) + assert pipe.recvline().strip() == b"1234567890" + debugger.terminate() + +def test_empty_string(): + debugger, pipe, breakpoints = setup_debugger() + debugger.cont() + pipe.recvline() + pipe.sendline(b"") + debugger.wait() + register_coverage(breakpoints) + assert pipe.recvline().strip() == b"Invalid input string" + debugger.terminate() + +def test_invalid_length_base64(): + number = base64.b64encode(b"1234567890")[:-1] + debugger, pipe, breakpoints = setup_debugger() + debugger.cont() + pipe.recvline() + pipe.sendline(number) + debugger.wait() + register_coverage(breakpoints) + assert pipe.recvline().strip() == b"Invalid input string" + debugger.terminate() + +def test_invalid_base64_characters(): + number = base64.b64encode(b"1234567890")[:-1] + b"\xf0" + debugger, pipe, breakpoints = setup_debugger() + debugger.cont() + pipe.recvline() + pipe.sendline(number) + debugger.wait() + register_coverage(breakpoints) + assert pipe.recvline().strip() == b"Invalid input string" + debugger.terminate() + +def test_out_of_range_base64_characters_1(): + number = b"::::" + debugger, pipe, breakpoints = setup_debugger() + debugger.cont() + pipe.recvline() + pipe.sendline(number) + debugger.wait() + register_coverage(breakpoints) + assert pipe.recvline().strip() == b"Invalid input string" + debugger.terminate() + +def test_out_of_range_base64_characters_2(): + number = b"!!!!" + debugger, pipe, breakpoints = setup_debugger() + debugger.cont() + pipe.recvline() + pipe.sendline(number) + debugger.wait() + register_coverage(breakpoints) + assert pipe.recvline().strip() == b"Invalid input string" + debugger.terminate() + +def test_out_of_range_base64_characters_3(): + number = b"//++" + debugger, pipe, breakpoints = setup_debugger() + debugger.cont() + pipe.recvline() + pipe.sendline(number) + debugger.wait() + register_coverage(breakpoints) + assert pipe.recvline().strip() == b"0" + debugger.terminate() + +def test_out_of_range_base64_characters_4(): + number = b"{{}}" + debugger, pipe, breakpoints = setup_debugger() + debugger.cont() + pipe.recvline() + pipe.sendline(number) + debugger.wait() + register_coverage(breakpoints) + assert pipe.recvline().strip() == b"Invalid input string" + debugger.terminate() + +def test_out_of_range_base64_characters_5(): + number = b"\x1f\x1f\x1f\x1f" + debugger, pipe, breakpoints = setup_debugger() + debugger.cont() + pipe.recvline() + pipe.sendline(number) + debugger.wait() + register_coverage(breakpoints) + assert pipe.recvline().strip() == b"Invalid input string" + debugger.terminate() + +# This test validates that the function correctly handles null input +# This is a tricky condition to hit, as the compiled binary will not +# pass a null pointer to the function, but we can manually set the +# rdi register to 0 to simulate this condition and check that the +# function correctly handles it. +def test_null_input(): + number = base64.b64encode(b"1234567890") + debugger, pipe, breakpoints = setup_debugger() + + # Set an additional breakpoint at the beginning of the function to test + debugger.breakpoint("long_from_base64_decimal_str") + + debugger.cont() + pipe.sendline(number) + + # We are in the prologue of the function, set the first argument to 0 + # to simulate a null pointer being passed + debugger.regs.rdi = 0 + + debugger.cont() + pipe.recvline() + debugger.wait() + register_coverage(breakpoints) + assert pipe.recvline().strip() == b"Invalid input string" + debugger.terminate() + +# This test validates that the function correctly handles a malloc failure +# We can simulate this condition by setting the size of the allocation to +# the maximum value of a 64-bit integer, which will cause the malloc function +# to fail and return NULL. We can then check that the function correctly handles +# this condition and returns an error message. +def test_malloc_failure(): + number = base64.b64encode(b"1234567890") + debugger, pipe, breakpoints = setup_debugger() + + # Set a breakpoint on malloc and simulate a failure + def bad_malloc(t, _): + # Make the malloc call fail if we are trying to allocate the input string + if t.regs.rdi == len(number): + t.regs.rdi = 2**64 - 1 + + # Set a breakpoint on malloc, located in libc + debugger.breakpoint("malloc", callback=bad_malloc, file="libc") + + debugger.cont() + pipe.recvline() + pipe.sendline(number) + debugger.wait() + register_coverage(breakpoints) + assert pipe.recvline().strip() == b"Invalid input string" + debugger.terminate() + +def test_integer_overflow(): + number = base64.b64encode(b"123456789012345678901234567890") + debugger, pipe, breakpoints = setup_debugger() + debugger.cont() + pipe.recvline() + pipe.sendline(number) + debugger.wait() + register_coverage(breakpoints) + assert pipe.recvline().strip() == b"Invalid input string" + debugger.terminate() + +if __name__ == "__main__": + print("Testing correct input") + test_correct_input() + print(f"Coverage: {calculate_coverage()}") + print("Testing invalid input: empty string") + test_empty_string() + print(f"Coverage: {calculate_coverage()}") + print("Testing invalid input: bad base64 length") + test_invalid_length_base64() + print(f"Coverage: {calculate_coverage()}") + print("Testing invalid input: bad base64 characters") + test_invalid_base64_characters() + print(f"Coverage: {calculate_coverage()}") + print("Testing all valid base64 characters") + test_out_of_range_base64_characters_1() + test_out_of_range_base64_characters_2() + test_out_of_range_base64_characters_3() + test_out_of_range_base64_characters_4() + test_out_of_range_base64_characters_5() + print(f"Coverage: {calculate_coverage()}") + print("Testing invalid input: null input") + test_null_input() + print(f"Coverage: {calculate_coverage()}") + print("Testing malloc failure") + test_malloc_failure() + print(f"Coverage: {calculate_coverage()}") + print("Testing integer overflow") + test_integer_overflow() + print(f"Coverage: {calculate_coverage()}") diff --git a/examples/codecov/main b/examples/codecov/main new file mode 100755 index 00000000..df9fad03 Binary files /dev/null and b/examples/codecov/main differ diff --git a/examples/codecov/main.c b/examples/codecov/main.c new file mode 100644 index 00000000..f49fbbdb --- /dev/null +++ b/examples/codecov/main.c @@ -0,0 +1,111 @@ +// +// This file is part of libdebug Python library (https://github.com/libdebug/libdebug). +// Copyright (c) 2024 Roberto Alessandro Bertolini. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for details. +// + +#include +#include +#include +#include + +int long_from_base64_decimal_str(char *str) +{ + // Validate that the input string is not NULL + if (str == NULL) { + return -1; + } + + size_t len = strlen(str); + + // Validate that the input string is not empty + if (len == 0) { + return -1; + } + + // Validate that the input string is a multiple of 4 + if (len % 4 != 0) { + return -1; + } + + // Validate that the input string is a valid base64 string + for (size_t i = 0; i < len; i++) { + if (str[i] < 0x20 || str[i] > 0x7E) { + return -1; + } + } + + // Convert the base64 string to its byte values + for (size_t i = 0; i < len; i++) { + if (str[i] >= 'A' && str[i] <= 'Z') { + str[i] = str[i] - 'A'; + } else if (str[i] >= 'a' && str[i] <= 'z') { + str[i] = str[i] - 'a' + 26; + } else if (str[i] >= '0' && str[i] <= '9') { + str[i] = str[i] - '0' + 52; + } else if (str[i] == '+') { + str[i] = 62; + } else if (str[i] == '/') { + str[i] = 63; + } else if (str[i] == '=') { + str[i] = 0; + } else { + return -1; + } + } + + // Allocate memory for the decoded string + char *dec_str = (char *)malloc(len); + + // Validate that the memory allocation was successful + if (dec_str == NULL) { + return -1; + } + + // Decode the base64 string + size_t j = 0; + for (size_t i = 0; i < len; i += 4, j += 3) { + dec_str[j] = (str[i] << 2) | (str[i + 1] >> 4); + dec_str[j + 1] = (str[i + 1] << 4) | (str[i + 2] >> 2); + dec_str[j + 2] = (str[i + 2] << 6) | str[i + 3]; + } + + long result = 0; + + // Clear errno + errno = 0; + + // Convert the decoded string to a long + result = strtol(dec_str, NULL, 10); + + // Check for errors during the conversion + if (errno != 0) { + // Free the memory allocated for the decoded string + free(dec_str); + return -1; + } + + // Free the memory allocated for the decoded string + free(dec_str); + + return result; +} + +int main() +{ + puts("Please enter a base64 decimal string:"); + + char str[256]; + fgets(str, sizeof(str), stdin); + + // Remove the newline character from the input string + str[strcspn(str, "\n")] = '\0'; + + long result = long_from_base64_decimal_str(str); + + if (result == -1) { + puts("Invalid input string"); + } else { + printf("%ld\n", result); + } +} \ No newline at end of file diff --git a/examples/python_bytecode_debugging/libpython3.12.so.1.0 b/examples/python_bytecode_debugging/libpython3.12.so.1.0 new file mode 100755 index 00000000..10d8ef0e Binary files /dev/null and b/examples/python_bytecode_debugging/libpython3.12.so.1.0 differ diff --git a/examples/python_bytecode_debugging/output b/examples/python_bytecode_debugging/output new file mode 100644 index 00000000..8169d111 --- /dev/null +++ b/examples/python_bytecode_debugging/output @@ -0,0 +1,21 @@ +Executed opcode: 0x97 - RESUME +Executed opcode: 0x64 - LOAD_CONST +Executed opcode: 0x5a - STORE_NAME +Executed opcode: 0x64 - LOAD_CONST +Executed opcode: 0x5a - STORE_NAME +Executed opcode: 0x64 - LOAD_CONST +Executed opcode: 0x5a - STORE_NAME +Executed opcode: 0x65 - LOAD_NAME +Executed opcode: 0x65 - LOAD_NAME +Executed opcode: 0x7a - BINARY_OP +Executed opcode: 0x65 - LOAD_NAME +Executed opcode: 0x7a - BINARY_OP +Executed opcode: 0x5a - STORE_NAME +Executed opcode: 0x2 - PUSH_NULL +Executed opcode: 0x65 - LOAD_NAME +Executed opcode: 0x65 - LOAD_NAME +Executed opcode: 0xab - CALL +Executed opcode: 0x1 - POP_TOP +Executed opcode: 0x79 - RETURN_CONST +Executed opcode: 0x3 - INTERPRETER_EXIT +The result printed by the python script is: -40 diff --git a/examples/python_bytecode_debugging/python3.12 b/examples/python_bytecode_debugging/python3.12 new file mode 100755 index 00000000..e8cf2b6b Binary files /dev/null and b/examples/python_bytecode_debugging/python3.12 differ diff --git a/examples/python_bytecode_debugging/python_bytecode_dumper.py b/examples/python_bytecode_debugging/python_bytecode_dumper.py new file mode 100644 index 00000000..c1eb59d9 --- /dev/null +++ b/examples/python_bytecode_debugging/python_bytecode_dumper.py @@ -0,0 +1,146 @@ +# +# This file is part of libdebug Python library (https://github.com/libdebug/libdebug). +# Copyright (c) 2024 Gabriele Digregorio. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for details. +# + +from libdebug import debugger, libcontext +from capstone import Cs, CS_ARCH_X86, CS_MODE_64 +from elftools.elf.elffile import ELFFile + + +############################################################## +###### Automatically find the offsets of interest ###### +############################################################## + +def find_patterns_in_section(section, md): + """Find patterns in the given section. We want to find every DISPATCHER inlined in the libpython shared library.""" + jmp_rax_offsets = [] + prev_instructions = [] + + for i in md.disasm(section.data(), section['sh_addr']): + prev_instructions.append((i.mnemonic, i.op_str)) + if len(prev_instructions) > 3: + prev_instructions.pop(0) + + if len(prev_instructions) == 3: + if (prev_instructions[0] == ('movzx', 'eax, r15b') and + (prev_instructions[1] == ('mov', 'rax, qword ptr [rdi + rax*8]') + or + prev_instructions[1] == ('mov', 'rax, qword ptr [rbx + rax*8]') + or + prev_instructions[1] == ('mov', 'rax, qword ptr [rdx + rax*8]') + or + prev_instructions[1] == ('mov', 'rax, qword ptr [rcx + rax*8]') + or + prev_instructions[1] == ('mov', 'rax, qword ptr [r8 + rax*8]') + or + prev_instructions[1] == ('mov', 'rax, qword ptr [r9 + rax*8]') + or + prev_instructions[1] == ('mov', 'rax, qword ptr [rsi + rax*8]') + ) + and + prev_instructions[2] == ('jmp', 'rax')): + jmp_rax_offsets.append(i.address) + elif (prev_instructions[0] == ('mov', 'r15, rax') and + (prev_instructions[1] == ('mov', 'rax, qword ptr [rdi + rax*8]') + or + prev_instructions[1] == ('mov', 'rax, qword ptr [rbx + rax*8]') + or + prev_instructions[1] == ('mov', 'rax, qword ptr [rdx + rax*8]') + or + prev_instructions[1] == ('mov', 'rax, qword ptr [rcx + rax*8]') + or + prev_instructions[1] == ('mov', 'rax, qword ptr [r8 + rax*8]') + or + prev_instructions[1] == ('mov', 'rax, qword ptr [r9 + rax*8]') + or + prev_instructions[1] == ('mov', 'rax, qword ptr [rsi + rax*8]') + ) + and + prev_instructions[2] == ('jmp', 'rax')): + jmp_rax_offsets.append(i.address) + + return jmp_rax_offsets + +with open('./libpython3.12.so.1.0', 'rb') as f: + elf = ELFFile(f) + md = Cs(CS_ARCH_X86, CS_MODE_64) + + # Collect all offsets from executable sections + all_offsets = [] + for section in elf.iter_sections(): + if section['sh_flags'] & 0x4: # SHF_EXECINSTR + offsets = find_patterns_in_section(section, md) + all_offsets.extend(offsets) + + +############################################################## +###### Dump the executed python opcodes with libdebug ###### +############################################################## + +# Create a dictionary with python opcodes as keys and mnemonics as values. +# The dict is hardcoded to avoid differences in the opcode values between +# the python versions used to dump the opcodes and the python version of +# the python interpreter used to execute the python script under analysis. +opcode_to_mnemonic = { + 0x97: "RESUME", + 0x64: "LOAD_CONST", + 0x5a: "STORE_NAME", + 0x65: "LOAD_NAME", + 0x7a: "BINARY_OP", + 0x2: "PUSH_NULL", + 0xab: "CALL", + 0x1: "POP_TOP", + 0x79: "RETURN_CONST", + 0x3: "INTERPRETER_EXIT" +} + +def dumper(t,_): + """Callback function to dump the executed python opcode.""" + print(f"Executed opcode: {t.regs.r15:#x} - {opcode_to_mnemonic.get(t.regs.r15, 'UNKNOWN')}") + +d = debugger(["./python3.12", "python_script.py"], env={"LD_LIBRARY_PATH": "."}) + +# Set the symbol level to 5, this will enable the debugger to resolve symbols using debuinfod files +libcontext.sym_lvl = 5 + +r = d.run() + +# This function is executed before each chunk of python bytecode is interpreted +bp_run_mod = d.breakpoint("run_mod", file = "libpython3", hardware=True) + +# Start the execution +d.cont() + +# Wait for the breakpoint to be hit +d.wait() + +while bp_run_mod.hit_on(d): + if bp_run_mod.hit_count == 2: + # At this point of the execution, the python bytecode releted to the python script + # is executed. We can now set breakpoints on the offsets we found in the shared library + for addr in all_offsets: + d.breakpoint(addr, callback=dumper, file = "libpython3") + # Set a breakpoint on the _PyArena_Free function, executed when the python script ends + bp_py_arena_free = d.breakpoint("_PyArena_Free", file = "libpython3") + # Set a breakpoint on the binary operation function + bp_binary_op = d.breakpoint(0x189f93, file = "libpython3") + d.cont() + d.wait() + +while bp_binary_op.hit_on(d): + # Transform the binary operation into a subtraction + d.regs.rdx = 0xa # 0xa is the subtraction + d.cont() + d.wait() + +if bp_py_arena_free.hit_on(d): + # The python script has ended, we can now disable the breakpoints + bp_py_arena_free.disable() + for bp in d.breakpoints.values(): + bp.disable() + +print("The result printed by the python script is:", r.recvline().decode()) + +d.kill() \ No newline at end of file diff --git a/examples/python_bytecode_debugging/python_script.py b/examples/python_bytecode_debugging/python_script.py new file mode 100644 index 00000000..9c7c185b --- /dev/null +++ b/examples/python_bytecode_debugging/python_script.py @@ -0,0 +1,5 @@ +a = 10 +b = 20 +c = 30 +d = a + b + c +print(d) \ No newline at end of file diff --git a/libdebug/architectures/amd64/amd64_stack_unwinder.py b/libdebug/architectures/amd64/amd64_stack_unwinder.py index a07e077a..0a82e8b4 100644 --- a/libdebug/architectures/amd64/amd64_stack_unwinder.py +++ b/libdebug/architectures/amd64/amd64_stack_unwinder.py @@ -9,10 +9,13 @@ from typing import TYPE_CHECKING from libdebug.architectures.stack_unwinding_manager import StackUnwindingManager +from libdebug.liblog import logging if TYPE_CHECKING: from libdebug.state.thread_context import ThreadContext + + class Amd64StackUnwinder(StackUnwindingManager): """Class that provides stack unwinding for the x86_64 architecture.""" @@ -48,6 +51,18 @@ def unwind(self: Amd64StackUnwinder, target: ThreadContext) -> list: except (OSError, ValueError): break + # If we are in the prolouge of a function, we need to get the return address from the stack + # using a slightly more complex method + try: + first_return_address = self.get_return_address(target) + + if first_return_address != stack_trace[1]: + stack_trace.insert(1, first_return_address) + except (OSError, ValueError): + logging.WARNING( + "Failed to get the return address from the stack. Check stack frame registers (e.g., base pointer). The stack trace may be incomplete.", + ) + return stack_trace def get_return_address(self: Amd64StackUnwinder, target: ThreadContext) -> int: diff --git a/libdebug/cffi/ptrace_cffi_source.c b/libdebug/cffi/ptrace_cffi_source.c index f843b250..8ee7da4e 100644 --- a/libdebug/cffi/ptrace_cffi_source.c +++ b/libdebug/cffi/ptrace_cffi_source.c @@ -554,9 +554,15 @@ void enable_breakpoint(struct global_state *state, uint64_t address) while (b != NULL) { if (b->addr == address) { b->enabled = 1; + break; } b = b->next; } + + // Patch the instruction with the breakpoint + if (b != NULL) { + ptrace(PTRACE_POKEDATA, state->t_HEAD->tid, (void *)address, b->patched_instruction); + } } void disable_breakpoint(struct global_state *state, uint64_t address) @@ -566,9 +572,15 @@ void disable_breakpoint(struct global_state *state, uint64_t address) while (b != NULL) { if (b->addr == address) { b->enabled = 0; + break; } b = b->next; } + + // Restore the original instruction + if (b != NULL) { + ptrace(PTRACE_POKEDATA, state->t_HEAD->tid, (void *)address, b->instruction); + } } void free_breakpoints(struct global_state *state) diff --git a/libdebug/data/memory_view.py b/libdebug/data/memory_view.py index 64c0c9de..44ef6793 100644 --- a/libdebug/data/memory_view.py +++ b/libdebug/data/memory_view.py @@ -151,23 +151,29 @@ def _manage_memory_read_type(self: MemoryView, key: int | slice | str | tuple, f the "binary" map file). """ if isinstance(key, int): - address = self._internal_debugger.resolve_address(key, file) - return self.read(address, 1) + address = self._internal_debugger.resolve_address(key, file, skip_absolute_address_validation=True) + try: + return self.read(address, 1) + except OSError as e: + raise ValueError("Invalid address.") from e elif isinstance(key, slice): if isinstance(key.start, str): start = self._internal_debugger.resolve_symbol(key.start, file) else: - start = self._internal_debugger.resolve_address(key.start, file) + start = self._internal_debugger.resolve_address(key.start, file, skip_absolute_address_validation=True) if isinstance(key.stop, str): stop = self._internal_debugger.resolve_symbol(key.stop, file) else: - stop = self._internal_debugger.resolve_address(key.stop, file) + stop = self._internal_debugger.resolve_address(key.stop, file, skip_absolute_address_validation=True) if stop < start: raise ValueError("Invalid slice range.") - return self.read(start, stop - start) + try: + return self.read(start, stop - start) + except OSError as e: + raise ValueError("Invalid address.") from e elif isinstance(key, str): address = self._internal_debugger.resolve_symbol(key, file) @@ -207,11 +213,14 @@ def _manage_memory_read_tuple(self: MemoryView, key: tuple) -> bytes: if isinstance(address, str): address = self._internal_debugger.resolve_symbol(address, file) elif isinstance(address, int): - address = self._internal_debugger.resolve_address(address, file) + address = self._internal_debugger.resolve_address(address, file, skip_absolute_address_validation=True) else: raise TypeError("Invalid type for the address. Expected int or string.") - return self.read(address, size) + try: + return self.read(address, size) + except OSError as e: + raise ValueError("Invalid address.") from e def _manage_memory_write_type( self: MemoryView, @@ -229,19 +238,26 @@ def _manage_memory_write_type( the "binary" map file). """ if isinstance(key, int): - address = self._internal_debugger.resolve_address(key, file) - self.write(address, value) + address = self._internal_debugger.resolve_address(key, file, skip_absolute_address_validation=True) + try: + self.write(address, value) + except OSError as e: + raise ValueError("Invalid address.") from e elif isinstance(key, slice): if isinstance(key.start, str): start = self._internal_debugger.resolve_symbol(key.start, file) else: - start = self._internal_debugger.resolve_address(key.start, file) + start = self._internal_debugger.resolve_address(key.start, file, skip_absolute_address_validation=True) if key.stop is not None: if isinstance(key.stop, str): stop = self._internal_debugger.resolve_symbol(key.stop, file) else: - stop = self._internal_debugger.resolve_address(key.stop, file) + stop = self._internal_debugger.resolve_address( + key.stop, + file, + skip_absolute_address_validation=True, + ) if stop < start: raise ValueError("Invalid slice range") @@ -249,7 +265,10 @@ def _manage_memory_write_type( if len(value) != stop - start: liblog.warning(f"Mismatch between slice width and value size, writing {len(value)} bytes.") - self.write(start, value) + try: + self.write(start, value) + except OSError as e: + raise ValueError("Invalid address.") from e elif isinstance(key, str): address = self._internal_debugger.resolve_symbol(key, file) @@ -292,14 +311,17 @@ def _manage_memory_write_tuple(self: MemoryView, key: tuple, value: bytes) -> No if isinstance(address, str): address = self._internal_debugger.resolve_symbol(address, file) elif isinstance(address, int): - address = self._internal_debugger.resolve_address(address, file) + address = self._internal_debugger.resolve_address(address, file, skip_absolute_address_validation=True) else: raise TypeError("Invalid type for the address. Expected int or string.") if len(value) != size: liblog.warning(f"Mismatch between specified size and actual value size, writing {len(value)} bytes.") - self.write(address, value) + try: + self.write(address, value) + except OSError as e: + raise ValueError("Invalid address.") from e def __delitem__(self: MemoryView, key: int | slice | str | tuple) -> None: """MemoryView doesn't support deletion.""" diff --git a/libdebug/debugger/internal_debugger.py b/libdebug/debugger/internal_debugger.py index 25a7e386..107d1cc9 100644 --- a/libdebug/debugger/internal_debugger.py +++ b/libdebug/debugger/internal_debugger.py @@ -207,7 +207,7 @@ def start_processing_thread(self: InternalDebugger) -> None: ) self.__polling_thread.start() - def _background_invalid_call(self: InternalDebugger) -> None: + def _background_invalid_call(self: InternalDebugger, *_: ..., **__: ...) -> None: """Raises an error when an invalid call is made in background mode.""" raise RuntimeError("This method is not available in a callback.") @@ -994,12 +994,18 @@ def get_thread_by_id(self: InternalDebugger, thread_id: int) -> ThreadContext: return None - def resolve_address(self: InternalDebugger, address: int, backing_file: str) -> int: + def resolve_address( + self: InternalDebugger, + address: int, + backing_file: str, + skip_absolute_address_validation: bool = False, + ) -> int: """Normalizes and validates the specified address. Args: address (int): The address to normalize and validate. backing_file (str): The backing file to resolve the address in. + skip_absolute_address_validation (bool, optional): Whether to skip bounds checking for absolute addresses. Defaults to False. Returns: int: The normalized and validated address. @@ -1007,6 +1013,9 @@ def resolve_address(self: InternalDebugger, address: int, backing_file: str) -> Raises: ValueError: If the substring `backing_file` is present in multiple backing files. """ + if skip_absolute_address_validation and backing_file == "absolute": + return address + maps = self.debugging_interface.maps() if backing_file in ["hybrid", "absolute"]: @@ -1285,12 +1294,9 @@ def __threaded_migrate_from_gdb(self: InternalDebugger) -> None: self.debugging_interface.migrate_from_gdb() def __threaded_peek_memory(self: InternalDebugger, address: int) -> bytes | BaseException: - try: - value = self.debugging_interface.peek_memory(address) - # TODO: this is only for amd64 - return value.to_bytes(8, "little") - except BaseException as e: - return e + value = self.debugging_interface.peek_memory(address) + # TODO: this is only for amd64 + return value.to_bytes(8, "little") def __threaded_poke_memory(self: InternalDebugger, address: int, data: bytes) -> None: int_data = int.from_bytes(data, "little") diff --git a/libdebug/liblog.py b/libdebug/liblog.py index b118b350..0e377047 100644 --- a/libdebug/liblog.py +++ b/libdebug/liblog.py @@ -32,12 +32,16 @@ def __init__(self: LibLog) -> None: if self._initialized: return + # Add custom log levels + logging.addLevelName(60, "SILENT") + logging.SILENT = 60 + # General logger self.general_logger = self._setup_logger("libdebug", logging.INFO) # Component-specific loggers - self.debugger_logger = self._setup_logger("debugger", logging.INFO) - self.pipe_logger = self._setup_logger("pipe", logging.INFO) + self.debugger_logger = self._setup_logger("debugger", logging.SILENT) + self.pipe_logger = self._setup_logger("pipe", logging.SILENT) self._initialized = True diff --git a/libdebug/ptrace/ptrace_status_handler.py b/libdebug/ptrace/ptrace_status_handler.py index dbdc0bf2..2717ac13 100644 --- a/libdebug/ptrace/ptrace_status_handler.py +++ b/libdebug/ptrace/ptrace_status_handler.py @@ -77,37 +77,33 @@ def _handle_breakpoints(self: PtraceStatusHandler, thread_id: int) -> bool: bp: None | Breakpoint - enabled_breakpoints = {} - for bp in self.internal_debugger.breakpoints.values(): - if bp.enabled and not bp._disabled_for_step: - enabled_breakpoints[bp.address] = bp - - bp = None - - if ip in enabled_breakpoints: + bp = self.internal_debugger.breakpoints.get(ip) + if bp and bp.enabled and not bp._disabled_for_step: # Hardware breakpoint hit liblog.debugger("Hardware breakpoint hit at 0x%x", ip) - bp = self.internal_debugger.breakpoints[ip] else: # If the trap was caused by a software breakpoint, we need to restore the original instruction # and set the instruction pointer to the previous instruction. ip -= software_breakpoint_byte_size() - if ip in enabled_breakpoints: + bp = self.internal_debugger.breakpoints.get(ip) + if bp and bp.enabled and not bp._disabled_for_step: # Software breakpoint hit liblog.debugger("Software breakpoint hit at 0x%x", ip) - bp = self.internal_debugger.breakpoints[ip] # Set the instruction pointer to the previous instruction thread.instruction_pointer = ip # Link the breakpoint to the thread, so that we can step over it bp._linked_thread_ids.append(thread_id) + else: + # If the breakpoint has been hit but is not enabled, we need to reset the bp variable + bp = None # Manage watchpoints - if bp is None: + if not bp: bp = self.ptrace_interface.hardware_bp_helpers[thread_id].is_watchpoint_hit() - if bp is not None: + if bp: liblog.debugger("Watchpoint hit at 0x%x", bp.address) if bp: diff --git a/libdebug/state/thread_context.py b/libdebug/state/thread_context.py index f0b5d193..c65d0e0b 100644 --- a/libdebug/state/thread_context.py +++ b/libdebug/state/thread_context.py @@ -13,6 +13,7 @@ ) from libdebug.liblog import liblog from libdebug.utils.debugging_utils import resolve_address_in_maps +from libdebug.utils.print_style import PrintStyle from libdebug.utils.signal_utils import resolve_signal_name, resolve_signal_number if TYPE_CHECKING: @@ -173,20 +174,44 @@ def signal(self: ThreadContext, signal: str | int) -> None: self._signal_number = signal self._internal_debugger.resume_context.threads_with_signals_to_forward.append(self.thread_id) - def backtrace(self: ThreadContext) -> list: - """Returns the current backtrace of the thread.""" - internal_debugger = self._internal_debugger - internal_debugger._ensure_process_stopped() + def backtrace(self: ThreadContext, as_symbols: bool = False) -> list: + """Returns the current backtrace of the thread. + + Args: + as_symbols (bool, optional): Whether to return the backtrace as symbols + """ + self._internal_debugger._ensure_process_stopped() + stack_unwinder = stack_unwinding_provider() + backtrace = stack_unwinder.unwind(self) + if as_symbols: + maps = self._internal_debugger.debugging_interface.maps() + backtrace = [resolve_address_in_maps(x, maps) for x in backtrace] + return backtrace + + def print_backtrace(self: ThreadContext) -> None: + """Prints the current backtrace of the thread.""" + self._internal_debugger._ensure_process_stopped() stack_unwinder = stack_unwinding_provider() backtrace = stack_unwinder.unwind(self) - maps = internal_debugger.debugging_interface.maps() - return [resolve_address_in_maps(x, maps) for x in backtrace] + maps = self._internal_debugger.debugging_interface.maps() + for return_address in backtrace: + return_address_symbol = resolve_address_in_maps(return_address, maps) + if return_address_symbol[:2] == "0x": + print(f"{PrintStyle.RED}{return_address:#x} {PrintStyle.RESET}") + else: + print(f"{PrintStyle.RED}{return_address:#x} <{return_address_symbol}> {PrintStyle.RESET}") def current_return_address(self: ThreadContext) -> int: """Returns the return address of the current function.""" self._internal_debugger._ensure_process_stopped() stack_unwinder = stack_unwinding_provider() - return stack_unwinder.get_return_address(self) + + try: + return_address = stack_unwinder.get_return_address(self) + except (OSError, ValueError) as e: + raise ValueError("Failed to get the return address. Check stack frame registers (e.g., base pointer).") from e + + return return_address def step(self: ThreadContext) -> None: """Executes a single instruction of the process.""" diff --git a/libdebug/utils/elf_utils.py b/libdebug/utils/elf_utils.py index f441040e..f01e989f 100644 --- a/libdebug/utils/elf_utils.py +++ b/libdebug/utils/elf_utils.py @@ -53,6 +53,7 @@ def _debuginfod(buildid: str) -> Path: debuginfod_path = Path.home() / ".cache" / "debuginfod_client" / buildid / "debuginfo" if not debuginfod_path.exists(): + liblog.info(f"Downloading debuginfo file for buildid {buildid}") _download_debuginfod(buildid, debuginfod_path) return debuginfod_path diff --git a/libdebug/utils/libcontext.py b/libdebug/utils/libcontext.py index 22029f33..f502f9cc 100644 --- a/libdebug/utils/libcontext.py +++ b/libdebug/utils/libcontext.py @@ -17,6 +17,9 @@ class LibContext: """A class that holds the global context of the library.""" _instance = None + _pipe_logger_levels: list[str] + _debugger_logger_levels: list[str] + _general_logger_levels: list[str] def __new__(cls: type): """Create a new instance of the class if it does not exist yet. @@ -34,10 +37,13 @@ def __init__(self: LibContext) -> None: if self._initialized: return + self._pipe_logger_levels = ["DEBUG", "SILENT"] + self._debugger_logger_levels = ["DEBUG", "SILENT"] + self._general_logger_levels = ["DEBUG", "INFO", "WARNING", "SILENT"] self._sym_lvl = 3 - self._debugger_logger = "INFO" - self._pipe_logger = "INFO" + self._debugger_logger = "SILENT" + self._pipe_logger = "SILENT" self._general_logger = "INFO" # Adjust log levels based on command-line arguments @@ -95,12 +101,14 @@ def debugger_logger(self: LibContext) -> str: @debugger_logger.setter def debugger_logger(self: LibContext, value: str) -> None: - """Property setter for debugger_logger, ensuring it's a valid logging level.""" - if value in ["DEBUG", "INFO"]: + """Property setter for debugger_logger, ensuring it's a supported logging level.""" + if value in self._debugger_logger_levels: self._debugger_logger = value liblog.debugger_logger.setLevel(value) else: - raise ValueError("debugger_logger must be a valid logging level") + raise ValueError( + f"debugger_logger must be a supported logging level. The supported levels are: {self._debugger_logger_levels}", + ) @property def pipe_logger(self: LibContext) -> str: @@ -113,12 +121,14 @@ def pipe_logger(self: LibContext) -> str: @pipe_logger.setter def pipe_logger(self: LibContext, value: str) -> None: - """Property setter for pipe_logger, ensuring it's a valid logging level.""" - if value in ["DEBUG", "INFO"]: + """Property setter for pipe_logger, ensuring it's a supported logging level.""" + if value in self._pipe_logger_levels: self._pipe_logger = value liblog.pipe_logger.setLevel(value) else: - raise ValueError("pipe_logger must be a valid logging level") + raise ValueError( + f"pipe_logger must be a supported logging level. The supported levels are: {self._pipe_logger_levels}", + ) @property def general_logger(self: LibContext) -> str: @@ -131,12 +141,14 @@ def general_logger(self: LibContext) -> str: @general_logger.setter def general_logger(self: LibContext, value: str) -> None: - """Property setter for general_logger, ensuring it's a valid logging level.""" - if value in ["DEBUG", "INFO"]: + """Property setter for general_logger, ensuring it's a supported logging level.""" + if value in self._general_logger_levels: self._general_logger = value liblog.general_logger.setLevel(value) else: - raise ValueError("general_logger must be a valid logging level") + raise ValueError( + f"general_logger must be a supported logging level. The supported levels are: {self._general_logger_levels}", + ) @property def arch(self: LibContext) -> str: diff --git a/setup.py b/setup.py index 48ae5a7f..2eff8455 100644 --- a/setup.py +++ b/setup.py @@ -68,8 +68,8 @@ def get_outputs(self): setup( name="libdebug", - version="0.5.3", - author="JinBlack", + version="0.5.4", + author="JinBlack, Io_no, MrIndeciso, Frank01001", description="A library to debug binary programs", packages=find_packages(include=["libdebug", "libdebug.*"]), install_requires=["capstone", "pyelftools", "cffi", "requests", "psutil"], diff --git a/test/Makefile b/test/Makefile index 2694586e..1be7d276 100644 --- a/test/Makefile +++ b/test/Makefile @@ -28,6 +28,8 @@ all: $(CC) $(CFLAGS) $(SRC_DIR)/signals_multithread_det_test.c -o $(BIN_DIR)/signals_multithread_det_test $(LDFLAGS) $(CC) $(CFLAGS) $(SRC_DIR)/segfault_test.c -o $(BIN_DIR)/segfault_test $(LDFLAGS) $(CC) $(CFLAGS) $(SRC_DIR)/executable_section_test.c -o $(BIN_DIR)/executable_section_test $(LDFLAGS) + $(CC) $(CFLAGS) $(SRC_DIR)/math_loop_test.c -lm -fno-pie -no-pie -o $(BIN_DIR)/math_loop_test $(LDFLAGS) + # Clean rule to remove compiled files diff --git a/test/benchmarks/benchmarks.md b/test/benchmarks/benchmarks.md new file mode 100644 index 00000000..9eabd610 --- /dev/null +++ b/test/benchmarks/benchmarks.md @@ -0,0 +1,50 @@ +# libdebug VS GDB Benchmarks +The benchmarks were run on libdebug 0.5.4 and GDB 15.1 + +## System Information +``` +$ uname -a +Linux 6.9.9-arch1-1 #1 SMP PREEMPT_DYNAMIC Fri, 12 Jul 2024 00:06:53 +0000 x86_64 GNU/Linux +``` + +
+                   -`                    Neofetch Results
+                  .o+`                   ---------------------------
+                 `ooo/                   OS: Arch Linux x86_64
+                `+oooo:                  Host: XPS 14 9440
+               `+oooooo:                 Kernel: 6.9.9-arch1-1
+               -+oooooo+:                Uptime: 15 mins
+             `/:-:++oooo+:               Packages: 1038 (pacman), 6 (flatpak)
+            `/++++/+++++++:              Shell: zsh 5.9
+           `/++++++++++++++:             Resolution: 1920x1200
+          `/+++ooooooooooooo/`           DE: GNOME 46.3.1
+         ./ooosssso++osssssso+`          WM: Mutter
+        .oossssso-````/ossssss+          WM Theme: Adwaita
+       -osssssso.      :ssssssso.        Theme: Adwaita [GTK2/3]
+      :osssssss/        osssso+++.       Icons: Adwaita [GTK2/3]
+     /ossssssss/        +ssssooo/-       Terminal: alacritty
+   `/ossssso+/:-        -:/+osssso+-     CPU: Intel Ultra 7 155H (22) @ 4.500GHz
+  `+sso+:-`                 `.-/+oso:    GPU: Intel Arc Graphics
+ `++:.                           `-/+/   GPU: NVIDIA GeForce RTX 4050 Max-Q / Mobile
+ .`                                 `/   Memory: 3809MiB / 63749MiB
+
+ +## Folder structure +In this folder, you will find all python scripts to run experiments on both libdebug and GDB. The available benchmarks are on breakpoint hits and syscall handling. + +The *results* folder contains Python pickles of the lists of time required for each run as well as the extracted boxplots for the distributions. + +## Replicating the benchmarks + +### GDB Scripts +GDB is not designed to be scriptable. However, it is possible to implement some custom commands to be run once GDB is loaded. Because of this, the benchmark will include user logs and other overhead. To run a GDB test you need to open GDB with the test script loaded and then run the associated command. E.g., +```bash +gdb -q -x breakpoint_gdb.py +(gdb) breakpoint_gdb +``` + +### libdebug scripts +Once you have the exact same version of libdebug installed, run the script like any other Python script. E.g., +```bash +python breakpoint_libdebug.py +``` \ No newline at end of file diff --git a/test/benchmarks/breakpoint_gdb.py b/test/benchmarks/breakpoint_gdb.py new file mode 100644 index 00000000..1ae41a6a --- /dev/null +++ b/test/benchmarks/breakpoint_gdb.py @@ -0,0 +1,72 @@ +# +# This file is part of libdebug Python library (https://github.com/libdebug/libdebug). +# Copyright (c) 2024 Gabriele Digregorio. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for details. +# + +import gdb +from time import perf_counter +import pickle + +class MyBreakpoint(gdb.Breakpoint): + """ Class to handle the breakpoint action """ + def __init__(self, spec): + """ Initialize a hardware breakpoint """ + super(MyBreakpoint, self).__init__(spec, gdb.BP_HARDWARE_BREAKPOINT) + self.silent = True + + def stop(self): + """ Callback function to be called at each breakpoint hit """ + pass + +class Debugger(gdb.Command): + """ Class to handle the debugging session """ + def __init__(self): + super(Debugger, self).__init__("breakpoint_gdb", gdb.COMMAND_USER) + + def test(self): + """ This test includes the time to: + - run the debugged process from the entrypoint, + - hit the breakpoint 1000 times, + - each time the breakpoint is hit, execute an empty callback, + - wait the process to end. + """ + gdb.execute("set pagination off") + + # Start the process (it will stop at the entrypoint) + gdb.execute("start") + + # Set the hardware breakpoint + MyBreakpoint("*0x401302") + + # Start the timer + start = perf_counter() + + # Continue the process from the entrypoint and wait for the process to end + gdb.execute("continue") + + # Stop the timer + end = perf_counter() + + # Delete the breakpoints + gdb.execute("del breakpoints") + + self.results.append(end-start) + + def invoke(self, arg, from_tty): + # Initialize the results + self.results = [] + + # Load the binary + binary = "../binaries/math_loop_test" + gdb.execute(f"file {binary}") + + for _ in range(1000): + self.test() + + # Save the result in a pickle file + with open("breakpoint_gdb.pkl", "wb") as f: + pickle.dump(self.results, f) + # print("Results:", self.results) + +Debugger() \ No newline at end of file diff --git a/test/benchmarks/breakpoint_libdebug.py b/test/benchmarks/breakpoint_libdebug.py new file mode 100644 index 00000000..790dfd82 --- /dev/null +++ b/test/benchmarks/breakpoint_libdebug.py @@ -0,0 +1,62 @@ +# +# This file is part of libdebug Python library (https://github.com/libdebug/libdebug). +# Copyright (c) 2024 Gabriele Digregorio. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for details. +# + +from time import perf_counter +import pickle +from libdebug import debugger + + +def callback(t,b): + """ Callback function to be called at each breakpoint hit """ + pass + +def test(): + """ This test includes the time to: + - run the debugged process from the entrypoint, + - hit the breakpoint 1000 times, + - each time the breakpoint is hit, execute an empty callback, + - wait the process to end. + """ + # Start the process (it will stop at the entrypoint) + d.run() + + # Set the hardware breakpoint + d.breakpoint(0x401302, callback=callback, hardware=True, file="absolute") + + # Start the timer + start = perf_counter() + + # Continue the process from the entrypoint + d.cont() + + # Wait for the process to end + d.wait() + + # Stop the timer + end = perf_counter() + + # Kill for a clean exit + d.kill() + + results.append(end-start) + +# Initialize the results +results = [] + +# Initialize the debugger +d = debugger("../binaries/math_loop_test") + +for _ in range(1000): + test() + +# Terminate the debugger +d.terminate() + +# Save the result in a pickle file +with open("breakpoint_libdebug.pkl", "wb") as f: + pickle.dump(results, f) + +# print("Results:", results) \ No newline at end of file diff --git a/test/benchmarks/results/breakpoint_benchmark.svg b/test/benchmarks/results/breakpoint_benchmark.svg new file mode 100644 index 00000000..8997fcc5 --- /dev/null +++ b/test/benchmarks/results/breakpoint_benchmark.svg @@ -0,0 +1,509 @@ + + + + + + + + 2024-07-31T13:33:43.180664 + image/svg+xml + + + Matplotlib v3.7.1, https://matplotlib.org/ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/test/benchmarks/results/breakpoint_gdb.pkl b/test/benchmarks/results/breakpoint_gdb.pkl new file mode 100644 index 00000000..aeca4d03 Binary files /dev/null and b/test/benchmarks/results/breakpoint_gdb.pkl differ diff --git a/test/benchmarks/results/breakpoint_libdebug.pkl b/test/benchmarks/results/breakpoint_libdebug.pkl new file mode 100644 index 00000000..4b0e66fb Binary files /dev/null and b/test/benchmarks/results/breakpoint_libdebug.pkl differ diff --git a/test/benchmarks/results/syscall_benchmark.svg b/test/benchmarks/results/syscall_benchmark.svg new file mode 100644 index 00000000..38916074 --- /dev/null +++ b/test/benchmarks/results/syscall_benchmark.svg @@ -0,0 +1,945 @@ + + + + + + + + 2024-07-31T13:33:43.413632 + image/svg+xml + + + Matplotlib v3.7.1, https://matplotlib.org/ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/test/benchmarks/results/syscall_gdb.pkl b/test/benchmarks/results/syscall_gdb.pkl new file mode 100644 index 00000000..14302b5e Binary files /dev/null and b/test/benchmarks/results/syscall_gdb.pkl differ diff --git a/test/benchmarks/results/syscall_libdebug.pkl b/test/benchmarks/results/syscall_libdebug.pkl new file mode 100644 index 00000000..e91884e9 Binary files /dev/null and b/test/benchmarks/results/syscall_libdebug.pkl differ diff --git a/test/benchmarks/syscall_gdb.py b/test/benchmarks/syscall_gdb.py new file mode 100644 index 00000000..1dddc870 --- /dev/null +++ b/test/benchmarks/syscall_gdb.py @@ -0,0 +1,104 @@ +# +# This file is part of libdebug Python library (https://github.com/libdebug/libdebug). +# Copyright (c) 2024 Gabriele Digregorio. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for details. +# + +######################################################################################## +# This script requires GDB 15.1 or later to run due to the use of the new Python API # +######################################################################################## + +import gdb +from time import perf_counter +import pickle + + +# Initialize the global variables +results = [] + +def stop_handler(event): + """ Callback function to be called at each stop event """ + if isinstance(event, gdb.StopEvent): + # Check if the stop is due the desired syscall + if event.details["reason"] == "syscall-entry" and event.details["syscall-number"] == 39: + gdb.post_event(lambda: gdb.execute("continue")) + elif event.details["reason"] == "syscall-return" and event.details["syscall-number"] == 39: + gdb.post_event(lambda: gdb.execute("continue")) + +class StartBreakpoint(gdb.Breakpoint): + """ Class to handle the breakpoint set just before the main loop """ + def __init__(self, spec): + """ Initialize a hardware breakpoint """ + super(StartBreakpoint, self).__init__(spec, gdb.BP_HARDWARE_BREAKPOINT) + self.silent = True + + def stop(self): + """ Callback function to be called at each breakpoint hit """ + global start + # Start the timer + start = perf_counter() + +class EndBreakpoint(gdb.Breakpoint): + """ Class to handle the breakpoint set right after the main loop """ + def __init__(self, spec): + """ Initialize a hardware breakpoint """ + super(EndBreakpoint, self).__init__(spec, gdb.BP_HARDWARE_BREAKPOINT) + self.silent = True + + def stop(self): + """ Callback function to be called at each breakpoint hit """ + # Stop the timer + end = perf_counter() + + # Kill the process + gdb.execute("kill") + + if len(results) < 1000: + results.append(end-start) + # Restart the process + gdb.execute("run") + else: + # Save the results and quit + with open("syscall_gdb.pkl", "wb") as f: + pickle.dump(results, f) + # print("Results:", results) + gdb.execute("quit") + +class Debugger(gdb.Command): + """ Class to handle the debugging session """ + def __init__(self): + super(Debugger, self).__init__("syscall_gdb", gdb.COMMAND_USER) + + def invoke(self, arg, from_tty): + """ This test includes the time to: + - run the debugged process from the breakpoint just before the main loop, + - manage 1000 calls to the syscall getpid (handled by the stop_handler), + - reach the breakpoint right after the main loop, + """ + + # Load the binary + binary = "../binaries/math_loop_test" + gdb.execute(f"file {binary}") + + gdb.execute("set confirm off") + gdb.execute("set pagination off") + + # Catch the syscall getpid + gdb.execute("catch syscall getpid") + + # Connect the stop_handler to the stop event + # It will be called at each stop event and will check + # if the syscall getpid is called + gdb.events.stop.connect(stop_handler) + + # Start the process (it will stop at the entrypoint) + gdb.execute("start") + + # Set the breakpoints before and after the main loop + StartBreakpoint("*0x401243") + EndBreakpoint("*0x401332") + + # Continue the process from the entrypoint + gdb.execute("continue") + +Debugger() \ No newline at end of file diff --git a/test/benchmarks/syscall_libdebug.py b/test/benchmarks/syscall_libdebug.py new file mode 100644 index 00000000..6dd4e794 --- /dev/null +++ b/test/benchmarks/syscall_libdebug.py @@ -0,0 +1,78 @@ +# +# This file is part of libdebug Python library (https://github.com/libdebug/libdebug). +# Copyright (c) 2024 Gabriele Digregorio. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for details. +# + +from time import perf_counter +import pickle +from libdebug import debugger + + +def callback_on_enter(t,b): + """ Callback function to be called on syscall entry """ + pass + +def callback_on_exit(t,b): + """ Callback function to be called on syscall exit """ + pass + +def start_breakpoint(t,b): + """ Callback function to be called each time the breakpoint set just before + the main loop is hit + """ + global start + # Start the timer + start = perf_counter() + +def end_breakpoint(t,b): + """ Callback function to be called each time the breakpoint set right after + the main loop is hit + """ + # Stop the timer + end = perf_counter() + # Update the results + results.append(end-start) + +def test(): + """ This test includes the time to: + - run the debugged process from the breakpoint just before the main loop, + - manage 1000 calls to the syscall getpid (each call is handled by the callback functions), + - reach the breakpoint right after the main loop, + """ + # Start the process (it will stop at the entrypoint) + d.run() + + # Set the breakpoints before and after the main loop + d.breakpoint(0x401243, hardware=True, callback=start_breakpoint, file="absolute") + d.breakpoint(0x401332, hardware=True, callback=end_breakpoint, file="absolute") + + # Handle the syscall getpid, install the callbacks + d.handle_syscall("getpid", on_enter=callback_on_enter, on_exit=callback_on_exit) + + # Continue the process from the entrypoint + d.cont() + + # Wait for the process to end + d.wait() + + # Kill the process + d.kill() + +# Initialize the results +results = [] + +# Initialize the debugger +d = debugger("../binaries/math_loop_test") + +for _ in range(1000): + test() + +# Terminate the debugger +d.terminate() + +# Save the result in a pickle file +with open("syscall_libdebug.pkl", "wb") as f: + pickle.dump(results, f) + +# print("Results:", results) diff --git a/test/binaries/math_loop_test b/test/binaries/math_loop_test new file mode 100755 index 00000000..eb209937 Binary files /dev/null and b/test/binaries/math_loop_test differ diff --git a/test/run_suite.py b/test/run_suite.py index 8f48b72a..27486054 100644 --- a/test/run_suite.py +++ b/test/run_suite.py @@ -52,6 +52,10 @@ def fast_suite(): suite.addTest(BreakpointTest("test_bp_disable_reenable_hw")) suite.addTest(BreakpointTest("test_bps_running")) suite.addTest(BreakpointTest("test_bp_backing_file")) + suite.addTest(BreakpointTest("test_bp_disable_on_creation")) + suite.addTest(BreakpointTest("test_bp_disable_on_creation_2")) + suite.addTest(BreakpointTest("test_bp_disable_on_creation_hardware")) + suite.addTest(BreakpointTest("test_bp_disable_on_creation_2_hardware")) suite.addTest(MemoryTest("test_memory")) suite.addTest(MemoryTest("test_mem_access_libs")) suite.addTest(MemoryTest("test_memory_access_methods_backing_file")) @@ -61,6 +65,7 @@ def fast_suite(): suite.addTest(MemoryTest("test_memory_access_methods")) suite.addTest(HwBasicTest("test_basic")) suite.addTest(HwBasicTest("test_registers")) + suite.addTest(BacktraceTest("test_backtrace_as_symbols")) suite.addTest(BacktraceTest("test_backtrace")) suite.addTest(AttachDetachTest("test_attach")) suite.addTest(AttachDetachTest("test_attach_and_detach_1")) @@ -111,6 +116,8 @@ def fast_suite(): suite.addTest(AutoWaitingNlinks("test_nlinks")) suite.addTest(WatchpointTest("test_watchpoint")) suite.addTest(WatchpointTest("test_watchpoint_callback")) + suite.addTest(WatchpointTest("test_watchpoint_disable")) + suite.addTest(WatchpointTest("test_watchpoint_disable_reenable")) suite.addTest(WatchpointAliasTest("test_watchpoint_alias")) suite.addTest(WatchpointAliasTest("test_watchpoint_callback")) suite.addTest(HandleSyscallTest("test_handles")) diff --git a/test/scripts/backtrace_test.py b/test/scripts/backtrace_test.py index 29cf9237..b2acdae7 100644 --- a/test/scripts/backtrace_test.py +++ b/test/scripts/backtrace_test.py @@ -13,7 +13,7 @@ class BacktraceTest(unittest.TestCase): def setUp(self): self.d = debugger("binaries/backtrace_test") - def test_backtrace(self): + def test_backtrace_as_symbols(self): d = self.d d.run() @@ -29,28 +29,28 @@ def test_backtrace(self): d.cont() self.assertTrue(d.regs.rip == bp0.address) - backtrace = d.backtrace() + backtrace = d.backtrace(as_symbols=True) self.assertIn("_start", backtrace.pop()) self.assertEqual(backtrace[:1], ["main+8"]) d.cont() self.assertTrue(d.regs.rip == bp1.address) - backtrace = d.backtrace() + backtrace = d.backtrace(as_symbols=True) self.assertIn("_start", backtrace.pop()) self.assertEqual(backtrace[:2], ["function1+8", "main+16"]) d.cont() self.assertTrue(d.regs.rip == bp2.address) - backtrace = d.backtrace() + backtrace = d.backtrace(as_symbols=True) self.assertIn("_start", backtrace.pop()) self.assertEqual(backtrace[:3], ["function2+8", "function1+12", "main+16"]) d.cont() self.assertTrue(d.regs.rip == bp3.address) - backtrace = d.backtrace() + backtrace = d.backtrace(as_symbols=True) self.assertIn("_start", backtrace.pop()) self.assertEqual( backtrace[:4], ["function3+8", "function2+1c", "function1+12", "main+16"] @@ -59,7 +59,7 @@ def test_backtrace(self): d.cont() self.assertTrue(d.regs.rip == bp4.address) - backtrace = d.backtrace() + backtrace = d.backtrace(as_symbols=True) self.assertIn("_start", backtrace.pop()) self.assertEqual( backtrace[:5], @@ -69,7 +69,7 @@ def test_backtrace(self): d.cont() self.assertTrue(d.regs.rip == bp5.address) - backtrace = d.backtrace() + backtrace = d.backtrace(as_symbols=True) self.assertIn("_start", backtrace.pop()) self.assertEqual( backtrace[:6], @@ -86,7 +86,7 @@ def test_backtrace(self): d.cont() self.assertTrue(d.regs.rip == bp6.address) - backtrace = d.backtrace() + backtrace = d.backtrace(as_symbols=True) self.assertIn("_start", backtrace.pop()) self.assertEqual( backtrace[:7], @@ -103,6 +103,96 @@ def test_backtrace(self): d.kill() + def test_backtrace(self): + d = self.d + + d.run() + + bp0 = d.breakpoint("main+8") + bp1 = d.breakpoint("function1+8") + bp2 = d.breakpoint("function2+8") + bp3 = d.breakpoint("function3+8") + bp4 = d.breakpoint("function4+8") + bp5 = d.breakpoint("function5+8") + bp6 = d.breakpoint("function6+8") + + d.cont() + + self.assertTrue(d.regs.rip == bp0.address) + backtrace = d.backtrace() + backtrace.pop() + self.assertEqual(backtrace[:1], [0x555555555151]) + + d.cont() + + self.assertTrue(d.regs.rip == bp1.address) + backtrace = d.backtrace() + backtrace.pop() + self.assertEqual(backtrace[:2], [0x55555555518a, 0x55555555515f]) + + d.cont() + + self.assertTrue(d.regs.rip == bp2.address) + backtrace = d.backtrace() + backtrace.pop() + self.assertEqual(backtrace[:3], [0x55555555519e, 0x555555555194, 0x55555555515f]) + + d.cont() + + self.assertTrue(d.regs.rip == bp3.address) + backtrace = d.backtrace() + backtrace.pop() + self.assertEqual( + backtrace[:4], [0x5555555551bc, 0x5555555551b2, 0x555555555194, 0x55555555515f] + ) + + d.cont() + + self.assertTrue(d.regs.rip == bp4.address) + backtrace = d.backtrace() + backtrace.pop() + self.assertEqual( + backtrace[:5], + [0x5555555551da, 0x5555555551d0, 0x5555555551b2, 0x555555555194, 0x55555555515f], + ) + + d.cont() + + self.assertTrue(d.regs.rip == bp5.address) + backtrace = d.backtrace() + backtrace.pop() + self.assertEqual( + backtrace[:6], + [ + 0x5555555551f8, + 0x5555555551ee, + 0x5555555551d0, + 0x5555555551b2, + 0x555555555194, + 0x55555555515f, + ], + ) + + d.cont() + + self.assertTrue(d.regs.rip == bp6.address) + backtrace = d.backtrace() + backtrace.pop() + self.assertEqual( + backtrace[:7], + [ + 0x555555555216, + 0x55555555520c, + 0x5555555551ee, + 0x5555555551d0, + 0x5555555551b2, + 0x555555555194, + 0x55555555515f, + ], + ) + + d.kill() + if __name__ == "__main__": unittest.main() diff --git a/test/scripts/breakpoint_test.py b/test/scripts/breakpoint_test.py index 9d77c66b..ea36547b 100644 --- a/test/scripts/breakpoint_test.py +++ b/test/scripts/breakpoint_test.py @@ -378,6 +378,75 @@ def test_bp_backing_file(self): d.kill() + def test_bp_disable_on_creation(self): + d = debugger("binaries/breakpoint_test") + + d.run() + + bp1 = d.bp("random_function") + bp2 = d.bp(0x40119c) + bp1.disable() + + d.cont() + + assert not bp1.hit_on(d) + assert bp2.hit_on(d) + + d.kill() + d.terminate() + + def test_bp_disable_on_creation_2(self): + d = debugger("binaries/breakpoint_test") + + d.run() + + bp = d.bp("random_function") + + bp.disable() + + d.cont() + d.wait() + + # Validate we didn't segfault + assert d.dead and d.exit_signal is None + + d.kill() + d.terminate() + + def test_bp_disable_on_creation_hardware(self): + d = debugger("binaries/breakpoint_test") + + d.run() + + bp1 = d.bp("random_function", hardware=True) + bp2 = d.bp(0x40119c) + bp1.disable() + + d.cont() + + assert not bp1.hit_on(d) + assert bp2.hit_on(d) + + d.kill() + d.terminate() + + def test_bp_disable_on_creation_2_hardware(self): + d = debugger("binaries/breakpoint_test") + + d.run() + + bp = d.bp("random_function", hardware=True) + + bp.disable() + + d.cont() + d.wait() + + # Validate we didn't segfault + assert d.dead and d.exit_signal is None + + d.kill() + d.terminate() if __name__ == "__main__": unittest.main() diff --git a/test/scripts/watchpoint_test.py b/test/scripts/watchpoint_test.py index ceece1bf..a29b5ae4 100644 --- a/test/scripts/watchpoint_test.py +++ b/test/scripts/watchpoint_test.py @@ -15,31 +15,46 @@ def test_watchpoint(self): d.run() - d.breakpoint("global_char", hardware=True, condition="rw", length=1) - d.breakpoint("global_int", hardware=True, condition="w", length=4) - d.breakpoint("global_long", hardware=True, condition="rw", length=8) + wp_char = d.breakpoint("global_char", hardware=True, condition="rw", length=1) + wp_int = d.breakpoint("global_int", hardware=True, condition="w", length=4) + wp_long = d.breakpoint("global_long", hardware=True, condition="rw", length=8) d.cont() self.assertEqual(d.regs.rip, 0x401111) # mov byte ptr [global_char], 0x1 + self.assertEqual(wp_char.hit_count, 1) + self.assertEqual(wp_int.hit_count, 0) + self.assertEqual(wp_long.hit_count, 0) d.cont() self.assertEqual(d.regs.rip, 0x401124) # mov dword ptr [global_int], 0x4050607 + self.assertEqual(wp_char.hit_count, 1) + self.assertEqual(wp_int.hit_count, 1) + self.assertEqual(wp_long.hit_count, 0) d.cont() self.assertEqual( d.regs.rip, 0x401135 ) # mov qword ptr [global_long], 0x8090a0b0c0d0e0f + self.assertEqual(wp_char.hit_count, 1) + self.assertEqual(wp_int.hit_count, 1) + self.assertEqual(wp_long.hit_count, 1) d.cont() self.assertEqual(d.regs.rip, 0x401155) # movzx eax, byte ptr [global_char] + self.assertEqual(wp_char.hit_count, 2) + self.assertEqual(wp_int.hit_count, 1) + self.assertEqual(wp_long.hit_count, 1) d.cont() self.assertEqual(d.regs.rip, 0x401173) # mov rax, qword ptr [global_long] + self.assertEqual(wp_char.hit_count, 2) + self.assertEqual(wp_int.hit_count, 1) + self.assertEqual(wp_long.hit_count, 2) d.cont() @@ -120,3 +135,97 @@ def watchpoint_global_long(t, b): # There is one extra hit performed by the exit routine of libc self.assertEqual(wp3.hit_count, 3) + + def test_watchpoint_disable(self): + d = debugger("binaries/watchpoint_test", auto_interrupt_on_command=False) + + d.run() + + wp_char = d.breakpoint("global_char", hardware=True, condition="rw", length=1) + wp_int = d.breakpoint("global_int", hardware=True, condition="w", length=4) + wp_long = d.breakpoint("global_long", hardware=True, condition="rw", length=8) + + d.cont() + + self.assertEqual(d.regs.rip, 0x401111) # mov byte ptr [global_char], 0x1 + self.assertEqual(wp_char.hit_count, 1) + self.assertEqual(wp_int.hit_count, 0) + self.assertEqual(wp_long.hit_count, 0) + + d.cont() + + self.assertEqual(d.regs.rip, 0x401124) # mov dword ptr [global_int], 0x4050607 + self.assertEqual(wp_char.hit_count, 1) + self.assertEqual(wp_int.hit_count, 1) + self.assertEqual(wp_long.hit_count, 0) + + d.cont() + + self.assertEqual( + d.regs.rip, 0x401135 + ) # mov qword ptr [global_long], 0x8090a0b0c0d0e0f + self.assertEqual(wp_char.hit_count, 1) + self.assertEqual(wp_int.hit_count, 1) + self.assertEqual(wp_long.hit_count, 1) + + # disable watchpoint + wp_char.disable() + + d.cont() + + self.assertEqual(d.regs.rip, 0x401173) # mov rax, qword ptr [global_long] + self.assertEqual(wp_char.hit_count, 1) + self.assertEqual(wp_int.hit_count, 1) + self.assertEqual(wp_long.hit_count, 2) + + d.cont() + + d.kill() + + def test_watchpoint_disable_reenable(self): + d = debugger("binaries/watchpoint_test", auto_interrupt_on_command=False) + + d.run() + + wp_char = d.breakpoint("global_char", hardware=True, condition="rw", length=1) + wp_int = d.breakpoint("global_int", hardware=True, condition="w", length=4) + wp_long = d.breakpoint("global_long", hardware=True, condition="rw", length=8) + + d.cont() + + self.assertEqual(d.regs.rip, 0x401111) # mov byte ptr [global_char], 0x1 + self.assertEqual(wp_char.hit_count, 1) + self.assertEqual(wp_int.hit_count, 0) + self.assertEqual(wp_long.hit_count, 0) + + d.cont() + + self.assertEqual(d.regs.rip, 0x401124) # mov dword ptr [global_int], 0x4050607 + self.assertEqual(wp_char.hit_count, 1) + self.assertEqual(wp_int.hit_count, 1) + self.assertEqual(wp_long.hit_count, 0) + + # disable watchpoint + wp_long.disable() + + d.cont() + + + self.assertEqual(d.regs.rip, 0x401155) # movzx eax, byte ptr [global_char] + self.assertEqual(wp_char.hit_count, 2) + self.assertEqual(wp_int.hit_count, 1) + self.assertEqual(wp_long.hit_count, 0) + + # re-enable watchpoint + wp_long.enable() + + d.cont() + + self.assertEqual(d.regs.rip, 0x401173) # mov rax, qword ptr [global_long] + self.assertEqual(wp_char.hit_count, 2) + self.assertEqual(wp_int.hit_count, 1) + self.assertEqual(wp_long.hit_count, 1) + + d.cont() + + d.kill() \ No newline at end of file diff --git a/test/srcs/math_loop_test.c b/test/srcs/math_loop_test.c new file mode 100644 index 00000000..918abb8e --- /dev/null +++ b/test/srcs/math_loop_test.c @@ -0,0 +1,29 @@ +// +// This file is part of libdebug Python library (https://github.com/libdebug/libdebug). +// Copyright (c) 2024 Gabriele Digregorio. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for details. +// + +#include +#include +#include + +int main() { + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stdin, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); + + double result; + int pid; + + for (int i = 0; i < 1000; i++) { + // Perform a complex mathematical operation + result += sin(i) * log(i + 1) * sqrt(i + 1) + cos(i) / (i + 1); + + // Call a harmless syscall + pid = getpid(); + result += pid; + } + + return 0; +} \ No newline at end of file