-
Notifications
You must be signed in to change notification settings - Fork 149
/
Copy pathdebugcore.py
2204 lines (1856 loc) · 85.1 KB
/
debugcore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
"""
Copyright (C) 2016-2017 Korcan Karaokçu <[email protected]>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from threading import Lock, Thread, Condition
from time import sleep, time
from collections import OrderedDict, defaultdict
import pexpect, os, sys, ctypes, pickle, shelve, re, struct, io, traceback
from . import utils, typedefs, regexes
from .libscanmem.scanmem import Scanmem
from .libptrscan.ptrscan import PointerScan
self_pid = os.getpid()
libc = ctypes.CDLL("libc.so.6")
system_endianness = typedefs.ENDIANNESS.LITTLE if sys.byteorder == "little" else typedefs.ENDIANNESS.BIG
scanmem = Scanmem(os.path.join(utils.get_libpince_directory(), "libscanmem", "libscanmem.so"))
ptrscan = PointerScan(os.path.join(utils.get_libpince_directory(), "libptrscan", "libptrscan.so"))
ptrscan.set_pointer_offset_symbol("->")
# A boolean value. True if gdb is initialized, False if not
gdb_initialized = False
# An integer. Can be a member of typedefs.INFERIOR_ARCH
inferior_arch = int
# An integer. Can be a member of typedefs.INFERIOR_STATUS
inferior_status = -1
# An integer. PID of the current attached/created process
currentpid = -1
# An integer. Can be a member of typedefs.STOP_REASON
stop_reason = int
# A dictionary. Holds breakpoint numbers and what to do on hit
# Format: {bp_num1:on_hit1, bp_num2:on_hit2, ...}
breakpoint_on_hit_dict = {}
# A dictionary. Holds address and aob of instructions that were nop'ed out
# Format: {address1:orig_instruction1_aob, address2:orig_instruction2_aob, ...}
modified_instructions_dict = {}
# If an action such as deletion or condition modification happens in one of the breakpoints in a list, others in the
# same list will get affected as well
# Format: [[[address1, size1], [address2, size2], ...], [[address1, size1], ...], ...]
chained_breakpoints = []
child = object # this object will be used with pexpect operations
# This Lock is used by the function send_command to ensure synchronous execution
lock_send_command = Lock()
# This condition is notified whenever status of the inferior changes
# Use the variable inferior_status to get information about inferior's status
# See CheckInferiorStatus class for an example
status_changed_condition = Condition()
# This condition is notified if the current inferior gets terminated
# See AwaitProcessExit class for an example
process_exited_condition = Condition()
# This condition is notified if gdb starts to wait for the prompt output
# See function send_command for an example
gdb_waiting_for_prompt_condition = Condition()
# A string. Stores the output of the last command
gdb_output = ""
# An instance of typedefs.RegisterQueue. Updated whenever GDB receives an async event such as breakpoint modification
# See AwaitAsyncOutput class for an example of usage
gdb_async_output = typedefs.RegisterQueue()
# A boolean value. Used to cancel the last gdb command sent
# Use the function cancel_last_command to make use of this variable
# Return value of the current send_command call will be an empty string
cancel_send_command = False
# A boolean value. Used by state_observe_thread to check if a trace session is active
active_trace = False
# A string. Holds the last command sent to gdb
last_gdb_command = ""
# A list of booleans. Used to adjust gdb output
# Use the function set_gdb_output_mode to make use of this variable
gdb_output_mode = typedefs.gdb_output_mode(True, True, True)
# A string. memory file of the currently attached/created process
mem_file = "/proc/" + str(currentpid) + "/mem"
# A string. Determines which signal to use to interrupt the process
interrupt_signal = "SIGINT"
"""
When PINCE was first launched, it used gdb 7.7.1, which is a very outdated version of gdb
interpreter-exec mi command of gdb showed some buggy behaviour at that time
Because of that, PINCE couldn't support gdb/mi commands for a while
But PINCE is now updated with the new versions of gdb as much as possible and the interpreter-exec works much better
So, old parts of codebase still get their required information by parsing gdb console output
New parts can try to rely on gdb/mi output
"""
"""
Functions that require breakpoint commands, such as track_watchpoint and track_breakpoint, requires process to be
stopped beforehand. If the process is running before we give the breakpoint its commands, there's a chance that the
breakpoint will be triggered before we give it commands. The process must be stopped to avoid this race condition
It's also necessary to stop the process to run commands like "watch"
"""
def set_gdb_output_mode(output_mode_tuple):
"""Adjusts gdb output
Args:
output_mode_tuple (typedefs.gdb_output_mode): Setting any field True will enable the output that's associated
with that field. Setting it False will disable the associated output
"""
global gdb_output_mode
gdb_output_mode = output_mode_tuple
def cancel_last_command():
"""Cancels the last gdb command sent if it's still present"""
if lock_send_command.locked():
global cancel_send_command
cancel_send_command = True
def send_command(
command, control=False, cli_output=False, send_with_file=False, file_contents_send=None, recv_with_file=False
):
"""Issues the command sent, raises an exception if gdb isn't initiated
Args:
command (str): The command that'll be sent
control (bool): This param should be True if the command sent is ctrl+key instead of the regular command
cli_output (bool): If True, returns a readable cli output instead of gdb/mi output
send_with_file (bool): Custom commands declared in gdbextensions.py requires file communication. If
called command has any parameters, pass this as True
file_contents_send (any): Arguments for the custom gdb command called
recv_with_file (bool): Pass this as True if the called custom gdb command returns something
Examples:
send_command(c,control=True)--> Sends ctrl+c instead of the str "c"
send_command("pince-read-addresses", file_contents_send=nested_list, recv_file=True)--> This line calls the
custom gdb command "pince-read-addresses" with parameter nested_list and since that gdb command returns the
addresses read as a list, we also pass the parameter recv_file as True
Returns:
str: Result of the command sent, commands in the form of "ctrl+key" always returns a null string
???: If recv_with_file is True. Content of the returned thing depends on the command sent
Note:
TODO:This bug doesn't seem like to exist anymore. Remove the unnecessary file communication layer of IPC
File communication system is used to avoid BEL emitting bug of pexpect. If you send more than a certain amount
of characters to gdb, the input will be sheared at somewhere and gdb won't be receiving all of the input
Visit this page for more information-->http://pexpect.readthedocs.io/en/stable/commonissues.html
You don't have to write interpreter-exec while sending a gdb/mi command. Just pass the gdb/mi command as itself.
This function will convert it automatically.
"""
global child
global gdb_output
global cancel_send_command
global last_gdb_command
with lock_send_command:
if gdb_output_mode.command_info:
time0 = time()
if not gdb_initialized:
raise typedefs.GDBInitializeException
gdb_output = ""
if send_with_file:
send_file = utils.get_from_pince_file(currentpid)
pickle.dump(file_contents_send, open(send_file, "wb"))
if recv_with_file or cli_output:
recv_file = utils.get_to_pince_file(currentpid)
# Truncating the recv_file because we wouldn't like to see output of previous command in case of errors
open(recv_file, "w").close()
command = str(command)
command = 'interpreter-exec mi "' + command + '"' if command.startswith("-") else command
last_gdb_command = command if not control else "Ctrl+" + command
if gdb_output_mode.command_info:
print("Last command: " + last_gdb_command)
if control:
child.sendcontrol(command)
else:
command_file = utils.get_gdb_command_file(currentpid)
command_fd = open(command_file, "r+")
command_fd.truncate()
command_fd.write(command)
command_fd.close()
if not cli_output:
child.sendline("source " + command_file)
else:
child.sendline("cli-output source " + command_file)
if not control:
while not gdb_output:
sleep(typedefs.CONST_TIME.GDB_INPUT_SLEEP)
if cancel_send_command:
break
if not cancel_send_command:
if recv_with_file or cli_output:
output = pickle.load(open(recv_file, "rb"))
else:
output = gdb_output
else:
output = ""
child.sendcontrol("c")
with gdb_waiting_for_prompt_condition:
gdb_waiting_for_prompt_condition.wait()
else:
output = ""
if gdb_output_mode.command_info:
time1 = time()
try:
print(time1 - time0)
except NameError:
pass
cancel_send_command = False
return output
def state_observe_thread():
"""
Observes the state of gdb, uses conditions to inform other functions and threads about gdb's state
Also generates output for send_command function
Should be called by creating a thread. Usually called in initialization process by attach function
"""
def check_inferior_status():
matches = regexes.gdb_state_observe.findall(child.before)
if len(matches) > 0:
global stop_reason
global inferior_status
old_status = inferior_status
for match in matches:
if match[0].startswith('stopped,reason="exited'):
with process_exited_condition:
detach()
print(f"Process terminated (PID:{currentpid})")
process_exited_condition.notify_all()
return
# For multiline outputs, only the last async event is important
# Get the last match only to optimize parsing
stop_info = matches[-1][0]
if stop_info:
stop_reason = typedefs.STOP_REASON.DEBUG
inferior_status = typedefs.INFERIOR_STATUS.STOPPED
else:
inferior_status = typedefs.INFERIOR_STATUS.RUNNING
bp_num = regexes.breakpoint_number.search(stop_info)
# Return -1 for invalid breakpoints to ignore racing conditions
if not (
old_status == inferior_status
or (bp_num and breakpoint_on_hit_dict.get(bp_num.group(1), -1) != typedefs.BREAKPOINT_ON_HIT.BREAK)
or active_trace
):
with status_changed_condition:
status_changed_condition.notify_all()
global child
global gdb_output
try:
while True:
child.expect_exact("\r\n") # A new line for TTY devices
child.before = child.before.strip()
if not child.before:
continue
check_inferior_status()
command_file = re.escape(utils.get_gdb_command_file(currentpid))
if regexes.gdb_command_source(command_file).search(child.before):
child.expect_exact("(gdb)")
child.before = child.before.strip()
check_inferior_status()
gdb_output = child.before
with gdb_waiting_for_prompt_condition:
gdb_waiting_for_prompt_condition.notify_all()
if gdb_output_mode.command_output:
print(child.before)
else:
if gdb_output_mode.async_output:
print(child.before)
gdb_async_output.broadcast_message(child.before)
except (OSError, ValueError, pexpect.EOF) as e:
if isinstance(e, pexpect.EOF):
print("\nEOF exception caught within pexpect, here's the contents of child.before:\n" + child.before)
print("Exiting state_observe_thread")
def execute_func_temporary_interruption(func, *args, **kwargs):
"""Interrupts the inferior before executing the given function, continues inferior's execution after calling the
given function
!!!WARNING!!! This function is NOT thread-safe. Use it with caution!
Args:
func (function): The function that'll be called between interrupt&continue routine
*args (args): Arguments for the function that'll be called
**kwargs (kwargs): Keyword arguments for the function that'll be called
Returns:
???: Result of the given function. Return type depends on the given function
"""
old_status = inferior_status
if old_status == typedefs.INFERIOR_STATUS.RUNNING:
interrupt_inferior(typedefs.STOP_REASON.PAUSE)
result = func(*args, **kwargs)
if old_status == typedefs.INFERIOR_STATUS.RUNNING:
continue_inferior()
return result
def execute_with_temporary_interruption(func):
"""Decorator version of execute_func_temporary_interruption"""
def wrapper(*args, **kwargs):
return execute_func_temporary_interruption(func, *args, **kwargs)
return wrapper
def can_attach(pid):
"""Check if we can attach to the target
Args:
pid (int,str): PID of the process that'll be attached
Returns:
bool: True if attaching is successful, False otherwise
"""
result = libc.ptrace(16, int(pid), 0, 0) # 16 is PTRACE_ATTACH, check ptrace.h for details
if result == -1:
return False
os.waitpid(int(pid), 0)
libc.ptrace(17, int(pid), 0, 17) # 17 is PTRACE_DETACH, check ptrace.h for details
sleep(0.01)
return True
def wait_for_stop(timeout=0):
"""Block execution till the inferior stops
Args:
timeout (float): Timeout time in seconds, passing 0 will wait for stop indefinitely
"""
remaining_time = timeout
while inferior_status == typedefs.INFERIOR_STATUS.RUNNING:
sleep(0.0001)
if timeout == 0:
continue
remaining_time -= 0.0001
if remaining_time < 0:
break
def interrupt_inferior(interrupt_reason=typedefs.STOP_REASON.DEBUG):
"""Interrupt the inferior
Args:
interrupt_reason (int): Just changes the global variable stop_reason. Can be a member of typedefs.STOP_REASON
"""
if currentpid == -1:
return
global stop_reason
if interrupt_signal == "SIGINT":
send_command("interrupt")
elif inferior_status == typedefs.INFERIOR_STATUS.RUNNING:
sig_num = interrupt_signal[3:]
if sig_num.isnumeric():
os.system(f"kill -{sig_num} {currentpid}")
else:
os.system(f"kill -s {interrupt_signal} {currentpid}")
wait_for_stop(1)
stop_reason = interrupt_reason
def continue_inferior():
"""Continue the inferior"""
if currentpid == -1:
return
send_command("c&")
def step_instruction():
"""Step one assembly instruction"""
send_command("stepi&")
def step_over_instruction():
"""Step over one assembly instruction"""
send_command("nexti&")
def execute_till_return():
"""Continues inferior till current stack frame returns"""
send_command("finish&")
def set_interrupt_signal(signal_name):
"""Decides on what signal to use to stop the process
Args:
signal_name (str): Name of the signal
"""
global interrupt_signal
handle_signal(signal_name, True, False)
interrupt_signal = signal_name
def handle_signal(signal_name: str, stop: bool, pass_to_program: bool) -> None:
"""Decides on what will GDB do when the process recieves a signal
Args:
signal_name (str): Name of the signal
stop (bool): Stop the program and print to the console
pass_to_program (bool): Pass signal to program
"""
params = [[signal_name, stop, pass_to_program]]
send_command("pince-handle-signals", send_with_file=True, file_contents_send=params)
def handle_signals(signal_list):
"""Optimized version of handle_signal for multiple signals
Args:
signal_list (list): A list of the parameters of handle_signal
"""
send_command("pince-handle-signals", send_with_file=True, file_contents_send=signal_list)
def init_gdb(gdb_path=utils.get_default_gdb_path()):
"""Spawns gdb and initializes/resets some of the global variables
Args:
gdb_path (str): Path of the gdb binary
Returns:
bool: True if initialization is successful, False otherwise
Note:
Calling init_gdb() will reset the current session
"""
global child
global gdb_initialized
global breakpoint_on_hit_dict
global chained_breakpoints
global gdb_output
global cancel_send_command
global last_gdb_command
utils.init_user_files()
detach()
# Temporary IPC_PATH, this little hack is needed because send_command requires a valid IPC_PATH
utils.create_ipc_path(currentpid)
utils.create_tmp_path(currentpid)
breakpoint_on_hit_dict.clear()
chained_breakpoints.clear()
gdb_output = ""
cancel_send_command = False
last_gdb_command = ""
libpince_dir = utils.get_libpince_directory()
is_appimage = os.environ.get("APPDIR")
python_home_env = f"PYTHONHOME={os.environ.get('PYTHONHOME')}" if is_appimage else ""
child = pexpect.spawn(
f"sudo -E --preserve-env=PATH LC_NUMERIC=C {python_home_env} {gdb_path} --nx --interpreter=mi",
cwd=libpince_dir,
env=os.environ,
encoding="utf-8",
)
child.setecho(False)
child.delaybeforesend = 0
child.timeout = None
try:
child.expect_exact("(gdb)")
except pexpect.EOF:
print("\nEOF exception caught within pexpect, here's the contents of child.before:\n" + child.before)
return False
status_thread = Thread(target=state_observe_thread)
status_thread.daemon = True
status_thread.start()
gdb_initialized = True
set_logging(False)
if not is_appimage:
send_command("source ./gdbinit_venv")
set_pince_paths()
send_command("source " + utils.get_user_path(typedefs.USER_PATHS.GDBINIT))
utils.execute_script(utils.get_user_path(typedefs.USER_PATHS.PINCEINIT))
return True
def set_logging(state):
"""Sets logging on or off
Args:
state (bool): Sets logging on if True, off if False
"""
send_command("set logging enabled off")
send_command("set logging file " + utils.get_logging_file(currentpid))
if state:
send_command("set logging enabled on")
def set_pince_paths():
"""Initializes $PINCE_PATH and $GDBINIT_AA_PATH convenience variables to make commands in gdbextensions.py
and gdbutils.py work. GDB scripts need to know libpince and .config directories, unfortunately they don't start
from the place where script exists
"""
libpince_dir = utils.get_libpince_directory()
pince_dir = os.path.dirname(libpince_dir)
gdbinit_aa_dir = utils.get_user_path(typedefs.USER_PATHS.GDBINIT_AA)
send_command("set $GDBINIT_AA_PATH=" + '"' + gdbinit_aa_dir + '"')
send_command("set $PINCE_PATH=" + '"' + pince_dir + '"')
send_command("source gdb_python_scripts/gdbextensions.py")
def init_referenced_dicts(pid):
"""Initializes referenced dict shelve databases
Args:
pid (int,str): PID of the attached process
"""
shelve.open(utils.get_referenced_strings_file(pid), "c")
shelve.open(utils.get_referenced_jumps_file(pid), "c")
shelve.open(utils.get_referenced_calls_file(pid), "c")
def attach(pid, gdb_path=utils.get_default_gdb_path()):
"""Attaches gdb to the target and initializes some of the global variables
Args:
pid (int,str): PID of the process that'll be attached to
gdb_path (str): Path of the gdb binary
Returns:
int: A member of typedefs.ATTACH_RESULT
Note:
If gdb is already initialized, gdb_path will be ignored
"""
global currentpid
pid = int(pid)
traced_by = utils.is_traced(pid)
pid_control_list = [
# Attaching PINCE to itself makes PINCE freeze immediately because gdb freezes the target on attach
(lambda: pid == self_pid, typedefs.ATTACH_RESULT.ATTACH_SELF),
(lambda: not utils.is_process_valid(pid), typedefs.ATTACH_RESULT.PROCESS_NOT_VALID),
(lambda: pid == currentpid, typedefs.ATTACH_RESULT.ALREADY_DEBUGGING),
(lambda: traced_by is not None, typedefs.ATTACH_RESULT.ALREADY_TRACED),
(lambda: not can_attach(pid), typedefs.ATTACH_RESULT.PERM_DENIED),
]
for control_func, attach_result in pid_control_list:
if control_func():
return attach_result
if currentpid != -1 or not gdb_initialized:
init_gdb(gdb_path)
global inferior_arch
global mem_file
currentpid = pid
mem_file = "/proc/" + str(currentpid) + "/mem"
utils.create_ipc_path(pid)
utils.create_tmp_path(pid)
send_command("attach " + str(pid))
set_pince_paths()
init_referenced_dicts(pid)
inferior_arch = get_inferior_arch()
utils.execute_script(utils.get_user_path(typedefs.USER_PATHS.PINCEINIT_AA))
return typedefs.ATTACH_RESULT.SUCCESSFUL
def create_process(process_path, args="", ld_preload_path="", gdb_path=utils.get_default_gdb_path()):
"""Creates a new process for debugging and initializes some of the global variables
Current process will be detached even if the create_process call fails
Make sure to save your data before calling this monstrosity
Args:
process_path (str): Absolute path of the target binary
args (str): Arguments of the inferior, optional
ld_preload_path (str): Path of the preloaded .so file, optional
gdb_path (str): Path of the gdb binary
Returns:
bool: True if the process has been created successfully, False otherwise
Note:
If gdb is already initialized, gdb_path will be ignored
"""
global currentpid
global inferior_arch
global mem_file
if currentpid != -1 or not gdb_initialized:
init_gdb(gdb_path)
output = send_command("file " + process_path)
if regexes.gdb_error.search(output):
print("An error occurred while trying to create process from the file at " + process_path)
detach()
return False
send_command("starti")
wait_for_stop()
entry_point = find_entry_point()
if entry_point:
send_command("tbreak *" + entry_point)
else:
send_command("tbreak _start")
send_command("set args " + args)
if ld_preload_path:
send_command("set exec-wrapper env 'LD_PRELOAD=" + ld_preload_path + "'")
send_command("run")
# We have to wait till breakpoint hits
wait_for_stop()
pid = get_inferior_pid()
currentpid = int(pid)
mem_file = "/proc/" + str(currentpid) + "/mem"
utils.create_ipc_path(pid)
utils.create_tmp_path(pid)
set_pince_paths()
init_referenced_dicts(pid)
inferior_arch = get_inferior_arch()
utils.execute_script(utils.get_user_path(typedefs.USER_PATHS.PINCEINIT_AA))
return True
def detach():
"""See you, space cowboy"""
global gdb_initialized
global currentpid
old_pid = currentpid
if gdb_initialized:
global child
global inferior_status
currentpid = -1
inferior_status = -1
gdb_initialized = False
child.close()
if old_pid != -1:
utils.delete_ipc_path(old_pid)
print("Detached from the process with PID:" + str(old_pid))
def toggle_attach():
"""Detaches from the current process without ending the season if currently attached. Attaches back if detached
Returns:
int: The new state of the process as a member of typedefs.TOGGLE_ATTACH
None: If detaching or attaching fails
"""
if currentpid == -1:
return
if is_attached():
if regexes.gdb_error.search(send_command("phase-out")):
return
return typedefs.TOGGLE_ATTACH.DETACHED
if regexes.gdb_error.search(send_command("phase-in")):
return
return typedefs.TOGGLE_ATTACH.ATTACHED
def is_attached():
"""Checks if gdb is attached to the current process
Returns:
bool: True if attached, False if not
"""
if regexes.gdb_error.search(send_command("info proc")):
return False
return True
def inject_with_advanced_injection(library_path):
"""Injects the given .so file to current process
Args:
library_path (str): Path to the .so file that'll be injected
Returns:
bool: Result of the injection
Note:
This function was reserved for linux-inject and since linux-inject is no more(F to pay respects), I'll leave
this function as a template for now
"""
raise NotImplementedError
def inject_with_dlopen_call(library_path):
"""Injects the given .so file to current process
This is a variant of the function inject_with_advanced_injection
This function won't break the target process unlike other complex injection methods
The downside is it fails if the target doesn't support dlopen calls or simply doesn't have the library
Args:
library_path (str): Path to the .so file that'll be injected
Returns:
bool: Result of the injection
"""
# TODO: Merge injection functions and rename them to inject_so once advanced injection is implemented
injectionpath = '"' + library_path + '"'
result = call_function_from_inferior("dlopen(" + injectionpath + ", 1)")[1]
if result == "0" or not result:
new_result = call_function_from_inferior("__libc_dlopen_mode(" + injectionpath + ", 1)")[1]
if new_result == "0" or not new_result:
return False
return True
return True
def read_pointer_chain(pointer_request: typedefs.PointerChainRequest) -> typedefs.PointerChainResult | None:
"""Reads the addresses pointed by this pointer chain
Args:
pointer_request (typedefs.PointerChainRequest): class containing a base_address and an offsets list
Returns:
typedefs.PointerChainResult: Class containing every pointer dereference result while walking the chain
None: If an error occurs while reading the given pointer chain
"""
if not isinstance(pointer_request, typedefs.PointerChainRequest):
raise TypeError("Passed non-PointerChainRequest type to read_pointer_chain!")
if inferior_arch == typedefs.INFERIOR_ARCH.ARCH_32:
value_index = typedefs.VALUE_INDEX.INT32
else:
value_index = typedefs.VALUE_INDEX.INT64
# Simple addresses first, examine_expression takes much longer time, especially for larger tables
try:
start_address = int(pointer_request.base_address, 0)
except (ValueError, TypeError):
start_address = examine_expression(pointer_request.base_address).address
pointer_results: typedefs.PointerChainResult = typedefs.PointerChainResult()
try:
with memory_handle() as mem_handle:
# Dereference the first address which is the base or (base + offset)
deref_address = read_memory(start_address, value_index, mem_handle=mem_handle)
if deref_address is None:
# Simply return None because no point reading further if base is not valid
return None
pointer_results.pointer_chain.append(deref_address)
for index, offset in enumerate(pointer_request.offsets_list):
# If deref_address is 0, we found an invalid read in the chain
# so we can just keep adding 0 until the end of offsets list
if deref_address == 0:
pointer_results.pointer_chain.append(0)
continue
offset_address = deref_address + offset
if index != len(pointer_request.offsets_list) - 1: # CE derefs every offset except for the last one
deref_address = read_memory(offset_address, value_index, mem_handle=mem_handle)
if deref_address is None:
deref_address = 0
else:
deref_address = offset_address
pointer_results.pointer_chain.append(deref_address)
except OSError:
return None
return pointer_results
def memory_handle():
"""
Acquire the handle of the currently attached process
Returns:
BinaryIO: A file handle that points to the memory file of the current process
"""
return open(mem_file, "rb")
def read_memory(
address: str | int,
value_index: int,
length: int = 0,
zero_terminate: bool = True,
value_repr: int = typedefs.VALUE_REPR.UNSIGNED,
endian: int = typedefs.ENDIANNESS.HOST,
mem_handle: io.BufferedReader | None = None,
) -> str | float | int | None:
"""Reads value from the given address
Args:
address (str, int): Can be a hex string or an integer.
value_index (int): Determines the type of data read. Can be a member of typedefs.VALUE_INDEX
length (int): Length of the data that'll be read. Must be greater than 0. Only used when the value_index is
STRING or AOB. Ignored otherwise
zero_terminate (bool): If True, data will be split when a null character has been read. Only used when
value_index is STRING. Ignored otherwise
value_repr (int): Can be a member of typedefs.VALUE_REPR. Only usable with integer types
endian (int): Can be a member of typedefs.ENDIANNESS
mem_handle (io.BufferedReader, None): A file handle that points to the memory file of the current process
This parameter is used for optimization, See memory_handle
Don't forget to close the handle after you're done if you use this parameter manually
Returns:
str: If the value_index is STRING or AOB, also when value_repr is HEX
float: If the value_index is FLOAT32 or FLOAT64
int: If the value_index is anything else
None: If an error occurs while reading the given address
"""
try:
value_index = int(value_index)
except:
# print(str(value_index) + " is not a valid value index")
return
if not type(address) == int:
try:
address = int(address, 0)
except:
# print(str(address) + " is not a valid address")
return
packed_data = typedefs.index_to_valuetype_dict.get(value_index, -1)
if typedefs.VALUE_INDEX.is_string(value_index):
try:
length = int(length)
except:
# print(str(length) + " is not a valid length")
return
if not length > 0:
# print("length must be greater than 0")
return
expected_length = length * typedefs.string_index_to_multiplier_dict.get(value_index, 1)
elif value_index is typedefs.VALUE_INDEX.AOB:
try:
expected_length = int(length)
except:
# print(str(length) + " is not a valid length")
return
if not expected_length > 0:
# print("length must be greater than 0")
return
else:
expected_length = packed_data[0]
data_type = packed_data[1]
try:
if not mem_handle:
mem_handle = open(mem_file, "rb")
mem_handle.seek(address)
data_read = mem_handle.read(expected_length)
if endian != typedefs.ENDIANNESS.HOST and system_endianness != endian:
data_read = data_read[::-1]
except (OSError, ValueError):
# TODO (read/write error output)
# Disabled read error printing. If needed, find a way to implement error logging with this function
# I've initially thought about enabling it on demand via a parameter but this function already has too many
# Maybe creating a function that toggles logging on and off? Other functions could use it too
# print("Can't access the memory at address " + hex(address) + " or offset " + hex(address + expected_length))
return
if typedefs.VALUE_INDEX.is_string(value_index):
encoding, option = typedefs.string_index_to_encoding_dict[value_index]
returned_string = data_read.decode(encoding, option)
if zero_terminate:
if returned_string.startswith("\x00"):
returned_string = "\x00"
else:
returned_string = returned_string.split("\x00")[0]
return returned_string[0:length]
elif value_index is typedefs.VALUE_INDEX.AOB:
return " ".join(format(n, "02x") for n in data_read)
else:
is_integer = typedefs.VALUE_INDEX.is_integer(value_index)
if is_integer and value_repr == typedefs.VALUE_REPR.SIGNED:
data_type = data_type.lower()
result = struct.unpack_from(data_type, data_read)[0]
if is_integer and value_repr == typedefs.VALUE_REPR.HEX:
return hex(result)
return result
def write_memory(
address: str | int,
value_index: int,
value: str | int | float | list[int],
zero_terminate=True,
endian=typedefs.ENDIANNESS.HOST,
):
"""Sets the given value to the given address
If any errors occurs while setting value to the according address, it'll be ignored but the information about
error will be printed to the terminal.
Args:
address (str, int): Can be a hex string or an integer
value_index (int): Can be a member of typedefs.VALUE_INDEX
value (str, int, float, list): The value that'll be written to the given address
zero_terminate (bool): If True, appends a null byte to the value. Only used when value_index is STRING
endian (int): Can be a member of typedefs.ENDIANNESS
Notes:
TODO: Implement a mem_handle parameter for optimization, check read_memory for an example
If a file handle fails to write to an address, it becomes unusable
You have to reopen the file to continue writing
"""
if not type(address) == int:
try:
address = int(address, 0)
except:
# print(str(address) + " is not a valid address")
return
if isinstance(value, str):
write_data = utils.parse_string(value, value_index)
if write_data is None:
return
else:
write_data = value
encoding, option = typedefs.string_index_to_encoding_dict.get(value_index, (None, None))
if encoding is None:
if value_index is typedefs.VALUE_INDEX.AOB:
write_data = bytearray(write_data)
else:
data_type = typedefs.index_to_struct_pack_dict.get(value_index, -1)
write_data = struct.pack(data_type, write_data)
else:
write_data = write_data.encode(encoding, option)
if zero_terminate:
write_data += b"\x00"
if endian != typedefs.ENDIANNESS.HOST and system_endianness != endian:
write_data = write_data[::-1]
FILE = open(mem_file, "rb+")
try:
FILE.seek(address)
FILE.write(write_data)
FILE.close()
except (OSError, ValueError):
# Refer to TODO (read/write error output)
# print("Can't access the memory at address " + hex(address) + " or offset " + hex(address + len(write_data)))
return
def disassemble(expression, offset_or_address):
"""Disassembles the address evaluated by the given expression
Args:
expression (str): Any gdb expression
offset_or_address (str): If you pass this parameter as an offset, you should add "+" in front of it
(e.g "+42" or "+0x42"). If you pass this parameter as an hex address, the address range between the expression
and the secondary address is disassembled
If the second parameter is an address, it always should be bigger than the first address
Returns:
list: A list of str values in this format-->[(address1, bytes1, opcodes1), (address2, ...), ...]
"""
output = send_command("disas /r " + expression + "," + offset_or_address)
disas_data = []
for line in output.splitlines():
result = regexes.disassemble_output.search(line)
if result:
disas_data.append(result.groups())
return disas_data
def convert_to_hex(expression):
"""Converts numeric values in the expression into their hex equivalents
Respects edge cases like indexed maps and keeps indexes as decimals
Args:
expression (str): Any gdb expression
Returns:
str: Converted str
"""
# TODO (lldb): We'll most likely write our own expression parser once we switch to lldb
# Merge this function with examine_expression and gdbutils.examine_expression once that happens
return regexes.expression_with_hex.sub(
lambda m: "0x" + m.group(1) if m.group(1) and not examine_expression(m.group(1)).symbol else m.group(0),
expression,
)
def examine_expression(expression):
"""Evaluates the given expression and returns evaluated value, address and symbol
Args:
expression (str): Any gdb expression
Returns:
typedefs.tuple_examine_expression: Evaluated value, address and symbol in a tuple
Any erroneous field will be returned as None instead of str
"""
if currentpid == -1:
return typedefs.tuple_examine_expression(None, None, None)
return send_command(