# -*- coding: utf-8 -*-
"""
Copyright (C) 2016-2017 Korcan Karaokçu <korcankaraokcu@gmail.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from threading import Lock, Thread, Condition
from time import sleep, time
from collections import OrderedDict, defaultdict
import pexpect, os, sys, ctypes, pickle, shelve, re, struct, io, traceback
from . import utils, typedefs, regexes
# PID of this very process. attach() compares against it to refuse attaching to itself
self_pid = os.getpid()
# Handle to the C library, can_attach uses it to issue raw ptrace calls
libc = ctypes.CDLL("libc.so.6")
# Byte order of the host as a member of typedefs.ENDIANNESS
system_endianness = typedefs.ENDIANNESS.LITTLE if sys.byteorder == "little" else typedefs.ENDIANNESS.BIG
# A boolean value. True if gdb is initialized, False if not
gdb_initialized = False
# Meant to hold a member of typedefs.INFERIOR_ARCH
# The builtin type int is only a placeholder until attach/create_process assign the real value
inferior_arch = int
# An integer. Can be a member of typedefs.INFERIOR_STATUS, -1 while there's no inferior
inferior_status = -1
# An integer. PID of the current attached/created process, -1 while there's no inferior
currentpid = -1
# Meant to hold a member of typedefs.STOP_REASON
# The builtin type int is only a placeholder until the first stop event or interrupt_inferior call
stop_reason = int
# A dictionary. Holds breakpoint numbers and what to do on hit
# Format: {bp_num1:on_hit1, bp_num2:on_hit2, ...}
breakpoint_on_hit_dict = {}
# A dictionary. Holds address and aob of instructions that were nop'ed out
# Format: {address1:orig_instruction1_aob, address2:orig_instruction2_aob, ...}
modified_instructions_dict = {}
# If an action such as deletion or condition modification happens in one of the breakpoints in a list, others in the
# same list will get affected as well
# Format: [[[address1, size1], [address2, size2], ...], [[address1, size1], ...], ...]
chained_breakpoints = []
# Placeholder, init_gdb replaces it with the pexpect.spawn instance that drives gdb
child = object # this object will be used with pexpect operations
# This Lock is used by the function send_command to ensure synchronous execution
lock_send_command = Lock()
# This condition is notified whenever status of the inferior changes
# Use the variable inferior_status to get information about inferior's status
# See PINCE's CheckInferiorStatus class for an example
status_changed_condition = Condition()
# This condition is notified if the current inferior gets terminated
# See PINCE's AwaitProcessExit class for an example
process_exited_condition = Condition()
# This condition is notified by state_observe_thread once the output of a sourced command has been collected
# See function send_command for an example
gdb_waiting_for_prompt_condition = Condition()
# A string. Stores the output of the last command
gdb_output = ""
# An instance of typedefs.RegisterQueue. Updated whenever GDB receives an async event such as breakpoint modification
# See PINCE's AwaitAsyncOutput class for an example of usage
gdb_async_output = typedefs.RegisterQueue()
# A boolean value. Used to cancel the last gdb command sent
# Use the function cancel_last_command to make use of this variable
# Return value of the current send_command call will be an empty string
cancel_send_command = False
# A boolean value. Used by state_observe_thread to check if a trace session is active
active_trace = False
# A string. Holds the last command sent to gdb
last_gdb_command = ""
# An instance of typedefs.gdb_output_mode. Used to adjust gdb output
# Fields read in this file are command_info, command_output and async_output
# Use the function set_gdb_output_mode to make use of this variable
gdb_output_mode = typedefs.gdb_output_mode(True, True, True)
# A string. memory file of the currently attached/created process
# Evaluates to "/proc/-1/mem" at import time, attach and create_process rewrite it with the real PID
mem_file = "/proc/" + str(currentpid) + "/mem"
# A string. Determines which signal to use to interrupt the process
interrupt_signal = "SIGINT"
"""
When PINCE was first launched, it used gdb 7.7.1, which is a very outdated version of gdb
interpreter-exec mi command of gdb showed some buggy behaviour at that time
Because of that, PINCE couldn't support gdb/mi commands for a while
But PINCE is now updated with the new versions of gdb as much as possible and the interpreter-exec works much better
So, old parts of codebase still get their required information by parsing gdb console output
New parts can try to rely on gdb/mi output
"""
"""
Functions that rely on breakpoint commands, such as track_watchpoint and track_breakpoint, require the process to be
stopped beforehand. If the process is running before we give the breakpoint its commands, there's a chance that the
breakpoint will be triggered before we give it commands. The process must be stopped to avoid this race condition
It's also necessary to stop the process to run commands like "watch"
"""
[docs]
def set_gdb_output_mode(output_mode_tuple):
    """Replaces the module-wide gdb output settings

    Args:
        output_mode_tuple (typedefs.gdb_output_mode): Every field that's True enables the kind of output tied to
        that field, every field that's False silences it
    """
    global gdb_output_mode
    # The whole tuple is swapped at once, fields are never edited individually
    gdb_output_mode = output_mode_tuple
[docs]
def cancel_last_command():
    """Asks the send_command call that's currently in progress, if any, to give up waiting for its output"""
    global cancel_send_command
    # A free lock means no command is in flight, so there's nothing to cancel
    if not lock_send_command.locked():
        return
    cancel_send_command = True
[docs]
def send_command(
    command, control=False, cli_output=False, send_with_file=False, file_contents_send=None, recv_with_file=False
):
    """Issues the command sent, raises an exception if gdb isn't initiated

    Args:
        command (str): The command that'll be sent
        control (bool): This param should be True if the command sent is ctrl+key instead of the regular command
        cli_output (bool): If True, returns a readable cli output instead of gdb/mi output
        send_with_file (bool): Custom commands declared in gdbextensions.py requires file communication. If
        called command has any parameters, pass this as True
        file_contents_send (any): Arguments for the custom gdb command called
        recv_with_file (bool): Pass this as True if the called custom gdb command returns something

    Examples:
        send_command("c", control=True)--> Sends ctrl+c instead of the str "c"
        send_command("pince-read-addresses", send_with_file=True, file_contents_send=nested_list,
        recv_with_file=True)--> This line calls the custom gdb command "pince-read-addresses" with parameter
        nested_list and since that gdb command returns the addresses read as a list, we also pass the parameter
        recv_with_file as True

    Returns:
        str: Result of the command sent, commands in the form of "ctrl+key" always returns a null string
        An empty string is also returned if the command gets cancelled via cancel_last_command
        ???: If recv_with_file or cli_output is True. Content of the returned thing depends on the command sent

    Raises:
        typedefs.GDBInitializeException: If gdb isn't initialized

    Note:
        TODO:This bug doesn't seem like to exist anymore. Remove the unnecessary file communication layer of IPC
        File communication system is used to avoid BEL emitting bug of pexpect. If you send more than a certain amount
        of characters to gdb, the input will be sheared at somewhere and gdb won't be receiving all of the input
        Visit this page for more information-->http://pexpect.readthedocs.io/en/stable/commonissues.html
        You don't have to write interpreter-exec while sending a gdb/mi command. Just pass the gdb/mi command as itself.
        This function will convert it automatically.
    """
    global child
    global gdb_output
    global cancel_send_command
    global last_gdb_command
    with lock_send_command:
        # None means command_info was disabled when the call started, elapsed time isn't printed in that case
        time0 = time() if gdb_output_mode.command_info else None
        if not gdb_initialized:
            raise typedefs.GDBInitializeException
        gdb_output = ""
        if send_with_file:
            send_file = utils.get_from_pince_file(currentpid)
            with open(send_file, "wb") as send_fd:
                pickle.dump(file_contents_send, send_fd)
        if recv_with_file or cli_output:
            recv_file = utils.get_to_pince_file(currentpid)
            # Truncating the recv_file because we wouldn't like to see output of previous command in case of errors
            open(recv_file, "w").close()
        command = str(command)
        command = 'interpreter-exec mi "' + command + '"' if command.startswith("-") else command
        last_gdb_command = command if not control else "Ctrl+" + command
        if gdb_output_mode.command_info:
            print("Last command: " + last_gdb_command)
        if control:
            child.sendcontrol(command)
            output = ""
        else:
            command_file = utils.get_gdb_command_file(currentpid)
            with open(command_file, "r+") as command_fd:
                command_fd.truncate()
                command_fd.write(command)
            if not cli_output:
                child.sendline("source " + command_file)
            else:
                child.sendline("cli-output source " + command_file)
            # state_observe_thread fills gdb_output once gdb prints its prompt for the sourced command
            while not gdb_output:
                sleep(typedefs.CONST_TIME.GDB_INPUT_SLEEP)
                if cancel_send_command:
                    break
            if not cancel_send_command:
                if recv_with_file or cli_output:
                    # NOTE: pickle must only be used on trusted data, recv_file is expected to be written by our
                    # own gdb extensions
                    with open(recv_file, "rb") as recv_fd:
                        output = pickle.load(recv_fd)
                else:
                    output = gdb_output
            else:
                output = ""
                # Abort the command on gdb's side and wait until the prompt output has been consumed
                child.sendcontrol("c")
                with gdb_waiting_for_prompt_condition:
                    gdb_waiting_for_prompt_condition.wait()
        if gdb_output_mode.command_info and time0 is not None:
            print(time() - time0)
        cancel_send_command = False
        return output
[docs]
def state_observe_thread():
    """
    Observes the state of gdb, uses conditions to inform other functions and threads about gdb's state
    Also generates output for send_command function
    Should be called by creating a thread. init_gdb starts it as a daemon thread during initialization
    The loop ends when pexpect raises OSError, ValueError or pexpect.EOF, e.g. after the gdb child gets closed
    """

    def check_inferior_status():
        """Parses child.before for async state records, updates inferior_status/stop_reason and notifies waiters"""
        global stop_reason
        global inferior_status
        matches = regexes.gdb_state_observe.findall(child.before)
        if not matches:
            return
        old_status = inferior_status
        for match in matches:
            if match[0].startswith('stopped,reason="exited'):
                with process_exited_condition:
                    # detach() resets currentpid to -1, remember the PID beforehand so the message stays meaningful
                    exited_pid = currentpid
                    detach()
                    print(f"Process terminated (PID:{exited_pid})")
                    process_exited_condition.notify_all()
                return
        # For multiline outputs, only the last async event is important
        # Get the last match only to optimize parsing
        stop_info = matches[-1][0]
        if stop_info:
            stop_reason = typedefs.STOP_REASON.DEBUG
            inferior_status = typedefs.INFERIOR_STATUS.STOPPED
        else:
            inferior_status = typedefs.INFERIOR_STATUS.RUNNING
        bp_num = regexes.breakpoint_number.search(stop_info)
        # Return -1 for invalid breakpoints to ignore racing conditions
        if not (
            old_status == inferior_status
            or (bp_num and breakpoint_on_hit_dict.get(bp_num.group(1), -1) != typedefs.BREAKPOINT_ON_HIT.BREAK)
            or active_trace
        ):
            with status_changed_condition:
                status_changed_condition.notify_all()

    global child
    global gdb_output
    try:
        while True:
            child.expect_exact("\r\n")  # A new line for TTY devices
            child.before = child.before.strip()
            if not child.before:
                continue
            check_inferior_status()
            command_file = re.escape(utils.get_gdb_command_file(currentpid))
            if regexes.gdb_command_source(command_file).search(child.before):
                # The line echoes a command issued by send_command, everything up to the prompt is its output
                child.expect_exact("(gdb)")
                child.before = child.before.strip()
                check_inferior_status()
                gdb_output = child.before
                with gdb_waiting_for_prompt_condition:
                    gdb_waiting_for_prompt_condition.notify_all()
                if gdb_output_mode.command_output:
                    print(child.before)
            else:
                if gdb_output_mode.async_output:
                    print(child.before)
                gdb_async_output.broadcast_message(child.before)
    except (OSError, ValueError, pexpect.EOF) as e:
        if isinstance(e, pexpect.EOF):
            print("\nEOF exception caught within pexpect, here's the contents of child.before:\n" + child.before)
        print("Exiting state_observe_thread")
[docs]
def execute_func_temporary_interruption(func, *args, **kwargs):
    """Interrupts the inferior before executing the given function, continues inferior's execution after calling the
    given function. The inferior is resumed even if the given function raises an exception
    !!!WARNING!!! This function is NOT thread-safe. Use it with caution!

    Args:
        func (function): The function that'll be called between interrupt&continue routine
        *args (args): Arguments for the function that'll be called
        **kwargs (kwargs): Keyword arguments for the function that'll be called

    Returns:
        ???: Result of the given function. Return type depends on the given function
    """
    # Only a running inferior gets interrupted and resumed, an already stopped one is left as it is
    was_running = inferior_status == typedefs.INFERIOR_STATUS.RUNNING
    if was_running:
        interrupt_inferior(typedefs.STOP_REASON.PAUSE)
    try:
        return func(*args, **kwargs)
    finally:
        # Without this, an exception inside func would leave the inferior paused indefinitely
        if was_running:
            continue_inferior()
[docs]
def execute_with_temporary_interruption(func):
    """Decorator version of execute_func_temporary_interruption

    Args:
        func (function): The function that'll be wrapped

    Returns:
        function: A wrapper that forwards its arguments to func through execute_func_temporary_interruption
        Name, docstring and other metadata of func are preserved on the wrapper
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return execute_func_temporary_interruption(func, *args, **kwargs)

    return wrapper
[docs]
def can_attach(pid):
    """Probes the target with a short ptrace attach/detach cycle to see if it can be debugged

    Args:
        pid (int,str): PID of the process that'll be probed

    Returns:
        bool: True if the ptrace attach request succeeds, False otherwise
    """
    # Request numbers, check ptrace.h for details
    ptrace_attach, ptrace_detach = 16, 17
    target_pid = int(pid)
    if libc.ptrace(ptrace_attach, target_pid, 0, 0) == -1:
        return False
    # Wait for the attach to take effect before letting the target go again
    os.waitpid(target_pid, 0)
    libc.ptrace(ptrace_detach, target_pid, 0, 17)
    sleep(0.01)
    return True
[docs]
def wait_for_stop(timeout=0):
    """Block execution till the inferior stops

    Args:
        timeout (float): Timeout time in seconds, passing 0 will wait for stop indefinitely
    """
    from time import monotonic

    # Measure against a real clock. Counting nominal sleep lengths drifts because every sleep call takes
    # longer than the requested 0.0001 seconds
    deadline = monotonic() + timeout
    while inferior_status == typedefs.INFERIOR_STATUS.RUNNING:
        sleep(0.0001)
        if timeout == 0:
            continue
        if monotonic() > deadline:
            break
[docs]
def interrupt_inferior(interrupt_reason=typedefs.STOP_REASON.DEBUG):
    """Stops the inferior with the configured interrupt signal and waits up to one second for it to stop

    Args:
        interrupt_reason (int): Stored in the global variable stop_reason afterwards
        Can be a member of typedefs.STOP_REASON
    """
    global stop_reason
    if currentpid == -1:
        return
    if interrupt_signal == "SIGINT":
        # SIGINT is delivered through gdb itself
        send_command("interrupt")
    elif inferior_status == typedefs.INFERIOR_STATUS.RUNNING:
        # Any other signal goes through the kill utility, either by number (e.g "SIG34") or by name
        signal_suffix = interrupt_signal[3:]
        if signal_suffix.isnumeric():
            kill_command = f"kill -{signal_suffix} {currentpid}"
        else:
            kill_command = f"kill -s {interrupt_signal} {currentpid}"
        os.system(kill_command)
    wait_for_stop(1)
    stop_reason = interrupt_reason
[docs]
def continue_inferior():
    """Resumes the inferior in the background, does nothing if there's no current process"""
    if currentpid != -1:
        send_command("c&")
[docs]
def step_instruction():
    """Executes a single assembly instruction by sending "stepi&" to gdb"""
    gdb_command = "stepi&"
    send_command(gdb_command)
[docs]
def step_over_instruction():
    """Steps over a single assembly instruction by sending "nexti&" to gdb"""
    gdb_command = "nexti&"
    send_command(gdb_command)
[docs]
def execute_till_return():
    """Lets the inferior run until the current stack frame returns by sending "finish&" to gdb"""
    gdb_command = "finish&"
    send_command(gdb_command)
[docs]
def set_interrupt_signal(signal_name):
    """Selects the signal that interrupt_inferior uses to stop the process

    Args:
        signal_name (str): Name of the signal
    """
    global interrupt_signal
    # gdb has to stop on this signal and must not pass it to the program
    handle_signal(signal_name, stop=True, pass_to_program=False)
    interrupt_signal = signal_name
[docs]
def handle_signal(signal_name: str, stop: bool, pass_to_program: bool) -> None:
    """Tells GDB how to react when the process receives the given signal

    Args:
        signal_name (str): Name of the signal
        stop (bool): Stop the program and print to the console
        pass_to_program (bool): Pass signal to program
    """
    # The gdb command takes a list of [name, stop, pass] entries, a single signal is a list with one entry
    send_command(
        "pince-handle-signals",
        send_with_file=True,
        file_contents_send=[[signal_name, stop, pass_to_program]],
    )
[docs]
def handle_signals(signal_list):
    """Batch version of handle_signal, configures several signals with a single gdb command

    Args:
        signal_list (list): A list where every item holds the parameters of one handle_signal call
    """
    gdb_command = "pince-handle-signals"
    send_command(gdb_command, send_with_file=True, file_contents_send=signal_list)
[docs]
def init_gdb(gdb_path=utils.get_default_gdb_path()):
    """Starts a fresh gdb instance through pexpect and resets the session related global variables

    Args:
        gdb_path (str): Path of the gdb binary

    Returns:
        bool: True if initialization is successful, False if gdb exits before showing its prompt

    Note:
        Calling init_gdb() will reset the current session
    """
    global child
    global gdb_initialized
    global gdb_output
    global cancel_send_command
    global last_gdb_command
    utils.init_user_files()
    detach()
    # Temporary IPC_PATH, this little hack is needed because send_command requires a valid IPC_PATH
    utils.create_ipc_path(currentpid)
    utils.create_tmp_path(currentpid)
    breakpoint_on_hit_dict.clear()
    chained_breakpoints.clear()
    gdb_output = ""
    cancel_send_command = False
    last_gdb_command = ""
    libpince_dir = utils.get_libpince_directory()
    is_appimage = os.environ.get("APPDIR")
    if is_appimage:
        python_home_env = f"PYTHONHOME={os.environ.get('PYTHONHOME')}"
    else:
        python_home_env = ""
    spawn_command = f"sudo -E --preserve-env=PATH LC_NUMERIC=C {python_home_env} {gdb_path} --nx --interpreter=mi"
    child = pexpect.spawn(spawn_command, cwd=libpince_dir, env=os.environ, encoding="utf-8")
    child.setecho(False)
    child.delaybeforesend = 0
    child.timeout = None
    try:
        child.expect_exact("(gdb)")
    except pexpect.EOF:
        print("\nEOF exception caught within pexpect, here's the contents of child.before:\n" + child.before)
        return False
    # The observer has to be running before the first send_command call, it's the one that produces the output
    Thread(target=state_observe_thread, daemon=True).start()
    gdb_initialized = True
    set_logging(False)
    if not is_appimage:
        send_command("source ./gdbinit_venv")
    set_pince_paths()
    send_command("source " + utils.get_user_path(typedefs.USER_PATHS.GDBINIT))
    utils.execute_script(utils.get_user_path(typedefs.USER_PATHS.PINCEINIT))
    return True
[docs]
def set_logging(state):
    """Points gdb logging at the log file of the current process and switches it on or off

    Args:
        state (bool): Sets logging on if True, off if False
    """
    # Logging is always turned off first, it's re-enabled only after the log file has been set
    send_command("set logging enabled off")
    logging_file = utils.get_logging_file(currentpid)
    send_command("set logging file " + logging_file)
    if not state:
        return
    send_command("set logging enabled on")
[docs]
def set_pince_paths():
    """Sets the $GDBINIT_AA_PATH and $PINCE_PATH convenience variables, then sources gdbextensions.py
    Scripts that run inside gdb need to know the libpince and .config directories because they don't start from
    the place where the script exists
    """
    # PINCE's root is the parent directory of libpince
    pince_dir = os.path.dirname(utils.get_libpince_directory())
    gdbinit_aa_dir = utils.get_user_path(typedefs.USER_PATHS.GDBINIT_AA)
    for variable_name, directory in (("$GDBINIT_AA_PATH", gdbinit_aa_dir), ("$PINCE_PATH", pince_dir)):
        send_command("set " + variable_name + '="' + directory + '"')
    send_command("source gdb_python_scripts/gdbextensions.py")
[docs]
def init_referenced_dicts(pid):
    """Initializes referenced dict shelve databases

    Args:
        pid (int,str): PID of the attached process
    """
    path_getters = (
        utils.get_referenced_strings_file,
        utils.get_referenced_jumps_file,
        utils.get_referenced_calls_file,
    )
    for get_path in path_getters:
        # Opening with the "c" flag creates the database if it's missing
        # Close it right away instead of leaking the handle, only the creation matters here
        shelve.open(get_path(pid), "c").close()
[docs]
def attach(pid, gdb_path=utils.get_default_gdb_path()):
    """Attaches gdb to the given process after a series of sanity checks and updates the related globals

    Args:
        pid (int,str): PID of the process that'll be attached to
        gdb_path (str): Path of the gdb binary

    Returns:
        int: A member of typedefs.ATTACH_RESULT

    Note:
        If gdb is already initialized, gdb_path will be ignored
    """
    global currentpid
    global inferior_arch
    global mem_file
    pid = int(pid)
    traced_by = utils.is_traced(pid)
    # Attaching PINCE to itself makes PINCE freeze immediately because gdb freezes the target on attach
    if pid == self_pid:
        return typedefs.ATTACH_RESULT.ATTACH_SELF
    if not utils.is_process_valid(pid):
        return typedefs.ATTACH_RESULT.PROCESS_NOT_VALID
    if pid == currentpid:
        return typedefs.ATTACH_RESULT.ALREADY_DEBUGGING
    if traced_by is not None:
        return typedefs.ATTACH_RESULT.ALREADY_TRACED
    if not can_attach(pid):
        return typedefs.ATTACH_RESULT.PERM_DENIED
    # A new gdb session is needed if another process is being debugged or gdb hasn't been started yet
    if currentpid != -1 or not gdb_initialized:
        init_gdb(gdb_path)
    currentpid = pid
    mem_file = "/proc/" + str(currentpid) + "/mem"
    utils.create_ipc_path(pid)
    utils.create_tmp_path(pid)
    send_command("attach " + str(pid))
    set_pince_paths()
    init_referenced_dicts(pid)
    inferior_arch = get_inferior_arch()
    utils.execute_script(utils.get_user_path(typedefs.USER_PATHS.PINCEINIT_AA))
    return typedefs.ATTACH_RESULT.SUCCESSFUL
[docs]
def create_process(process_path, args="", ld_preload_path="", gdb_path=utils.get_default_gdb_path()):
    """Launches the given binary under gdb, stops it at its entry point and updates the related globals
    Current process will be detached even if the create_process call fails
    Make sure to save your data before calling this monstrosity

    Args:
        process_path (str): Absolute path of the target binary
        args (str): Arguments of the inferior, optional
        ld_preload_path (str): Path of the preloaded .so file, optional
        gdb_path (str): Path of the gdb binary

    Returns:
        bool: True if the process has been created successfully, False otherwise

    Note:
        If gdb is already initialized, gdb_path will be ignored
    """
    global currentpid
    global inferior_arch
    global mem_file
    needs_fresh_gdb = currentpid != -1 or not gdb_initialized
    if needs_fresh_gdb:
        init_gdb(gdb_path)
    file_output = send_command("file " + process_path)
    if regexes.gdb_error.search(file_output):
        print("An error occurred while trying to create process from the file at " + process_path)
        detach()
        return False
    # First run only serves to locate the entry point, the real run starts after the temporary breakpoint is set
    send_command("starti")
    wait_for_stop()
    entry_point = find_entry_point()
    tbreak_target = "*" + entry_point if entry_point else "_start"
    send_command("tbreak " + tbreak_target)
    send_command("set args " + args)
    if ld_preload_path:
        send_command("set exec-wrapper env 'LD_PRELOAD=" + ld_preload_path + "'")
    send_command("run")
    # We have to wait till breakpoint hits
    wait_for_stop()
    pid = get_inferior_pid()
    currentpid = int(pid)
    mem_file = "/proc/" + str(currentpid) + "/mem"
    utils.create_ipc_path(pid)
    utils.create_tmp_path(pid)
    set_pince_paths()
    init_referenced_dicts(pid)
    inferior_arch = get_inferior_arch()
    utils.execute_script(utils.get_user_path(typedefs.USER_PATHS.PINCEINIT_AA))
    return True
[docs]
def detach():
    """Ends the current gdb session: resets the session globals, closes the gdb child and removes the IPC path
    of the process that was being debugged"""
    global gdb_initialized
    global currentpid
    global child
    global inferior_status
    old_pid = currentpid
    if gdb_initialized:
        # Globals are reset before the child gets closed
        currentpid = inferior_status = -1
        gdb_initialized = False
        child.close()
    if old_pid != -1:
        utils.delete_ipc_path(old_pid)
    print(f"Detached from the process with PID:{old_pid}")
[docs]
def toggle_attach():
    """Detaches from the current process without ending the session if currently attached. Attaches back if detached

    Returns:
        int: The new state of the process as a member of typedefs.TOGGLE_ATTACH
        None: If there's no current process or if detaching or attaching fails
    """
    if currentpid == -1:
        return
    # Pick the gdb command and the state it leads to, depending on the current attachment
    if is_attached():
        gdb_command, new_state = "phase-out", typedefs.TOGGLE_ATTACH.DETACHED
    else:
        gdb_command, new_state = "phase-in", typedefs.TOGGLE_ATTACH.ATTACHED
    if regexes.gdb_error.search(send_command(gdb_command)):
        return
    return new_state
[docs]
def is_attached():
    """Checks if gdb is attached to the current process

    Returns:
        bool: True if "info proc" runs without a gdb error, False otherwise
    """
    proc_info = send_command("info proc")
    return not regexes.gdb_error.search(proc_info)
[docs]
def inject_with_advanced_injection(library_path):
    """Placeholder for an advanced .so injection method, not implemented yet

    Args:
        library_path (str): Path to the .so file that'll be injected

    Returns:
        bool: Result of the injection, once it gets implemented

    Raises:
        NotImplementedError: Always, for the time being

    Note:
        This function was reserved for linux-inject and since linux-inject is no more(F to pay respects), it's kept
        as a template for now
    """
    raise NotImplementedError
[docs]
def inject_with_dlopen_call(library_path):
    """Injects the given .so file to current process by calling a dlopen variant from within the inferior
    This is a variant of the function inject_with_advanced_injection
    This function won't break the target process unlike other complex injection methods
    The downside is it fails if the target doesn't support dlopen calls or simply doesn't have the library

    Args:
        library_path (str): Path to the .so file that'll be injected

    Returns:
        bool: Result of the injection
    """
    # TODO: Merge injection functions and rename them to inject_so once advanced injection is implemented
    quoted_path = '"' + library_path + '"'
    # __libc_dlopen_mode is only tried if the regular dlopen call fails
    for loader_name in ("dlopen", "__libc_dlopen_mode"):
        handle = call_function_from_inferior(loader_name + "(" + quoted_path + ", 1)")[1]
        # An empty result or a "0" handle means the call has failed
        if handle and handle != "0":
            return True
    return False
[docs]
def read_pointer_chain(pointer_request: typedefs.PointerChainRequest) -> typedefs.PointerChainResult | None:
    """Walks the given pointer chain and records the result of every step

    Args:
        pointer_request (typedefs.PointerChainRequest): class containing a base_address and an offsets list

    Returns:
        typedefs.PointerChainResult: Class containing every pointer dereference result while walking the chain
        None: If the base address can't be read or an OSError occurs while reading the chain

    Raises:
        TypeError: If pointer_request isn't a typedefs.PointerChainRequest
    """
    if not isinstance(pointer_request, typedefs.PointerChainRequest):
        raise TypeError("Passed non-PointerChainRequest type to read_pointer_chain!")
    # Pointers are read as 32 bit integers for 32 bit inferiors and as 64 bit integers otherwise
    is_32bit = inferior_arch == typedefs.INFERIOR_ARCH.ARCH_32
    value_index = typedefs.VALUE_INDEX.INT32 if is_32bit else typedefs.VALUE_INDEX.INT64
    # Simple addresses first, examine_expression takes much longer time, especially for larger tables
    try:
        start_address = int(pointer_request.base_address, 0)
    except (ValueError, TypeError):
        start_address = examine_expression(pointer_request.base_address).address
    pointer_results: typedefs.PointerChainResult = typedefs.PointerChainResult()
    try:
        with memory_handle() as mem_handle:
            # Dereference the first address which is the base or (base + offset)
            deref_address = read_memory(start_address, value_index, mem_handle=mem_handle)
            if deref_address is None:
                # Simply return None because no point reading further if base is not valid
                return None
            pointer_results.pointer_chain.append(deref_address)
            offsets = pointer_request.offsets_list
            last_index = len(offsets) - 1
            for index, offset in enumerate(offsets):
                # If deref_address is 0, we found an invalid read in the chain
                # so we can just keep adding 0 until the end of offsets list
                if deref_address == 0:
                    pointer_results.pointer_chain.append(0)
                    continue
                offset_address = deref_address + offset
                if index == last_index:
                    # CE derefs every offset except for the last one
                    deref_address = offset_address
                else:
                    deref_address = read_memory(offset_address, value_index, mem_handle=mem_handle)
                    if deref_address is None:
                        deref_address = 0
                pointer_results.pointer_chain.append(deref_address)
    except OSError:
        return None
    return pointer_results
[docs]
def memory_handle():
    """
    Opens the memory file of the currently attached process for binary reading
    The caller is responsible for closing the handle, using it in a with statement takes care of that

    Returns:
        BinaryIO: A file handle that points to the memory file of the current process
    """
    handle = open(mem_file, "rb")
    return handle
[docs]
def read_memory(
    address: str | int,
    value_index: int,
    length: int = 0,
    zero_terminate: bool = True,
    value_repr: int = typedefs.VALUE_REPR.UNSIGNED,
    endian: int = typedefs.ENDIANNESS.HOST,
    mem_handle: io.BufferedReader | None = None,
) -> str | float | int | None:
    """Reads value from the given address

    Args:
        address (str, int): Can be a hex string or an integer.
        value_index (int): Determines the type of data read. Can be a member of typedefs.VALUE_INDEX
        length (int): Length of the data that'll be read. Must be greater than 0. Only used when the value_index is
        STRING or AOB. Ignored otherwise
        zero_terminate (bool): If True, data will be split when a null character has been read. Only used when
        value_index is STRING. Ignored otherwise
        value_repr (int): Can be a member of typedefs.VALUE_REPR. Only usable with integer types
        endian (int): Can be a member of typedefs.ENDIANNESS
        mem_handle (io.BufferedReader, None): A file handle that points to the memory file of the current process
        This parameter is used for optimization, See memory_handle
        Don't forget to close the handle after you're done if you use this parameter manually
        If it's omitted, the memory file is opened and closed within this call

    Returns:
        str: If the value_index is STRING or AOB, also when value_repr is HEX
        float: If the value_index is FLOAT32 or FLOAT64
        int: If the value_index is anything else
        None: If an error occurs while reading the given address, this includes invalid parameters, unknown value
        indexes and reads that return fewer bytes than the value needs
    """
    try:
        value_index = int(value_index)
    except (TypeError, ValueError, OverflowError):
        # print(str(value_index) + " is not a valid value index")
        return None
    if not isinstance(address, int):
        try:
            address = int(address, 0)
        except (TypeError, ValueError):
            # print(str(address) + " is not a valid address")
            return None
    is_string = typedefs.VALUE_INDEX.is_string(value_index)
    # Integers are compared by value, identity checks on ints only work by accident
    is_aob = value_index == typedefs.VALUE_INDEX.AOB
    data_type = None
    if is_string:
        try:
            length = int(length)
        except (TypeError, ValueError, OverflowError):
            # print(str(length) + " is not a valid length")
            return None
        if not length > 0:
            # print("length must be greater than 0")
            return None
        expected_length = length * typedefs.string_index_to_multiplier_dict.get(value_index, 1)
    elif is_aob:
        try:
            expected_length = int(length)
        except (TypeError, ValueError, OverflowError):
            # print(str(length) + " is not a valid length")
            return None
        if not expected_length > 0:
            # print("length must be greater than 0")
            return None
    else:
        packed_data = typedefs.index_to_valuetype_dict.get(value_index)
        if packed_data is None:
            # Unknown value index, there's no size/format information to read with
            return None
        expected_length = packed_data[0]
        data_type = packed_data[1]
    # Only close the handle if it has been opened here, handles passed by the caller stay open for reuse
    owns_handle = not mem_handle
    try:
        if owns_handle:
            mem_handle = open(mem_file, "rb")
        mem_handle.seek(address)
        data_read = mem_handle.read(expected_length)
    except (OSError, ValueError):
        # TODO (read/write error output)
        # Disabled read error printing. If needed, find a way to implement error logging with this function
        # I've initially thought about enabling it on demand via a parameter but this function already has too many
        # Maybe creating a function that toggles logging on and off? Other functions could use it too
        # print("Can't access the memory at address " + hex(address) + " or offset " + hex(address + expected_length))
        return None
    finally:
        if owns_handle and mem_handle:
            mem_handle.close()
    if endian != typedefs.ENDIANNESS.HOST and system_endianness != endian:
        data_read = data_read[::-1]
    if is_string:
        encoding, option = typedefs.string_index_to_encoding_dict[value_index]
        returned_string = data_read.decode(encoding, option)
        if zero_terminate:
            if returned_string.startswith("\x00"):
                returned_string = "\x00"
            else:
                returned_string = returned_string.split("\x00")[0]
        return returned_string[0:length]
    if is_aob:
        return " ".join(format(n, "02x") for n in data_read)
    is_integer = typedefs.VALUE_INDEX.is_integer(value_index)
    if is_integer and value_repr == typedefs.VALUE_REPR.SIGNED:
        # Lowercase struct format characters are the signed counterparts of the uppercase ones
        data_type = data_type.lower()
    try:
        result = struct.unpack_from(data_type, data_read)[0]
    except struct.error:
        # The read came back shorter than the value, e.g at the end of a mapped region
        return None
    if is_integer and value_repr == typedefs.VALUE_REPR.HEX:
        return hex(result)
    return result
[docs]
def write_memory(
    address: str | int,
    value_index: int,
    value: str | int | float | list[int],
    zero_terminate=True,
    endian=typedefs.ENDIANNESS.HOST,
):
    """Sets the given value to the given address

    If any error occurs while setting value to the according address, it'll be ignored silently. This includes
    invalid addresses, unparsable values, unknown value indexes, values that don't fit the type and memory that
    can't be accessed

    Args:
        address (str, int): Can be a hex string or an integer
        value_index (int): Can be a member of typedefs.VALUE_INDEX
        value (str, int, float, list): The value that'll be written to the given address
        zero_terminate (bool): If True, appends a null byte to the value. Only used when value_index is STRING
        endian (int): Can be a member of typedefs.ENDIANNESS

    Notes:
        TODO: Implement a mem_handle parameter for optimization, check read_memory for an example
        If a file handle fails to write to an address, it becomes unusable
        You have to reopen the file to continue writing
    """
    if not isinstance(address, int):
        try:
            address = int(address, 0)
        except (TypeError, ValueError):
            # print(str(address) + " is not a valid address")
            return
    if isinstance(value, str):
        write_data = utils.parse_string(value, value_index)
        if write_data is None:
            return
    else:
        write_data = value
    encoding, option = typedefs.string_index_to_encoding_dict.get(value_index, (None, None))
    if encoding is None:
        # Integers are compared by value, identity checks on ints only work by accident
        if value_index == typedefs.VALUE_INDEX.AOB:
            write_data = bytearray(write_data)
        else:
            data_type = typedefs.index_to_struct_pack_dict.get(value_index)
            if data_type is None:
                # Unknown value index, there's no format to pack the value with
                return
            try:
                write_data = struct.pack(data_type, write_data)
            except struct.error:
                # The value doesn't fit the requested type
                return
    else:
        write_data = write_data.encode(encoding, option)
        if zero_terminate:
            write_data += b"\x00"
    if endian != typedefs.ENDIANNESS.HOST and system_endianness != endian:
        write_data = write_data[::-1]
    try:
        # The with statement closes the handle even if seek/write fails
        # Errors raised while flushing on close are caught below as well
        with open(mem_file, "rb+") as mem_fd:
            mem_fd.seek(address)
            mem_fd.write(write_data)
    except (OSError, ValueError):
        # Refer to TODO (read/write error output)
        # print("Can't access the memory at address " + hex(address) + " or offset " + hex(address + len(write_data)))
        return
[docs]
def disassemble(expression, offset_or_address):
    """Disassembles the address range that starts at the address evaluated by the given expression

    Args:
        expression (str): Any gdb expression
        offset_or_address (str): If you pass this parameter as an offset, you should add "+" in front of it
        (e.g "+42" or "+0x42"). If you pass this parameter as an hex address, the address range between the expression
        and the secondary address is disassembled
        If the second parameter is an address, it always should be bigger than the first address

    Returns:
        list: A list of str values in this format-->[(address1, bytes1, opcodes1), (address2, ...), ...]
    """
    gdb_output_text = send_command("disas /r " + expression + "," + offset_or_address)
    # Lines that don't look like a disassembled instruction are skipped
    line_matches = (regexes.disassemble_output.search(line) for line in gdb_output_text.splitlines())
    return [line_match.groups() for line_match in line_matches if line_match]
[docs]
def convert_to_hex(expression):
    """Rewrites the numeric values of an expression as hex literals

    Respects edge cases like indexed maps and keeps indexes as decimals

    Args:
        expression (str): Any gdb expression

    Returns:
        str: The expression with the eligible numbers prefixed by "0x"
    """
    # TODO (lldb): We'll most likely write our own expression parser once we switch to lldb
    # Merge this function with examine_expression and gdbutils.examine_expression once that happens

    def hexify(match):
        # Group 1 only gets the prefix if examine_expression doesn't resolve it to a symbol
        number = match.group(1)
        if number and not examine_expression(number).symbol:
            return "0x" + number
        return match.group(0)

    return regexes.expression_with_hex.sub(hexify, expression)
[docs]
def examine_expression(expression):
    """Evaluates a single expression through the gdb command "pince-examine-expressions"

    Args:
        expression (str): Any gdb expression

    Returns:
        typedefs.tuple_examine_expression: Evaluated value, address and symbol in a tuple
        Any erroneous field will be returned as None instead of str
        All fields are None if no process is attached (currentpid is -1)
    """
    if currentpid != -1:
        results = send_command(
            "pince-examine-expressions", send_with_file=True, file_contents_send=[expression], recv_with_file=True
        )
        # The command works on lists, only the first (and only) result is relevant here
        return results[0]
    return typedefs.tuple_examine_expression(None, None, None)
[docs]
def examine_expressions(expression_list):
    """Batch version of examine_expression, evaluates all the inputs with a single gdb command

    Args:
        expression_list (list): List of gdb expressions as str

    Returns:
        list: List of typedefs.tuple_examine_expression, one per expression
        An empty list is returned for an empty input
        Every tuple is filled with None if no process is attached (currentpid is -1)
    """
    if not expression_list:
        return []
    if currentpid != -1:
        return send_command(
            "pince-examine-expressions", send_with_file=True, file_contents_send=expression_list, recv_with_file=True
        )
    expression_count = len(expression_list)
    return [typedefs.tuple_examine_expression(None, None, None) for _ in range(expression_count)]
[docs]
def parse_and_eval(expression, cast=str):
    """Evaluates the expression with gdb.parse_and_eval and casts the result to the given type

    Prefer examine_expression if the data can be expressed as an address or a symbol
    This function is meant for data that has void type or multiple type representations, such as:
        - $eflags, which has both str and int reprs
        - $_siginfo, which is a struct with many fields
        - x64 register convenience vars such as $rax, which are void if the process is x86

    Args:
        expression (str): Any gdb expression
        cast (type): Evaluated value will be cast to this type in gdb

    Returns:
        cast: Self-explanatory
        None: If casting fails
    """
    payload = (expression, cast)
    return send_command("pince-parse-and-eval", send_with_file=True, file_contents_send=payload, recv_with_file=True)
[docs]
def get_thread_info():
    """Invokes "info threads" command and returns the line corresponding to the current thread

    Returns:
        str: Current thread information, escaped double quotes (\\") are replaced with plain ones
        None: If the output doesn't fit the regex
    """
    thread_info = send_command("info threads")
    match = regexes.thread_info.search(thread_info)
    if match is None:
        # Honor the documented contract instead of raising AttributeError on a failed search
        return None
    return match.group(1).replace('\\"', '"')
[docs]
def find_closest_instruction_address(address, instruction_location="next", instruction_count=1):
    """Finds the address of the instruction that's instruction_count instructions away from the given address

    The given address is assumed to be valid

    Args:
        address (str): Hex address or any gdb expression that can be used in disas command
        instruction_location (str): "next" searches the instructions coming after the address
        "previous" searches the instructions coming before the address instead
        instruction_count (int): Number of the instructions that'll be looked for

    Returns:
        str: The address found as hex string. If starting/ending of a valid memory range is reached, starting/ending
        address is returned instead as hex string.

    Note:
        From gdb version 7.12 and onwards, inputting negative numbers in x command are supported(x/-3i for instance)
        So, modifying this function according to the changes in 7.12 may speed up things a little bit but also breaks
        the backwards compatibility. The speed gain is not much of a big deal compared to backwards compatibility, so
        I'm not changing this function for now
    """
    assert instruction_location in ["next", "previous"], "invalid instruction_location"
    search_forward = instruction_location == "next"
    # 30 bytes are disassembled for every requested instruction
    span = str(instruction_count * 30)
    region_start = None
    if search_forward:
        disas_data = disassemble(address, address + "+" + span)
    else:
        disas_data = disassemble(address + "-" + span, address)
        if not disas_data:
            # Nothing could be disassembled backwards, retry from the beginning of the memory region
            region_start = hex(utils.get_region_info(currentpid, address).start)
            disas_data = disassemble(region_start, address)
    try:
        index = instruction_count if search_forward else -instruction_count
        return utils.extract_address(disas_data[index][0])
    except IndexError:
        # Not enough instructions were disassembled, fall back to the boundaries of the region
        if search_forward:
            return hex(utils.get_region_info(currentpid, address).end)
        if region_start is not None:
            return region_start
        return hex(utils.get_region_info(currentpid, address).start)
[docs]
def get_address_info(expression):
    """Asks gdb which symbol the given expression belongs to via the command "info symbol"

    Args:
        expression (str): Any gdb expression

    Returns:
        str: The result of the command "info symbol" for given expression
    """
    command = "info symbol " + expression
    return send_command(command, cli_output=True)
[docs]
def get_symbol_info(expression):
    """Asks gdb where the given symbol is stored via the command "info address"

    Args:
        expression (str): Any gdb expression

    Returns:
        str: The result of the command "info address" for given expression
    """
    command = "info address " + expression
    return send_command(command, cli_output=True)
[docs]
def search_functions(expression, case_sensitive=False):
    """Searches functions with the gdb command "pince-search-functions" (based on "info functions")

    Args:
        expression (str): Any gdb expression
        case_sensitive (bool): If True, search will be case sensitive

    Returns:
        list: A list of str-->[(address1, symbol1), (address2, symbol2), ...]
        address will be None if the corresponding symbol is in defined category

    Todo:
        GDB-MI wiki points out to the command -symbol-list-functions but apparently it isn't implemented yet
        If the feature below gets implemented, use it instead
        https://sourceware.org/bugzilla/show_bug.cgi?id=23796
        Add ability to show addresses of defined symbols when it gets implemented by gdb
        Please don't try to write a symbol parser for every single language out there, it's an overkill
        https://sourceware.org/bugzilla/show_bug.cgi?id=23899
    """
    search_args = (expression, case_sensitive)
    return send_command(
        "pince-search-functions", send_with_file=True, file_contents_send=search_args, recv_with_file=True
    )
[docs]
def get_inferior_pid():
    """Extracts the pid of the current inferior from the output of "info inferior"

    Returns:
        str: pid
    """
    inferior_info = send_command("info inferior")
    pid_match = regexes.inferior_pid.search(inferior_info)
    return pid_match.group(1)
[docs]
def get_inferior_arch():
    """Determines the architecture of the current inferior

    Returns:
        int: A member of typedefs.INFERIOR_ARCH
    """
    # $rax evaluates to "void" when the inferior has no 64-bit registers
    is_32bit = parse_and_eval("$rax") == "void"
    return typedefs.INFERIOR_ARCH.ARCH_32 if is_32bit else typedefs.INFERIOR_ARCH.ARCH_64
[docs]
def read_registers():
    """Reads the current registers through the gdb command "pince-read-registers"

    Returns:
        dict: A dict that holds general, flag and segment registers. Check typedefs.REGISTERS for the full list
    """
    registers = send_command("pince-read-registers", recv_with_file=True)
    return registers
[docs]
def read_float_registers() -> OrderedDict[str, str]:
    """Reads the current floating point registers through the gdb command "pince-read-float-registers"

    Returns:
        OrderedDict[str, str]: A dict that holds floating point registers. Check typedefs.REGISTERS.FLOAT for the
        full list

    Note:
        Returned xmm values are based on xmm.v4_float
    """
    float_registers = send_command("pince-read-float-registers", recv_with_file=True)
    return float_registers
[docs]
def set_convenience_variable(variable, value):
    """Assigns the given value to the given gdb convenience variable

    Can be also used for modifying registers directly

    Args:
        variable (str): Any gdb convenience variable(with "$" character removed)
        value (str): Anything
    """
    assignment = "".join(("set $", variable, "=", value))
    send_command(assignment)
[docs]
def set_register_flag(flag, value):
    """Sets given register flag to given value

    The arguments are validated before gdb is queried, so an invalid call has no side effects

    Args:
        flag (str): A member of typedefs.REGISTERS.FLAG
        value (Union[int,str]): 0 or 1

    Raises:
        ValueError: If value isn't 0 or 1, or if flag isn't a member of typedefs.REGISTERS.FLAG
    """
    value = str(value)
    if value not in ("0", "1"):
        raise ValueError(value + " isn't valid value. It can be only 0 or 1")
    if flag not in typedefs.REGISTERS.FLAG:
        raise ValueError(str(flag) + " isn't a valid flag, must be a member of typedefs.REGISTERS.FLAG")
    registers = read_registers()
    registers[flag] = value
    # Binary representation of the new eflags value, most significant bit first (bit 11 down to bit 0)
    # Positions marked with None aren't backed by a flag in the registers dict and are written as 0
    bit_layout = ("of", "df", "if", "tf", "sf", "zf", None, "af", None, "pf", None, "cf")
    bit_string = "".join(registers[name] if name else "0" for name in bit_layout)
    set_convenience_variable("eflags", hex(int(bit_string, 2)))
[docs]
def get_stacktrace_info():
    """Collects information about the current stacktrace through the gdb command "pince-get-stack-trace-info"

    Returns:
        list: A list of str values in this format-->[[return_address_info1,frame_address_info1],[info2, ...], ...]
        return_address_info looks like this-->Return address of frame+symbol-->0x40c431 <_start>
        frame_address_info looks like this-->Beginning of frame+distance from stack pointer-->0x7ffe1e989a40(rsp+0x100)
    """
    stacktrace = send_command("pince-get-stack-trace-info", recv_with_file=True)
    return stacktrace
[docs]
def get_stack_info(from_base_pointer: bool = False) -> list[str]:
    """Collects information about the current stack through the gdb command "pince-get-stack-info"

    Args:
        from_base_pointer (bool): If True, the stack is viewed from EBP or RBP register instead

    Returns:
        list: A list of str values in this format-->
        [[stack_pointer_info1,hex_value1,pointer_info1],[stack_pointer_info2, ...], ...]
        stack_pointer_info looks like this-->Hex address+distance from stack pointer-->0x7ffd0d232f88(rsp+0xff8)
        hex_value looks like this-->Value hold by corresponding address-->0x1bfda20
        pointer_info shows the value hold by hex_value address. It looks like this-->
            if points to a string-->(str)Some String
            if points to a symbol-->(ptr)<function_name>
        pointer_info becomes a null string if pointer isn't valid
    """
    command = "pince-get-stack-info from-base-pointer" if from_base_pointer else "pince-get-stack-info"
    return send_command(command, recv_with_file=True)
[docs]
def get_stack_frame_return_addresses():
    """Collects the return addresses of stack frames through the gdb command "pince-get-frame-return-addresses"

    Returns:
        list: A list of str values in this format-->[return_address_info1,return_address_info2, ...]
        return_address_info looks like this-->Return address of frame+symbol-->0x40c431 <_start>
    """
    return_addresses = send_command("pince-get-frame-return-addresses", recv_with_file=True)
    return return_addresses
[docs]
def get_stack_frame_info(index):
    """Collects information about the stack frame at the given index through the gdb command "pince-get-frame-info"

    Args:
        index (int,str): Index of the frame, it's converted to str before being sent

    Returns:
        str: Information that looks like this::

            Stack level 0, frame at 0x7ffc5f87f6a0:
            rip = 0x7fd1d639412d in poll (../sysdeps/unix/syscall-template.S:81); saved rip = 0x7fd1d27fcfe4
            called by frame at 0x7ffc5f87f700
            source language asm.
            Arglist at 0x7ffc5f87f688, args:
            Locals at 0x7ffc5f87f688, Previous frame's sp is 0x7ffc5f87f6a0
            Saved registers:
            rip at 0x7ffc5f87f698
    """
    frame_index = str(index)
    return send_command("pince-get-frame-info", send_with_file=True, file_contents_send=frame_index, recv_with_file=True)
[docs]
def hex_dump(address, offset):
    """Reads mem_file byte by byte and returns the hex dump of range (address to address+offset)

    Args:
        address (int): Self-explanatory
        offset (int): The range that'll be read

    Returns:
        list: One str per memory cell, every cell is passed through utils.upper_hex
        If an error occurs while reading a memory cell, that cell is returned as "??"

    Examples:
        returned list-->["??","??","??","7f","43","67","40","??","??, ...]
    """
    cells = []
    with open(mem_file, "rb") as memory:
        try:
            memory.seek(address)
        except (OSError, ValueError):
            # A failed seek is ignored on purpose, the reads below are still attempted
            pass
        for _ in range(offset):
            try:
                cell = " ".join(f"{byte:02x}" for byte in memory.read(1))
            except OSError:
                cell = "??"
                try:
                    # Skip the unreadable byte manually since read() failed to advance the position
                    memory.seek(1, io.SEEK_CUR)
                except (OSError, ValueError):
                    pass
            cells.append(utils.upper_hex(cell))
    return cells
[docs]
def get_modified_instructions():
    """Gives access to the instructions that are currently modified

    Returns:
        dict: The module level modified_instructions_dict itself (not a copy), where the key is the start address of
        instruction and value is the aob before modifying
    """
    # Reading a module level name doesn't require a global statement
    return modified_instructions_dict
[docs]
def nop_instruction(start_address, length_of_instr):
    """Overwrites an instruction with NOPs (0x90) and remembers its original bytes

    Args:
        start_address (int): Self-explanatory
        length_of_instr (int): Length of the instruction that'll be NOP'ed

    Returns:
        None
    """
    original_aob = " ".join(hex_dump(start_address, length_of_instr))
    # Only the very first backup is kept so that repeated modifications don't overwrite the original bytes
    modified_instructions_dict.setdefault(start_address, original_aob)
    write_memory(start_address, typedefs.VALUE_INDEX.AOB, "90 " * length_of_instr)
[docs]
def modify_instruction(start_address, array_of_bytes):
    """Overwrites an instruction with a new AOB and remembers its original bytes

    Args:
        start_address (int): Self-explanatory
        array_of_bytes (str): String that contains the replacement bytes of the instruction, separated by whitespace

    Returns:
        None
    """
    byte_count = len(array_of_bytes.split())
    original_aob = " ".join(hex_dump(start_address, byte_count))
    # Only the very first backup is kept so that repeated modifications don't overwrite the original bytes
    modified_instructions_dict.setdefault(start_address, original_aob)
    write_memory(start_address, typedefs.VALUE_INDEX.AOB, array_of_bytes)
[docs]
def restore_instruction(start_address):
    """Writes the original opcodes of a modified instruction back and forgets the backup

    Args:
        start_address (int): Self-explanatory

    Returns:
        None

    Raises:
        KeyError: If there's no backup stored for start_address in modified_instructions_dict
    """
    original_aob = modified_instructions_dict.pop(start_address)
    write_memory(start_address, typedefs.VALUE_INDEX.AOB, original_aob)
[docs]
def get_breakpoint_info() -> list[typedefs.tuple_breakpoint_info]:
    """Parses the output of "-break-list" into the current breakpoint/watchpoint list

    Returns:
        list: A list of typedefs.tuple_breakpoint_info where;
            number is the gdb breakpoint number
            breakpoint_type is the breakpoint type
            disp shows what will be done after breakpoint hits
            enabled shows if the breakpoint enabled or disabled
            address is the address of breakpoint
            size is the size of breakpoint
            on_hit is the action that'll happen when the breakpoint is reached
            hit_count shows how many times the breakpoint has been hit
            enable_count shows how many times the breakpoint will get hit before it gets disabled
            condition is the condition of breakpoint
        size-->int
        everything else-->str

    Note:
        GDB's python API can't detect hardware breakpoints, that's why we are using parser for this job
    """
    parsed_breakpoints = []
    # Shared fields of "<MULTIPLE>" entries, keyed by breakpoint number and inherited by their locations
    parent_data = OrderedDict()
    raw_info = send_command("-break-list")
    # Temporary fix for https://sourceware.org/bugzilla/show_bug.cgi?id=9659
    # TODO:Delete this line when gdb or pygdbmi fixes the problem
    raw_info = re.sub(r"script={(.*?)}", r"script=[\g<1>]", raw_info)  # Please refer to issue #53
    table_rows = utils.parse_response(raw_info)["payload"]["BreakpointTable"]["body"]
    for row in table_rows:
        row = defaultdict(str, row)  # Missing fields read as empty strings
        number = row["number"]
        breakpoint_type = row["type"]
        disp = row["disp"]
        enabled = row["enabled"]
        address = row["addr"]
        what = row["what"]
        condition = row["cond"]
        hit_count = row["times"]
        enable_count = row["enable"]
        if address == "<MULTIPLE>":
            parent_data[number] = (breakpoint_type, disp, condition, hit_count)
            continue
        if not breakpoint_type:
            # An entry without a type is a location, its number looks like "parent.location"
            number = number.split(".")[0]
            breakpoint_type, disp, condition, hit_count = parent_data[number]
        if what:
            address = utils.extract_address(what)
            if not address:
                address = examine_expression(what).address
        on_hit_value = breakpoint_on_hit_dict.get(number, typedefs.BREAKPOINT_ON_HIT.BREAK)
        on_hit = typedefs.on_hit_to_text_dict.get(on_hit_value, "Unknown")
        # Size is 1 unless a non-breakpoint entry states its size in the "what" field
        size = 1
        if "breakpoint" not in breakpoint_type:
            size_match = regexes.breakpoint_size.search(what)
            if size_match:
                size = int(size_match.group(1))
        parsed_breakpoints.append(
            typedefs.tuple_breakpoint_info(
                number, breakpoint_type, disp, enabled, address, size, on_hit, hit_count, enable_count, condition
            )
        )
    return parsed_breakpoints
[docs]
def get_breakpoints_in_range(address: str | int, length: int = 1) -> list[typedefs.tuple_breakpoint_info]:
    """Finds the breakpoints/watchpoints that overlap with the given address range

    Args:
        address (str,int): Start address of the range, hex address or an int
        length (int): If this parameter is bigger than 1, the range between address and address+length-1 will be
        checked instead of just the address itself

    Returns:
        list: A list of typedefs.tuple_breakpoint_info, info of the existing breakpoints for given address range
    """
    if type(address) is not int:
        address = int(address, 0)
    other_end = address + length - 1
    range_start = min(address, other_end)
    range_end = max(address, other_end)

    def overlaps(info):
        # Two ranges overlap if each one starts before the other one ends
        start = int(info.address, 16)
        return range_end >= start and range_start <= start + info.size - 1

    return [info for info in get_breakpoint_info() if overlaps(info)]
[docs]
def hardware_breakpoint_available() -> bool:
    """Tells if a hardware breakpoint slot is still free

    Returns:
        bool: True if there is at least one available slot, False if not

    Todo:
        Check debug registers to determine hardware breakpoint state rather than relying on gdb output because inferior
        might modify its own debug registers
    """
    slots_in_use = sum(
        1 for info in get_breakpoint_info() if regexes.hw_breakpoint_count.search(info.breakpoint_type)
    )
    # Maximum number of hardware breakpoints is limited to 4 in x86 architecture
    return slots_in_use < 4
[docs]
def add_breakpoint(
    expression, breakpoint_type=typedefs.BREAKPOINT_TYPE.HARDWARE, on_hit=typedefs.BREAKPOINT_ON_HIT.BREAK
):
    """Sets a breakpoint at the address evaluated by the given expression

    A software breakpoint is used as a fallback if all hardware breakpoint slots are being used

    Args:
        expression (str): Any gdb expression
        breakpoint_type (int): Can be a member of typedefs.BREAKPOINT_TYPE
        on_hit (int): Can be a member of typedefs.BREAKPOINT_ON_HIT

    Returns:
        str: Number of the breakpoint set
        None: If setting breakpoint fails
    """
    str_address = examine_expression(expression).address
    if not str_address:
        print("expression for breakpoint is not valid")
        return None
    if get_breakpoints_in_range(str_address):
        print("breakpoint/watchpoint for address " + str_address + " is already set")
        return None
    # Stays empty for unknown breakpoint types, which makes the creation check below fail
    output = ""
    if breakpoint_type == typedefs.BREAKPOINT_TYPE.HARDWARE:
        if hardware_breakpoint_available():
            break_command = "hbreak *"
        else:
            print("All hardware breakpoint slots are being used, using a software breakpoint instead")
            break_command = "break *"
        output = send_command(break_command + str_address)
    elif breakpoint_type == typedefs.BREAKPOINT_TYPE.SOFTWARE:
        output = send_command("break *" + str_address)
    if not regexes.breakpoint_created.search(output):
        return None
    number = regexes.breakpoint_number.search(output).group(1)
    breakpoint_on_hit_dict[number] = on_hit
    return number
[docs]
@execute_with_temporary_interruption
def add_watchpoint(
    expression: str,
    length: int = 4,
    watchpoint_type: int = typedefs.WATCHPOINT_TYPE.BOTH,
    on_hit: int = typedefs.BREAKPOINT_ON_HIT.BREAK,
) -> list[str] | None:
    """Adds a watchpoint at the address evaluated by the given expression

    Ranges longer than a single watchpoint can cover (8 bytes for 64-bit inferiors, 4 bytes otherwise) are split
    into consecutive watchpoints. The watchpoints created by one call are recorded together in chained_breakpoints

    Args:
        expression (str): Any gdb expression
        length (int): Length of the watchpoint
        watchpoint_type (int): Can be a member of typedefs.WATCHPOINT_TYPE
        on_hit (int): Can be a member of typedefs.BREAKPOINT_ON_HIT

    Returns:
        list: Numbers of the successfully set breakpoints as strings, might be shorter than expected or empty if
        setting one of the watchpoints fails
        None: If the expression or watchpoint_type is not valid
    """
    str_address = examine_expression(expression).address
    if not str_address:
        print("expression for watchpoint is not valid")
        return None
    if watchpoint_type == typedefs.WATCHPOINT_TYPE.WRITE_ONLY:
        watch_command = "watch"
    elif watchpoint_type == typedefs.WATCHPOINT_TYPE.READ_ONLY:
        watch_command = "rwatch"
    elif watchpoint_type == typedefs.WATCHPOINT_TYPE.BOTH:
        watch_command = "awatch"
    else:
        # Without this branch, watch_command would be unbound and the loop below would raise UnboundLocalError
        print("watchpoint_type is not valid")
        return None
    remaining_length = length
    breakpoints_set = []
    breakpoint_addresses = []
    max_length = 8 if get_inferior_arch() == typedefs.INFERIOR_ARCH.ARCH_64 else 4
    str_address_int = int(str_address, 16)
    while remaining_length > 0:
        breakpoint_length = min(remaining_length, max_length)
        if get_breakpoints_in_range(str_address_int, breakpoint_length):
            print("breakpoint/watchpoint for address " + hex(str_address_int) + " is already set. Bailing out...")
            break
        if not hardware_breakpoint_available():
            print("All hardware breakpoint slots are being used, unable to set a new watchpoint. Bailing out...")
            break
        cmd = f"{watch_command} * (char[{breakpoint_length}] *) {hex(str_address_int)}"
        output = execute_func_temporary_interruption(send_command, cmd)
        if not regexes.breakpoint_created.search(output):
            print("Failed to create a watchpoint at address " + hex(str_address_int) + ". Bailing out...")
            break
        breakpoint_addresses.append([str_address_int, breakpoint_length])
        breakpoint_number = regexes.breakpoint_number.search(output).group(1)
        breakpoints_set.append(breakpoint_number)
        breakpoint_on_hit_dict[breakpoint_number] = on_hit
        remaining_length -= max_length
        str_address_int += max_length
    if breakpoint_addresses:
        # Don't leave empty chains behind when no watchpoint could be created
        chained_breakpoints.append(breakpoint_addresses)
    return breakpoints_set
[docs]
def modify_breakpoint(expression, modify_what, condition=None, count=None):
    """Modifies the breakpoint at the address evaluated by the given expression

    If the breakpoint belongs to a chain in chained_breakpoints, every breakpoint of that chain gets modified

    Args:
        expression (str): Any gdb expression
        modify_what (int): Can be a member of typedefs.BREAKPOINT_MODIFY
        This function modifies condition of the breakpoint if CONDITION, enables the breakpoint if ENABLE, disables the
        breakpoint if DISABLE, enables once then disables after hit if ENABLE_ONCE, enables for specified count then
        disables after the count is reached if ENABLE_COUNT, enables once then deletes the breakpoint if ENABLE_DELETE
        condition (str): Any gdb condition expression. This parameter is only used if modify_what passed as CONDITION
        count (int): Only used if modify_what passed as ENABLE_COUNT

    Returns:
        bool: True if at least one breakpoint has been modified, False if the parameters are not valid or if no
        breakpoint exists for the given address

    Examples:
        modify_what-->typedefs.BREAKPOINT_MODIFY.CONDITION
        condition-->$eax==0x523
        condition-->$rax>0 && ($rbp<0 || $rsp==0)
        condition-->printf($r10)==3
        modify_what-->typedefs.BREAKPOINT_MODIFY.ENABLE_COUNT
        count-->10
    """
    str_address = examine_expression(expression).address
    if not str_address:
        print("expression for breakpoint is not valid")
        return False
    # Validate the request once and prepare the gdb command, the breakpoint number goes between prefix and suffix
    command_suffix = ""
    if modify_what == typedefs.BREAKPOINT_MODIFY.CONDITION:
        if condition is None:
            print("Please set condition first")
            return False
        command_prefix = "condition "
        command_suffix = " " + condition
    elif modify_what == typedefs.BREAKPOINT_MODIFY.ENABLE:
        command_prefix = "enable "
    elif modify_what == typedefs.BREAKPOINT_MODIFY.DISABLE:
        command_prefix = "disable "
    elif modify_what == typedefs.BREAKPOINT_MODIFY.ENABLE_ONCE:
        command_prefix = "enable once "
    elif modify_what == typedefs.BREAKPOINT_MODIFY.ENABLE_COUNT:
        if count is None:
            print("Please set count first")
            return False
        if count < 1:
            print("Count can't be lower than 1")
            return False
        command_prefix = "enable count " + str(count) + " "
    elif modify_what == typedefs.BREAKPOINT_MODIFY.ENABLE_DELETE:
        command_prefix = "enable delete "
    else:
        print("Parameter modify_what is not valid")
        return False
    str_address_int = int(str_address, 16)
    modification_list = [[str_address_int]]
    for chain in chained_breakpoints:
        # The last chain that covers the address wins, same as before
        if any(entry[0] <= str_address_int <= entry[0] + entry[1] - 1 for entry in chain):
            modification_list = chain
    modified_any = False
    for entry in modification_list:
        found_breakpoint = get_breakpoints_in_range(entry[0])
        if not found_breakpoint:
            print("no such breakpoint exists for address " + str_address)
            continue
        send_command(command_prefix + found_breakpoint[0].number + command_suffix)
        modified_any = True
    return modified_any
[docs]
def delete_breakpoint(expression):
    """Deletes a breakpoint at the address evaluated by the given expression

    If the breakpoint belongs to a chain in chained_breakpoints, every breakpoint of that chain gets deleted and
    the chain is removed from chained_breakpoints

    Args:
        expression (str): Any gdb expression

    Returns:
        bool: True if at least one breakpoint has been deleted, False if the expression is not valid or if no
        breakpoint exists for the given address
    """
    str_address = examine_expression(expression).address
    if not str_address:
        print("expression for breakpoint is not valid")
        return False
    str_address_int = int(str_address, 16)
    deletion_list = [[str_address_int]]
    # Build the list of surviving chains separately instead of deleting from the list while iterating over it
    remaining_chains = []
    for chain in chained_breakpoints:
        if any(entry[0] <= str_address_int <= entry[0] + entry[1] - 1 for entry in chain):
            deletion_list = chain
        else:
            remaining_chains.append(chain)
    # Slice assignment updates the module level list in place
    chained_breakpoints[:] = remaining_chains
    deleted_any = False
    for entry in deletion_list:
        found_breakpoint = get_breakpoints_in_range(entry[0])
        if not found_breakpoint:
            print("no such breakpoint exists for address " + str_address)
            continue
        breakpoint_number = found_breakpoint[0].number
        breakpoint_on_hit_dict.pop(breakpoint_number, None)
        send_command("delete " + breakpoint_number)
        deleted_any = True
    return deleted_any
[docs]
@execute_with_temporary_interruption
def track_watchpoint(expression, length, watchpoint_type):
    """Sets a watchpoint at the address holding a value and attaches a tracking script to it

    Use get_track_watchpoint_info() to get info about the watchpoint you set

    Args:
        expression (str): Any gdb expression
        length (int): Length of the watchpoint
        watchpoint_type (int): Can be a member of typedefs.WATCHPOINT_TYPE

    Returns:
        list: Numbers of the successfully set breakpoints as strings
        None: If fails to set any watchpoint
    """
    breakpoints = add_watchpoint(expression, length, watchpoint_type, typedefs.BREAKPOINT_ON_HIT.FIND_CODE)
    if not breakpoints:
        return None
    # Every watchpoint of the chain runs the same script: collect the info, then continue in the background
    tracking_command = "pince-get-track-watchpoint-info " + str(breakpoints)
    for number in breakpoints:
        send_command("\n".join(("commands " + number, tracking_command, "c&", "end")))
    return breakpoints
[docs]
def get_track_watchpoint_info(watchpoint_list):
    """Gathers the information for the tracked watchpoint(s)

    Args:
        watchpoint_list (list): A list that holds the watchpoint numbers, must be returned from track_watchpoint()

    Returns:
        dict: Holds the program counter addresses at the moment watchpoint hits as keys
        Format of dict--> {address1:info_list1, address2:info_list2, ...}
        Format of info_list--> [count, previous_pc_address, register_info, float_info, disas_info]
        count-->(int) Count of the hits for the same pc address
        previous_pc_address-->(str) The address of the instruction that comes before the instruction pc address
        holds. If there's no previous address available(end of region etc.), previous_pc_address=pc_address
        register_info-->(dict) Same dict returned from read_registers()
        float_info-->(dict) Same dict returned from read_float_registers()
        disas_info-->(str) A small section that's disassembled just after previous_pc_counter
        str: An empty string if the tracking file can't be opened or unpickled
    """
    track_watchpoint_file = utils.get_track_watchpoint_file(currentpid, watchpoint_list)
    # NOTE(review): pickle isn't safe for untrusted data, this presumably only reads files written by PINCE itself
    try:
        with open(track_watchpoint_file, "rb") as info_file:
            return pickle.load(info_file)
    except Exception:
        # Best-effort read: the file might be missing or incomplete
        # Unlike a bare except, this doesn't swallow KeyboardInterrupt or SystemExit
        return ""
[docs]
@execute_with_temporary_interruption
def track_breakpoint(expression, register_expressions):
    """Sets a breakpoint at the given address and attaches a tracking script to it

    Use get_track_breakpoint_info() to get info about the breakpoint you set

    Args:
        expression (str): Any gdb expression
        register_expressions (str): Register expressions, separated by a comma. Registers should start with "$"
        PINCE will gather info about values presented by register expressions every time the breakpoint is reached
        For instance, passing "$rax,$rcx+5,$rbp+$r12" will make PINCE track values rax, rcx+5 and rbp+r12

    Returns:
        str: Number of the breakpoint set
        None: If fails to set any breakpoint
    """
    number = add_breakpoint(expression, on_hit=typedefs.BREAKPOINT_ON_HIT.FIND_ADDR)
    if not number:
        return None
    # TODO (lldb): When we switch to LLDB, remove c& and only continue if there isn't an active trace
    # Apply the same for track_watchpoint
    # Spaces are stripped from the expressions and the breakpoint number is passed as the last argument
    tracking_args = register_expressions.replace(" ", "") + "," + number
    script_lines = ("commands " + number, "pince-get-track-breakpoint-info " + tracking_args, "c&", "end")
    send_command("\n".join(script_lines))
    return number
[docs]
def get_track_breakpoint_info(breakpoint):
    """Gathers the information for the tracked breakpoint

    Args:
        breakpoint (str): breakpoint number, must be returned from track_breakpoint()

    Returns:
        dict: Holds the register expressions as keys and their info as values
        Format of dict--> {expression1:expression_info_dict1, expression2:expression_info_dict2, ...}
        expression-->(str) The register expression
        Format of expression_info_dict--> {value1:count1, value2:count2, ...}
        value-->(str) Value calculated by given register expression as hex str
        count-->(int) How many times this expression has been reached
        str: An empty string if the tracking file can't be opened or unpickled
    """
    track_breakpoint_file = utils.get_track_breakpoint_file(currentpid, breakpoint)
    # NOTE(review): pickle isn't safe for untrusted data, this presumably only reads files written by PINCE itself
    try:
        with open(track_breakpoint_file, "rb") as info_file:
            return pickle.load(info_file)
    except Exception:
        # Best-effort read: the file might be missing or incomplete
        # Unlike a bare except, this doesn't swallow KeyboardInterrupt or SystemExit
        return ""
[docs]
class Tracer:
def __init__(self) -> None:
    """Use set_breakpoint after init and if it succeeds, use tracer_loop within a thread

    There can be only one trace session at a time. Don't create new trace sessions before finishing the last one
    """
    # Trace configuration, these defaults are overwritten by set_breakpoint
    self.expression = ""
    self.stop_condition = ""
    self.max_trace_count = 1000
    self.step_mode = typedefs.STEP_MODE.SINGLE_STEP
    self.stop_after_trace = False
    self.collect_registers = True
    # State of the trace session
    self.cancel = False
    self.current_trace_count = 0
    self.trace_data = ([], None)
    self.trace_status = typedefs.TRACE_STATUS.IDLE
    # Publish the initial (idle) status for the current process
    utils.change_trace_status(currentpid, self.trace_status)
[docs]
@execute_with_temporary_interruption
def set_breakpoint(
    self,
    expression: str,
    max_trace_count: int = 1000,
    trigger_condition: str = "",
    stop_condition: str = "",
    step_mode: typedefs.STEP_MODE = typedefs.STEP_MODE.SINGLE_STEP,
    stop_after_trace: bool = False,
    collect_registers: bool = True,
) -> str | None:
    """Sets the breakpoint for tracing instructions at the address evaluated by the given expression

    Args:
        expression (str): Any gdb expression
        max_trace_count (int): Maximum number of steps taken while tracing. Must be greater than or equal to 1
        trigger_condition (str): Optional, any gdb expression. Tracing will start if the condition is met
        stop_condition (str): Optional, any gdb expression. Tracing will stop whenever the condition is met
        step_mode (int): Can be a member of typedefs.STEP_MODE
        stop_after_trace (bool): Inferior won't be continuing after the tracing process
        collect_registers (bool): Collect registers while stepping

    Returns:
        str: Number of the breakpoint set
        None: If fails to set any breakpoint or if max_trace_count is not valid
    """
    # The type has to be checked first, otherwise comparing a non-numeric value with 1 raises TypeError
    if type(max_trace_count) is not int:
        print("max_trace_count must be an integer")
        return None
    if max_trace_count < 1:
        print("max_trace_count must be greater than or equal to 1")
        return None
    breakpoint = add_breakpoint(expression, on_hit=typedefs.BREAKPOINT_ON_HIT.TRACE)
    if not breakpoint:
        return None
    # trigger_condition is applied as the condition of the breakpoint, even if it's an empty string
    modify_breakpoint(expression, typedefs.BREAKPOINT_MODIFY.CONDITION, condition=trigger_condition)
    self.expression = expression
    self.max_trace_count = max_trace_count
    self.stop_condition = stop_condition
    self.step_mode = step_mode
    self.stop_after_trace = stop_after_trace
    self.collect_registers = collect_registers
    send_command("commands " + breakpoint + "\npince-trace-instructions\nend")
    return breakpoint
[docs]
def tracer_loop(self):
    """The main tracer loop, call within a thread

    Polls the trace status file until the status leaves IDLE (or the trace gets cancelled, or no process is
    attached anymore), then steps through the inferior for at most self.max_trace_count instructions while
    building a call tree out of the executed instructions. The result is stored in self.trace_data as a tuple of
    (tree, root_index) and self.trace_status is set to FINISHED at the end. The inferior is continued afterwards
    unless self.stop_after_trace is set
    """
    global active_trace
    self.current_trace_count = 0
    status_path = utils.get_trace_status_file(currentpid)
    # Wait for the status file to report something other than IDLE
    while self.trace_status == typedefs.TRACE_STATUS.IDLE and not self.cancel and currentpid != -1:
        try:
            with open(status_path, "r") as status_file:
                self.trace_status = int(status_file.read())
        except (ValueError, FileNotFoundError):
            # The file might be missing or half-written, just try again on the next poll
            pass
        sleep(0.1)
    active_trace = True
    delete_breakpoint(self.expression)
    self.trace_status = typedefs.TRACE_STATUS.TRACING

    # A plain list is used instead of a tree class to keep the tree json-compatible
    # tree format-->[node1, node2, node3, ...]
    # node-->[(line_info, register_dict), parent_index, child_index_list]
    # The root is always an empty node, it's up to the consumer to use or delete it
    tree = [[("", None), None, []]]
    last_index = 0  # Index of the most recently appended node, tracked manually to avoid calling len()
    scope_index = 0  # Index of the node that newly traced instructions get attached to
    root_index = 0
    try:  # In case process exits during the trace session
        for step in range(self.max_trace_count):
            if self.cancel or currentpid == -1:
                break
            first_line = send_command("x/i $pc", cli_output=True).splitlines()[0]
            line_info = first_line.split(maxsplit=1)[1]
            registers = OrderedDict()
            if self.collect_registers:
                registers.update(read_registers())
                registers.update(read_float_registers())
            last_index += 1
            tree.append([(line_info, registers), scope_index, []])
            tree[scope_index][2].append(last_index)  # Register the new node as a child of the current scope
            self.current_trace_count = step + 1
            if regexes.trace_instructions_ret.search(line_info):
                scope_parent = tree[scope_index][1]
                if scope_parent is None:
                    # Returning out of the topmost known scope, a new empty root is created above it
                    last_index += 1
                    tree.append([("", None), None, [scope_index]])
                    tree[scope_index][1] = last_index
                    scope_index = last_index
                    root_index = scope_index
                else:
                    scope_index = scope_parent  # Go back to the parent scope
            elif self.step_mode == typedefs.STEP_MODE.SINGLE_STEP and regexes.trace_instructions_call.search(
                line_info
            ):
                # Following instructions become children of the call instruction
                scope_index = last_index
            if self.stop_condition:
                try:
                    if str(parse_and_eval(self.stop_condition)) == "1":
                        break
                except:
                    pass
            if self.step_mode == typedefs.STEP_MODE.SINGLE_STEP:
                step_instruction()
            elif self.step_mode == typedefs.STEP_MODE.STEP_OVER:
                step_over_instruction()
            wait_for_stop()
    except:
        traceback.print_exc()
    self.trace_data = (tree, root_index)
    self.trace_status = typedefs.TRACE_STATUS.FINISHED
    active_trace = False
    if not self.stop_after_trace:
        continue_inferior()
[docs]
def cancel_trace(self):
    """Requests an early end of the trace session by raising the cancel flag

    tracer_loop checks this flag while waiting and before every step, the trace data gathered until then is
    still stored
    """
    self.cancel = True
[docs]
def call_function_from_inferior(expression):
    """Calls the given function expression from the inferior

    The call is sent through execute_func_temporary_interruption as a gdb "call" command and the output is parsed
    with regexes.convenience_variable

    Args:
        expression (str): Any gdb expression

    Returns:
        tuple: A tuple containing assigned value and result, both as str
        Returns a tuple of (False, False) if the call fails

    Examples:
        call_function_from_inferior("printf('123')") returns ("$26","3")
    """
    call_command = f"call (void*(*)(char*, int)) {expression}"
    gdb_output = execute_func_temporary_interruption(send_command, call_command)
    parsed = regexes.convenience_variable.search(gdb_output)
    if not parsed:
        return False, False
    return parsed.group(1), parsed.group(2)
[docs]
def find_entry_point():
    """Finds entry point of the inferior by parsing the output of the gdb command "info file"

    Returns:
        str: Entry point as hex str
        None: If fails to find an entry point
    """
    file_info = send_command("info file")
    entry_match = regexes.entry_point.search(file_info)
    return entry_match.group(1) if entry_match else None
[docs]
def search_opcode(searched_str, starting_address, ending_address_or_offset, case_sensitive=False, enable_regex=False):
    """Searches for the given str in the disassembled output

    Args:
        searched_str (str): String that will be searched
        starting_address (str): Any gdb expression
        ending_address_or_offset (str): If you pass this parameter as an offset, you should add "+" in front of it
        (e.g "+42" or "+0x42"). If you pass this parameter as an hex address, the address range between the expression
        and the secondary address is disassembled.
        If the second parameter is an address. it always should be bigger than the first address.
        case_sensitive (bool): If True, search will be case sensitive
        enable_regex (bool): If True, searched_str will be treated as a regex expression

    Returns:
        list: A list of str values in this format-->[[address1,opcodes1],[address2, ...], ...]
        None: If enable_regex is True and given regex isn't valid
    """
    # The matching strategy is picked once up front instead of re-deciding it for every disassembled line
    if enable_regex:
        try:
            regex = re.compile(searched_str, 0 if case_sensitive else re.IGNORECASE)
        except Exception as e:
            print("An exception occurred while trying to compile the given regex\n", str(e))
            return
        is_match = regex.search
    elif case_sensitive:

        def is_match(opcode):
            return searched_str in opcode

    else:
        # Lowercase the searched string a single time, it doesn't change between iterations
        lowered_search = searched_str.lower()

        def is_match(opcode):
            return lowered_search in opcode.lower()

    disas_output = disassemble(starting_address, ending_address_or_offset)
    # Each item holds the address at index 0 and the opcode at index 2
    return [[item[0], item[2]] for item in disas_output if is_match(item[2])]
[docs]
def dissect_code(region_list, discard_invalid_strings=True):
    """Searches given regions for jumps, calls and string references

    Sends the gdb command "pince-dissect-code" with the arguments passed through a file
    Use function get_dissect_code_data() to gather the results

    Args:
        region_list (list): A list of (start_address, end_address) -> (str, str)
        Can be returned from functions like utils.filter_regions
        discard_invalid_strings (bool): Entries that can't be decoded as utf-8 won't be included in referenced strings
    """
    command_arguments = (region_list, discard_invalid_strings)
    send_command("pince-dissect-code", send_with_file=True, file_contents_send=command_arguments)
[docs]
def get_dissect_code_status():
    """Returns the current state of dissect code process

    The state is unpickled from the status file returned by utils.get_dissect_code_status_file

    Returns:
        tuple:(current_region, current_region_count, current_range, referenced_strings_count,
        referenced_jumps_count, referenced_calls_count)

        current_region-->(str) Currently scanned memory region
        current_region_count-->(str) "Region x of y"
        current_range-->(str) Currently scanned memory range(current buffer)
        referenced_strings_count-->(int) Count of referenced strings
        referenced_jumps_count-->(int) Count of referenced jumps
        referenced_calls_count-->(int) Count of referenced calls

        Returns a tuple of ("", "", "", 0, 0, 0) if fails to gather info
    """
    dissect_code_status_file = utils.get_dissect_code_status_file(currentpid)
    try:
        # The context manager closes the file even if unpickling fails
        # NOTE(review): pickle.load trusts the file contents, presumably only PINCE itself writes it -- confirm
        with open(dissect_code_status_file, "rb") as status_file:
            return pickle.load(status_file)
    except Exception:
        # Best effort: a missing or half-written file just means there's no status to report yet
        return "", "", "", 0, 0, 0
[docs]
def cancel_dissect_code():
    """Finishes the current dissect code process early on

    The last command is only cancelled if it contains "pince-dissect-code"
    """
    if "pince-dissect-code" in last_gdb_command:
        cancel_last_command()
[docs]
def get_dissect_code_data(referenced_strings=True, referenced_jumps=True, referenced_calls=True):
    """Returns shelve.DbfilenameShelf objects of referenced dicts

    The shelves are opened read-only and it's up to the caller to close them. If one of them can't be opened, the
    ones opened before it are closed and the exception is re-raised

    Args:
        referenced_strings (bool): If True, include referenced strings in the returned list
        referenced_jumps (bool): If True, include referenced jumps in the returned list
        referenced_calls (bool): If True, include referenced calls in the returned list

    Returns:
        list: A list of shelve.DbfilenameShelf objects. Can be used as dicts, they are backwards compatible

        For instance, if you call this function with default params, you'll get this--▼
        [referenced_strings_dict,referenced_jumps_dict,referenced_calls_dict]

        And if you, let's say, pass referenced_jumps as False, you'll get this instead--▼
        [referenced_strings_dict,referenced_calls_dict]

        referenced_strings_dict-->(shelve.DbfilenameShelf object) Holds referenced string addresses
        Format: {referenced_address1:referrer_address_set1, referenced_address2:referrer_address_set2, ...}

        referenced_jumps_dict-->(shelve.DbfilenameShelf object) Holds referenced jump addresses
        Format: {referenced_address1:referenced_by_dict1, referenced_address2:referenced_by_dict2, ...}
        Format of referenced_by_dict: {address1:opcode1, address2:opcode2, ...}

        referenced_calls_dict-->(shelve.DbfilenameShelf object) Holds referenced call addresses
        Format: {referenced_address1:referrer_address_set1, referenced_address2:referrer_address_set2, ...}
    """
    dict_list = []
    try:
        if referenced_strings:
            dict_list.append(shelve.open(utils.get_referenced_strings_file(currentpid), "r"))
        if referenced_jumps:
            dict_list.append(shelve.open(utils.get_referenced_jumps_file(currentpid), "r"))
        if referenced_calls:
            dict_list.append(shelve.open(utils.get_referenced_calls_file(currentpid), "r"))
    except Exception:
        # Don't leak the shelves that were already opened when a later one fails to open
        for opened_shelf in dict_list:
            opened_shelf.close()
        raise
    return dict_list
[docs]
def search_referenced_strings(
    searched_str, value_index=typedefs.VALUE_INDEX.STRING_UTF8, case_sensitive=False, enable_regex=False
):
    """Searches for given str in the referenced strings

    Every referenced address is read from the memory (100 is passed as the length) and the str form of the read
    value is matched against searched_str. Addresses that can't be read or yield an empty str are skipped

    Args:
        searched_str (str): String that will be searched
        value_index (int): Can be a member of typedefs.VALUE_INDEX
        case_sensitive (bool): If True, search will be case sensitive
        enable_regex (bool): If True, searched_str will be treated as a regex expression

    Returns:
        list: [(referenced_address1, reference_count1, found_value1), ...]
        None: If enable_regex is True and searched_str isn't a valid regex expression
    """
    # The matching strategy is picked once up front instead of re-deciding it for every referenced string
    if enable_regex:
        try:
            regex = re.compile(searched_str, 0 if case_sensitive else re.IGNORECASE)
        except Exception as e:
            print("An exception occurred while trying to compile the given regex\n", str(e))
            return
        is_match = regex.search
    elif case_sensitive:

        def is_match(text):
            return searched_str in text

    else:
        # Lowercase the searched string a single time, it doesn't change between iterations
        lowered_search = searched_str.lower()

        def is_match(text):
            return lowered_search in text.lower()

    returned_list = []
    str_dict = get_dissect_code_data(True, False, False)[0]
    try:
        mem_handle = memory_handle()
        try:
            for address, refs in str_dict.items():
                value = read_memory(int(address, 16), value_index, 100, mem_handle=mem_handle)
                value_str = "" if value is None else str(value)
                if value_str and is_match(value_str):
                    # The original value is returned, not its str form
                    returned_list.append((address, len(refs), value))
        finally:
            # Release the memory handle even if reading or matching raises
            mem_handle.close()
    finally:
        # Same goes for the shelf
        str_dict.close()
    return returned_list
[docs]
def search_referenced_calls(searched_str, case_sensitive=True, enable_regex=False):
    """Searches for given str in the referenced calls

    The search itself is carried out by the gdb command "pince-search-referenced-calls", the parameters are sent
    as the str form of a tuple and the result is received through a file

    Args:
        searched_str (str): String that will be searched
        case_sensitive (bool): If True, search will be case sensitive
        enable_regex (bool): If True, searched_str will be treated as a regex expression

    Returns:
        list: [[referenced_address1, found_string1], ...]
        None: If enable_regex is True and searched_str isn't a valid regex expression
    """
    serialized_params = str((searched_str, case_sensitive, enable_regex))
    gdb_command = "pince-search-referenced-calls " + serialized_params
    return send_command(gdb_command, recv_with_file=True)
[docs]
def complete_command(gdb_command):
    """Tries to complete the given gdb command and returns completion possibilities

    Output lines of the gdb command "complete" that match regexes.max_completions_reached are left out

    Args:
        gdb_command (str): The gdb command that'll be completed

    Returns:
        list: Possible completions as a list of str
    """
    completion_lines = send_command("complete " + gdb_command, cli_output=True).splitlines()
    return [line for line in completion_lines if not regexes.max_completions_reached.search(line)]