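Function Identification in angr
Overview of function identification
Disassemblers such as IDA or radare2 can identify the function boundaries in a binary and build the program's call graph from the call relationships between those functions. Accurately identifying function boundaries is therefore important for any further analysis of the binary. A binary is usually organized into data, code, and metadata. In a binary whose symbol table has not been stripped, the start offset and size of each function can usually be read directly from the metadata. In a stripped binary the symbol table is missing, so functions usually have to be identified with methods such as the following:
- Matching function prologues and epilogues
- Analyzing the target addresses of call, jmp, and similar instructions
- Analyzing unreachable code locations
These methods have accuracy problems of their own. Checks based on function prologues and epilogues rely on hard-coded patterns, which may no longer apply when a different compiler or optimization level is used. For call and jmp instructions, many target addresses can only be determined at run time.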
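The weakness of hard-coded prologue matching can be seen in a minimal sketch. It assumes a single x86-64 pattern (`push rbp; mov rbp, rsp`) and two hand-assembled functions; the pattern and the helper name `find_prologues` are illustrative and are not taken from angr.

```python
import re

# Assumed pattern: the classic x86-64 frame setup `push rbp; mov rbp, rsp`.
PROLOGUE = re.compile(rb"\x55\x48\x89\xe5")


def find_prologues(code, base=0):
    """Return the address of every prologue match in `code`."""
    return [base + m.start() for m in PROLOGUE.finditer(code)]


# push rbp; mov rbp, rsp; pop rbp; ret
framed = bytes.fromhex("554889e55dc3")
# xor eax, eax; ret  (a function compiled without a frame pointer)
frameless = bytes.fromhex("31c0c3")
code = framed + frameless

found = find_prologues(code, base=0x1000)
print([hex(a) for a in found])
```

Only the first function is reported. The second one, which starts at 0x1006, has no frame setup and is missed, which is the kind of inaccuracy described above.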
Function identification in angr
1. Initialization phase
In angr, function identification takes place while CFGFast builds the CFG. The main steps of the initialization phase are:
- First, self._executable_memory_regions is called to get the regions of the binary that have execute permission. These regions are the candidate locations for analysis.
- Next, the regions are filtered and processed, and some analysis settings are made, such as whether a symbol table is available and whether data references are collected. The objdump command can be used to inspect the symbol tables of the file:
- Dynamic symbol table:
nevv@ubuntu:~/angr/others$ objdump -T cgibin | grep "main"
004023e0 g    DF .text  00000304  Base        main
00409480 g    DF .text  000005ec  Base        hedwigcgi_main
00408a78 g    DF .text  00000198  Base        captchacgi_main
0040b930 g    DF .text  00000b28  Base        conntrackcgi_main
00406b40 g    DF .text  00000008  Base        dlcfg_main
0040a268 g    DF .text  000003d4  Base        servicecgi_main
0040cb5c g    DF .text  00000604  Base        hnap_main
0040a6f0 g    DF .text  00000218  Base        ssdpcgi_main
0040b818 g    DF .text  00000110  Base        genacgi_main
00407164 g    DF .text  000000e4  Base        fwupdater_main
00409ae0 g    DF .text  00000620  Base        pigwidgeoncgi_main
00406b48 g    DF .text  00000008  Base        fwup_main
004195a0      DF *UND*  00000000              __uClibc_main
00405284 g    DF .text  000002c8  Base        phpcgi_main
0040ae50 g    DF .text  00000484  Base        soapcgi_main
00408e10 g    DF .text  0000066c  Base        sessioncgi_main
00406b50 g    DF .text  00000008  Base        seamacgi_main
- Static symbol table:
nevv@ubuntu:~/angr/others$ objdump -t cgibin
cgibin:     file format elf32-tradlittlemips
SYMBOL TABLE:
no symbols
The two differ as follows:
For static linking there is a dedicated section called the symbol table, ".symtab", which holds the definitions and references of all symbols in the object file.
For dynamic linking there is likewise a dynamic symbol table section, ".dynsym". Compared with .symtab, .dynsym holds only the imported and exported symbols needed for dynamic linking. A shared object (.so) also has a .symtab holding all of its symbols, unless it has been stripped, as is the case for cgibin here.
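To illustrate how dynamic symbols can serve as function start candidates, here is a small sketch that parses `objdump -T` output into addresses and sizes. The helper name `parse_objdump_dynsyms` is hypothetical, and the parsing assumes the column layout shown above; it is not how angr reads symbols (angr loads them through its own loader).

```python
def parse_objdump_dynsyms(text):
    """Map function name -> (address, size) for defined DF symbols."""
    funcs = {}
    for line in text.splitlines():
        fields = line.split()
        # Keep only defined dynamic function symbols (type DF, not *UND*).
        if len(fields) < 6 or "DF" not in fields or "*UND*" in fields:
            continue
        try:
            addr = int(fields[0], 16)
            # Layout after DF: section name, then size.
            size = int(fields[fields.index("DF") + 2], 16)
        except ValueError:
            continue
        funcs[fields[-1]] = (addr, size)
    return funcs


sample = """\
004023e0 g    DF .text  00000304  Base        main
00406b40 g    DF .text  00000008  Base        dlcfg_main
004195a0      DF *UND*  00000000              __uClibc_main
"""
funcs = parse_objdump_dynsyms(sample)
for name, (addr, size) in sorted(funcs.items()):
    print(f"{name}: start={addr:#x} size={size:#x}")
```

The undefined import `__uClibc_main` is skipped because it has no address inside this binary.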
- Finally, self._analyze is called to run the forward analysis.
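For orientation, the whole process is normally triggered through the public CFGFast analysis. The following is a sketch under the assumption that the keyword arguments `symbols`, `function_prologues`, `resolve_indirect_jumps`, and `force_complete_scan` are available in the installed angr version; they appear to correspond to the private flags `_use_symbols`, `_use_function_prologues`, `_resolve_indirect_jumps`, and `_force_complete_scan` seen in the code quoted in this article. The helper names `recover_functions` and `format_functions` are made up for this example.

```python
def recover_functions(binary_path):
    """Run CFGFast on `binary_path` and return {address: function name}."""
    import angr  # imported lazily; requires angr to be installed

    proj = angr.Project(binary_path, auto_load_libs=False)
    cfg = proj.analyses.CFGFast(
        symbols=True,                 # use symbol addresses as starting points
        function_prologues=True,      # also scan for function prologues
        resolve_indirect_jumps=True,  # run the indirect jump resolvers
        force_complete_scan=False,    # do not linearly scan leftover bytes
    )
    return {addr: func.name for addr, func in cfg.kb.functions.items()}


def format_functions(functions):
    """Render {address: name} as sorted 'address name' lines."""
    return ["%#x %s" % (addr, name) for addr, name in sorted(functions.items())]
```

A call such as `format_functions(recover_functions("cgibin"))` would then list the recovered functions. No output is shown here because it depends on the binary and the angr version.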
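2. Analysis phase
The _analyze method is simple. It calls self._pre_analysis() and then picks an analysis routine depending on whether self._graph_visitor exists. An analysis that has no graph structure to follow yet (CFG recovery, for example) always goes through self._analysis_core_baremetal(). Afterwards self._post_analysis() is called.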
def _analyze(self):
    """
    The main analysis routine.

    :return: None
    """

    self._pre_analysis()

    if self._graph_visitor is None:
        # There is no base graph that we can rely on. The analysis itself should generate successors for the
        # current job.
        # An example is the CFG recovery.
        self._analysis_core_baremetal()

    else:
        # We have a base graph to follow. Just handle the current job.
        self._analysis_core_graph()

    self._post_analysis()
0x1 _pre_analysis
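This function initializes the variables used during the analysis, takes the function start addresses from the symbol table as the starting points of the analysis, and searches for functions by their prologues, saving the search results in _function_prologue_addrs.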
def _pre_analysis(self):
    # Initialize the CFG-related variables
    self._initialize_cfg()

    # Scan for __x86_return_thunk and friends
    self._known_thunks = self._find_thunks()
    """
    This presumably searches for special byte sequences such as the two below.
    Note: the listings were produced in 32-bit mode. Decoded as x86-64, the 0x48 byte is a
    REX.W prefix, so the final instructions are mov QWORD PTR [rsp],rax and lea rsp,[rsp+0x8].
    >>> print disasm('E807000000F3900FAEE8EBF948890424C3'.decode("hex"))
       0:   e8 07 00 00 00          call   0xc
       5:   f3 90                   pause
       7:   0f ae e8                lfence
       a:   eb f9                   jmp    0x5
       c:   48                      dec    eax
       d:   89 04 24                mov    DWORD PTR [esp],eax
      10:   c3                      ret
    >>> print disasm('E807000000F3900FAEE8EBF9488D642408C3'.decode("hex"))
       0:   e8 07 00 00 00          call   0xc
       5:   f3 90                   pause
       7:   0f ae e8                lfence
       a:   eb f9                   jmp    0x5
       c:   48                      dec    eax
       d:   8d 64 24 08             lea    esp,[esp+0x8]
      11:   c3                      ret
    >>>
    """

    # Initialize the variables used during the analysis
    self._pending_jobs = PendingJobs(self.functions, self._deregister_analysis_job)
    self._traced_addresses = set()
    self._function_returns = defaultdict(set)

    # Not every function call uses a call instruction, so record the exit points of each
    # individual function and add the corresponding edges to the call graph when needed
    self._function_exits = defaultdict(set)

    # Create an initial state
    self._initial_state = self.project.factory.blank_state(mode="fastpath")
    initial_options = self._initial_state.options - {o.TRACK_CONSTRAINTS} - o.refs
    initial_options |= {o.SUPER_FASTPATH, o.SYMBOL_FILL_UNCONSTRAINED_REGISTERS, o.SYMBOL_FILL_UNCONSTRAINED_MEMORY}
    # initial_options.remove(o.COW_STATES)
    self._initial_state.options = initial_options

    starting_points = set()

    # clear all existing functions
    self.kb.functions.clear()

    if self._use_symbols:
        starting_points |= self._function_addresses_from_symbols
        # Function addresses taken from the symbol table; there are 190 functions in this example.
        # While debugging, it was unclear why this set also contains a 0.

    if self._extra_function_starts:
        starting_points |= set(self._extra_function_starts)

    # Sort the function entry points
    starting_points = sorted(list(starting_points), reverse=True)

    if self._start_at_entry and self.project.entry is not None and self._inside_regions(self.project.entry) and \
            self.project.entry not in starting_points:
        # make sure self.project.entry is inserted
        starting_points += [ self.project.entry ]

    # Create a CFGJob for each starting point
    for sp in starting_points:
        job = CFGJob(sp, sp, 'Ijk_Boring')
        self._insert_job(job)
        # register the job to function `sp`
        self._register_analysis_job(sp, job)

    self._updated_nonreturning_functions = set()

    # Search for functions by their prologues; 217 were found in this example
    if self._use_function_prologues and self.project.concrete_target is None:
        self._function_prologue_addrs = sorted(self._func_addrs_from_prologues())
        # make a copy of those prologue addresses, so that we can pop from the list
        self._remaining_function_prologue_addrs = self._function_prologue_addrs[::]

        # make function_prologue_addrs a set for faster lookups
        self._function_prologue_addrs = set(self._function_prologue_addrs)
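The way the list of starting points is assembled can be reproduced in isolation. This is a simplified sketch with a hypothetical helper `collect_starting_points`; the `regions` argument (a list of half-open address ranges) stands in for angr's `_inside_regions` check and is an assumption of this example.

```python
def collect_starting_points(symbol_addrs, extra_starts, entry, regions):
    """Merge symbol and user-supplied starts, then append the entry point."""
    points = set(symbol_addrs) | set(extra_starts)
    ordered = sorted(points, reverse=True)  # highest address first
    inside = entry is not None and any(lo <= entry < hi for lo, hi in regions)
    if inside and entry not in ordered:
        ordered.append(entry)  # make sure the entry point is included
    return ordered


starts = collect_starting_points(
    symbol_addrs={0x4023E0, 0x409480},
    extra_starts=[0x405284],
    entry=0x402100,
    regions=[(0x400000, 0x420000)],
)
print([hex(a) for a in starts])
```

Each address in the resulting list would then be wrapped in a job, as the loop over starting_points does in the quoted code.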
0x2 _analysis_core_baremetal
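This function takes the jobs that were just added off the queue and processes them. It consists of three main steps:
- _pre_job_handling
- _process_job_and_get_successors
- _intra_analysis
The source of the function is as follows: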
def _analysis_core_baremetal(self):
    if not self._job_info_queue:
        self._job_queue_empty()

    while not self.should_abort:
        if self._status_callback is not None:
            self._status_callback(self)

        # should_abort might be changed by the status callback function
        if self.should_abort:
            return

        if not self._job_info_queue:
            self._job_queue_empty()  # about 1/3 of the total time

        if not self._job_info_queue:
            # still no job available
            break

        job_info = self._job_info_queue[0]

        try:
            self._pre_job_handling(job_info.job)
        except AngrDelayJobNotice:
            # delay the handling of this job
            continue
        except AngrSkipJobNotice:
            # consume and skip this job
            self._job_info_queue = self._job_info_queue[1:]
            self._job_map.pop(self._job_key(job_info.job), None)
            continue

        # remove the job info from the map
        self._job_map.pop(self._job_key(job_info.job), None)

        self._job_info_queue = self._job_info_queue[1:]

        self._process_job_and_get_successors(job_info)

        # Short-cut for aborting the analysis
        if self.should_abort:
            break

        self._intra_analysis()
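Stripped of angr's bookkeeping, the loop above is a worklist algorithm with a refill step for when the queue runs dry. The sketch below is a simplified model, not angr's code: `successors` is a plain dictionary standing in for block scanning, and `fallback_starts` stands in for the leftover candidates (such as prologue addresses) that `_job_queue_empty` supplies.

```python
from collections import deque


def recover_blocks(successors, starts, fallback_starts=()):
    """Trace blocks from `starts`; use fallback candidates when the queue is empty."""
    queue = deque(starts)
    fallback = deque(fallback_starts)
    traced = []
    while True:
        if not queue:
            # Counterpart of _job_queue_empty: look for an untraced candidate.
            while fallback and not queue:
                addr = fallback.popleft()
                if addr not in traced:
                    queue.append(addr)
        if not queue:
            break  # still no job available
        addr = queue.popleft()
        if addr in traced:
            continue  # skip a job that was already handled
        traced.append(addr)
        queue.extend(successors.get(addr, ()))
    return traced


graph = {0x10: [0x14, 0x20], 0x14: [0x20], 0x20: []}
order = recover_blocks(graph, starts=[0x10], fallback_starts=[0x20, 0x40])
print([hex(a) for a in order])
```

The block at 0x40 is not reachable from 0x10 and is only found through the fallback list, which mirrors how prologue candidates are scheduled once nothing else is left to analyze.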
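- _job_queue_empty accounts for about 1/3 of the total time spent building the CFG. It proceeds as follows:
- First, look for a pending job that comes from a function that must return; if there is one, add it to the queue
- Then record the functions whose analysis is finished in _completed_functions
- Iteratively analyze all changed functions and update their returning attribute until a fixed point is reached (no new returning or non-returning functions are found)
- Try to resolve the remaining indirect jumps
- If function prologue analysis is enabled, add the functions found earlier by prologue matching to the job queue
- If a complete scan is enabled, take the next code address, wrap it in a job, and add it to the queue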
def _job_queue_empty(self):
    if self._pending_jobs:
        # fastpath
        # look for a job that comes from a function that must return
        # if we can find one, just use it
        job = self._pop_pending_job(returning=True)
        if job is not None:
            self._insert_job(job)
            return

        self._clean_pending_exits()

    # did we finish analyzing any function?
    # fill in self._completed_functions
    self._make_completed_functions()

    # analyze function features, most importantly, whether each function returns or not
    self._analyze_all_function_features()

    # Clear _changed_functions set
    self._updated_nonreturning_functions = set()

    if self._pending_jobs:
        self._clean_pending_exits()

        job = self._pop_pending_job(returning=True)
        if job is not None:
            self._insert_job(job)
            return

        job = self._pop_pending_job(returning=False)
        if job is not None:
            self._insert_job(job)
            return

    # Try to see if there is any indirect jump left to be resolved
    if self._resolve_indirect_jumps and self._indirect_jumps_to_resolve:
        self._process_unresolved_indirect_jumps()

        if self._job_info_queue:
            return

    if self._use_function_prologues and self._remaining_function_prologue_addrs:
        while self._remaining_function_prologue_addrs:
            prolog_addr = self._remaining_function_prologue_addrs[0]
            self._remaining_function_prologue_addrs = self._remaining_function_prologue_addrs[1:]
            if self._seg_list.is_occupied(prolog_addr):
                continue

            job = CFGJob(prolog_addr, prolog_addr, 'Ijk_Boring')
            self._insert_job(job)
            self._register_analysis_job(prolog_addr, job)
            return

    if self._force_complete_scan:
        addr = self._next_code_addr()
        if addr is None:
            l.debug("Force-scan jumping failed")
        else:
            l.debug("Force-scanning to %#x", addr)

        if addr is not None:
            job = CFGJob(addr, addr, "Ijk_Boring", last_addr=None, job_type=CFGJob.JOB_TYPE_COMPLETE_SCANNING)
            self._insert_job(job)
            self._register_analysis_job(addr, job)
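The fixed-point step in the list above can be illustrated with a deliberately simplified model. It assumes a function returns if it contains a ret instruction or if it tail-jumps to a function already known to return; angr's real criteria in `_analyze_all_function_features` are more involved, and the helper name `compute_returning` is hypothetical.

```python
def compute_returning(has_ret, tail_jumps):
    """Propagate the 'returning' property until nothing changes.

    has_ret:    set of functions that contain a ret instruction
    tail_jumps: {function: set of functions it tail-jumps to}
    """
    returning = set(has_ret)
    changed = True
    while changed:  # iterate until a fixed point is reached
        changed = False
        for func, targets in tail_jumps.items():
            if func not in returning and targets & returning:
                returning.add(func)
                changed = True
    return returning


result = compute_returning(
    has_ret={"b"},
    tail_jumps={"a": {"b"}, "c": {"a"}, "d": {"exit"}},
)
print(sorted(result))
```

Here `a` returns because it jumps to `b`, `c` returns because it jumps to `a`, and `d` stays non-returning because its only target never returns.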
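- _pre_job_handling
Some simple preprocessing of the job, such as updating the progress for the progress bar.
- _process_job_and_get_successors
This function processes a job, gets the successors of the current job, wraps them as jobs, and adds them to the queue. It accounts for about 2/3 of the total time.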
def _process_job_and_get_successors(self, job_info):
    """
    Process a job, get all successors of this job, and call _handle_successor() to handle each successor.

    :param JobInfo job_info: The JobInfo instance
    :return: None
    """

    job = job_info.job

    successors = self._get_successors(job)

    all_new_jobs = [ ]

    for successor in successors:
        new_jobs = self._handle_successor(job, successor, successors)
        # In CFGFast, all successors are returned directly
        if new_jobs:
            all_new_jobs.extend(new_jobs)

            for new_job in new_jobs:
                self._insert_job(new_job)

    self._post_job_handling(job, all_new_jobs, successors)
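_get_successors is implemented in the subclass CFGFast. Its main job is to scan for a basic block starting at the given address.
0x3 _get_successors
It calls _scan_block, which wraps the successor basic blocks as job objects, and registers them so that they are added to the queue of jobs to analyze: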
def _get_successors(self, job):  # pylint:disable=arguments-differ

    # current_function_addr = job.func_addr
    # addr = job.addr

    # if current_function_addr != -1:
    #     l.debug("Tracing new exit %#x in function %#x", addr, current_function_addr)
    # else:
    #     l.debug("Tracing new exit %#x", addr)

    jobs = self._scan_block(job)

    # l.debug("... got %d jobs: %s", len(jobs), jobs)

    for job_ in jobs:  # type: CFGJob
        # register those jobs
        self._register_analysis_job(job_.func_addr, job_)

    return jobs
0x4 _scan_block
def _scan_block(self, cfg_job):
    """
    Scan a basic block starting at a specific address

    :param CFGJob cfg_job: The CFGJob instance.
    :return: a list of successors
    :rtype: list
    """

    addr = cfg_job.addr
    current_func_addr = cfg_job.func_addr

    # Fix the function address
    # This is for rare cases where we cannot successfully determine the end boundary of a previous function, and
    # as a consequence, our analysis mistakenly thinks the previous function goes all the way across the boundary,
    # resulting the missing of the second function in function manager.
    if addr in self._function_addresses_from_symbols:
        current_func_addr = addr

    if self._addr_hooked_or_syscall(addr):
        entries = self._scan_procedure(cfg_job, current_func_addr)

    else:
        entries = self._scan_irsb(cfg_job, current_func_addr)

    return entries
0x5 _scan_irsb
def _scan_irsb(self, cfg_job, current_func_addr):
    """
    Generate a list of successors (generating them each as entries) to IRSB.
    Updates previous CFG nodes with edges.

    :param CFGJob cfg_job: The CFGJob instance.
    :param int current_func_addr: Address of the current function
    :return: a list of successors
    :rtype: list
    """

    # Generate the CFGNode
    addr, function_addr, cfg_node, irsb = self._generate_cfgnode(cfg_job, current_func_addr)

    # Add the function-internal edges that point to this node
    cfg_job.apply_function_edges(self, clear=True)

    # function_addr and current_function_addr can be different. e.g. when tracing an optimized tail-call that jumps
    # into another function that has been identified before.

    if cfg_node is None:
        # exceptions occurred, or we cannot get a CFGNode for other reasons
        return [ ]

    # Add the corresponding edge to the CFG
    self._graph_add_edge(cfg_node, cfg_job.src_node, cfg_job.jumpkind, cfg_job.src_ins_addr,
                         cfg_job.src_stmt_idx
                         )
    # Add the CFG node to its function
    self._function_add_node(cfg_node, function_addr)

    if self.functions.get_by_addr(function_addr).returning is not True:
        self._updated_nonreturning_functions.add(function_addr)

    # If we have traced it before, don't trace it anymore
    real_addr = get_real_address_if_arm(self.project.arch, addr)
    if real_addr in self._traced_addresses:
        # the address has been traced before
        return [ ]
    else:
        # Mark the address as traced
        self._traced_addresses.add(real_addr)

    # irsb cannot be None here
    # assert irsb is not None

    # The IRSB is used only once per CFGNode, so it must be released here to save memory
    cfg_node.irsb = None

    # about 1/10 of the time spent in _scan_irsb
    self._process_block_arch_specific(addr, irsb, function_addr)

    # Scan the basic block to collect data references
    if self._collect_data_ref:
        self._collect_data_references(irsb, addr)
        # about 3/20 of the time spent in _scan_irsb

    # Get all possible successors
    irsb_next, jumpkind = irsb.next, irsb.jumpkind
    successors = [ ]

    last_ins_addr = None
    ins_addr = addr
    if irsb.statements:
        for i, stmt in enumerate(irsb.statements):
            if isinstance(stmt, pyvex.IRStmt.Exit):
                successors.append((i,
                                   last_ins_addr if self.project.arch.branch_delay_slot else ins_addr,
                                   stmt.dst,
                                   stmt.jumpkind
                                   )
                                  )
            elif isinstance(stmt, pyvex.IRStmt.IMark):
                last_ins_addr = ins_addr
                ins_addr = stmt.addr + stmt.delta
    else:
        for ins_addr, stmt_idx, exit_stmt in irsb.exit_statements:
            successors.append((
                stmt_idx,
                last_ins_addr if self.project.arch.branch_delay_slot else ins_addr,
                exit_stmt.dst,
                exit_stmt.jumpkind
            ))

    successors.append((DEFAULT_STATEMENT,
                       last_ins_addr if self.project.arch.branch_delay_slot else ins_addr, irsb_next, jumpkind)
                      )

    entries = [ ]

    # On ARM, the successors get some extra post-processing before they are returned
    successors = self._post_process_successors(addr, irsb.size, successors)

    # Process each successor (this part takes about 15/20 of the time)
    for suc in successors:
        stmt_idx, ins_addr, target, jumpkind = suc

        entries += self._create_jobs(target, jumpkind, function_addr, irsb, addr, cfg_node, ins_addr,
                                     stmt_idx
                                     )

    return entries
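The `last_ins_addr`/`ins_addr` bookkeeping in the statement loop is easier to follow on a toy example. The sketch below uses stand-in classes `IMark` and `Exit` instead of pyvex statements and ignores the `delta` field; it assumes, as the quoted code implies for delay-slot architectures such as MIPS, that the exit statement appears after the instruction mark of the delay slot.

```python
from dataclasses import dataclass


@dataclass
class IMark:
    """Stand-in for an instruction marker statement."""
    addr: int


@dataclass
class Exit:
    """Stand-in for a conditional exit statement."""
    dst: int


def exits_with_source(block_addr, statements, branch_delay_slot):
    """Return (statement index, source instruction address, target) per exit."""
    found = []
    last_ins_addr = None
    ins_addr = block_addr
    for idx, stmt in enumerate(statements):
        if isinstance(stmt, Exit):
            # With a delay slot, the branch is the instruction before the current one.
            src = last_ins_addr if branch_delay_slot else ins_addr
            found.append((idx, src, stmt.dst))
        elif isinstance(stmt, IMark):
            last_ins_addr = ins_addr
            ins_addr = stmt.addr
    return found


stmts = [IMark(0x400100), IMark(0x400104), IMark(0x400108), Exit(0x400200)]
with_slot = exits_with_source(0x400100, stmts, branch_delay_slot=True)
without_slot = exits_with_source(0x400100, stmts, branch_delay_slot=False)
print(with_slot, without_slot)
```

With a delay slot the exit is attributed to the branch at 0x400104 rather than to the delay-slot instruction at 0x400108.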
0x6 _create_jobs
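Given a node and some information about one of its successors, this function returns a list of CFGJobs. Its main flow is as follows:
- Derive target_addr from the target expression of the successor
- If target_addr is None, the jump target is not a concrete value, and the jumpkind decides what happens next:
- For a ret, the block is a function exit. Otherwise it is an indirect jump, and _indirect_jump_encountered is called to try to resolve it
- If the jump is resolved, all of its possible targets are wrapped as CFGJobs and returned
- If the jumpkind is one of ("Ijk_Boring", 'Ijk_InvalICache'), the code checks heuristically whether the block is a PLT entry. If it is, an edge to the corresponding PLT function is added and a CFGJob is created and appended to jobs
- Otherwise the indirect jump is added to the set of indirect jumps waiting to be resolved
- If target_addr is a concrete value, the jump is direct: a CFGJob is created for the target, either with a function transition edge (Ijk_Boring) or through _create_job_call (Ijk_Call and syscalls)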
def _create_jobs(self, target, jumpkind, current_function_addr, irsb, addr, cfg_node, ins_addr, stmt_idx):
    """
    Given a node and details of a successor, makes a list of CFGJobs
    and if it is a call or exit marks it appropriately so in the CFG

    :param int target:          Destination of the resultant job
    :param str jumpkind:        The jumpkind of the edge going to this node
    :param int current_function_addr: Address of the current function
    :param pyvex.IRSB irsb:     IRSB of the predecessor node
    :param int addr:            The predecessor address
    :param CFGNode cfg_node:    The CFGNode of the predecessor node
    :param int ins_addr:        Address of the source instruction.
    :param int stmt_idx:        ID of the source statement.
    :return:                    a list of CFGJobs
    :rtype:                     list
    """

    if type(target) is pyvex.IRExpr.Const:  # pylint: disable=unidiomatic-typecheck
        target_addr = target.con.value
    elif type(target) in (pyvex.IRConst.U8, pyvex.IRConst.U16, pyvex.IRConst.U32, pyvex.IRConst.U64):  # pylint: disable=unidiomatic-typecheck
        target_addr = target.value
    elif type(target) is int:  # pylint: disable=unidiomatic-typecheck
        target_addr = target
    else:
        target_addr = None

    if target_addr in self._known_thunks and jumpkind == 'Ijk_Boring':
        thunk_kind = self._known_thunks[target_addr][0]
        if thunk_kind == 'ret':
            jumpkind = 'Ijk_Ret'
            target_addr = None
        elif thunk_kind == 'jmp':
            pass  # ummmmmm not sure about this one
        else:
            raise AngrCFGError("This shouldn't be possible")

    jobs = [ ]
    is_syscall = jumpkind.startswith("Ijk_Sys")

    # Special handling:
    # If a call instruction has a target that points to the immediate next instruction, we treat it as a boring jump
    if jumpkind == "Ijk_Call" and \
            not self.project.arch.call_pushes_ret and \
            cfg_node.instruction_addrs and \
            ins_addr == cfg_node.instruction_addrs[-1] and \
            target_addr == irsb.addr + irsb.size:
        jumpkind = "Ijk_Boring"

    if target_addr is None:
        # The target address is not a concrete value

        if jumpkind == "Ijk_Ret":
            # This block ends with a return instruction.
            if current_function_addr != -1:
                self._function_exits[current_function_addr].add(addr)
                self._function_add_return_site(addr, current_function_addr)
                self.functions[current_function_addr].returning = True
                self._pending_jobs.add_returning_function(current_function_addr)

            cfg_node.has_return = True

        elif self._resolve_indirect_jumps and \
                (jumpkind in ('Ijk_Boring', 'Ijk_Call', 'Ijk_InvalICache') or jumpkind.startswith('Ijk_Sys')):
            # This is an indirect jump. Try to resolve it.
            # FIXME: in some cases, a statementless irsb will be missing its instr addresses
            # and this next part will fail. Use the real IRSB instead
            irsb = cfg_node.block.vex
            cfg_node.instruction_addrs = irsb.instruction_addresses
            resolved, resolved_targets, ij = self._indirect_jump_encountered(addr, cfg_node, irsb,
                                                                             current_function_addr, stmt_idx)
            if resolved:
                for resolved_target in resolved_targets:
                    if jumpkind == 'Ijk_Call':
                        jobs += self._create_job_call(cfg_node.addr, irsb, cfg_node, stmt_idx, ins_addr,
                                                      current_function_addr, resolved_target, jumpkind)
                    else:
                        edge = FunctionTransitionEdge(cfg_node, resolved_target, current_function_addr,
                                                      to_outside=False, stmt_idx=stmt_idx, ins_addr=ins_addr,
                                                      )
                        ce = CFGJob(resolved_target, current_function_addr, jumpkind,
                                    last_addr=resolved_target, src_node=cfg_node, src_stmt_idx=stmt_idx,
                                    src_ins_addr=ins_addr, func_edges=[ edge ],
                                    )
                        jobs.append(ce)
                return jobs

            if jumpkind in ("Ijk_Boring", 'Ijk_InvalICache'):
                resolved_as_plt = False

                if irsb and self._heuristic_plt_resolving:
                    # Test it on the initial state. Does it jump to a valid location?
                    # It will be resolved only if this is a .plt entry
                    resolved_as_plt = self._resolve_plt(addr, irsb, ij)

                    if resolved_as_plt:
                        jump_target = next(iter(ij.resolved_targets))
                        target_func_addr = jump_target  # TODO: FIX THIS
                        edge = FunctionTransitionEdge(cfg_node, jump_target, current_function_addr,
                                                      to_outside=True, dst_func_addr=jump_target,
                                                      stmt_idx=stmt_idx, ins_addr=ins_addr,
                                                      )
                        ce = CFGJob(jump_target, target_func_addr, jumpkind, last_addr=jump_target,
                                    src_node=cfg_node, src_stmt_idx=stmt_idx, src_ins_addr=ins_addr,
                                    func_edges=[edge],
                                    )
                        jobs.append(ce)

                if resolved_as_plt:
                    # has been resolved as a PLT entry. Remove it from indirect_jumps_to_resolve
                    if ij.addr in self._indirect_jumps_to_resolve:
                        self._indirect_jumps_to_resolve.remove(ij.addr)
                        self._deregister_analysis_job(current_function_addr, ij)
                else:
                    # add it to indirect_jumps_to_resolve
                    self._indirect_jumps_to_resolve.add(ij)

                    # register it as a job for the current function
                    self._register_analysis_job(current_function_addr, ij)

            else:  # jumpkind == "Ijk_Call" or jumpkind.startswith('Ijk_Sys')
                self._indirect_jumps_to_resolve.add(ij)
                self._register_analysis_job(current_function_addr, ij)

                jobs += self._create_job_call(addr, irsb, cfg_node, stmt_idx, ins_addr, current_function_addr, None,
                                              jumpkind, is_syscall=is_syscall
                                              )

    elif target_addr is not None:
        # This is a direct jump with a concrete target.

        # pylint: disable=too-many-nested-blocks
        if jumpkind in ('Ijk_Boring', 'Ijk_InvalICache'):
            # if the target address is at another section, it has to be jumping to a new function
            if not self._addrs_belong_to_same_section(addr, target_addr):
                target_func_addr = target_addr
                to_outside = True
            else:
                # it might be a jumpout
                target_func_addr = None
                real_target_addr = get_real_address_if_arm(self.project.arch, target_addr)
                if real_target_addr in self._traced_addresses:
                    node = self.model.get_any_node(target_addr)
                    if node is not None:
                        target_func_addr = node.function_address
                if target_func_addr is None:
                    target_func_addr = current_function_addr

                to_outside = not target_func_addr == current_function_addr

            edge = FunctionTransitionEdge(cfg_node, target_addr, current_function_addr,
                                          to_outside=to_outside,
                                          dst_func_addr=target_func_addr,
                                          ins_addr=ins_addr,
                                          stmt_idx=stmt_idx,
                                          )

            ce = CFGJob(target_addr, target_func_addr, jumpkind, last_addr=addr, src_node=cfg_node,
                        src_ins_addr=ins_addr, src_stmt_idx=stmt_idx, func_edges=[ edge ])
            jobs.append(ce)

        elif jumpkind == 'Ijk_Call' or jumpkind.startswith("Ijk_Sys"):
            jobs += self._create_job_call(addr, irsb, cfg_node, stmt_idx, ins_addr, current_function_addr,
                                          target_addr, jumpkind, is_syscall=is_syscall
                                          )

        else:
            # TODO: Support more jumpkinds
            l.debug("Unsupported jumpkind %s", jumpkind)
            l.debug("Instruction address: %#x", ins_addr)

    return jobs
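The branching in _create_jobs can be condensed into a small decision function. This is only a summary sketch: the returned labels and the helper name `classify_exit` are invented for illustration, and the thunk handling, PLT heuristic, and call-to-next-instruction special case are left out.

```python
def classify_exit(target_addr, jumpkind):
    """Name the branch of _create_jobs that a successor would take."""
    is_jump = jumpkind in ("Ijk_Boring", "Ijk_InvalICache")
    is_call = jumpkind == "Ijk_Call" or jumpkind.startswith("Ijk_Sys")
    if target_addr is None:
        # The target is not a concrete value.
        if jumpkind == "Ijk_Ret":
            return "function-return"
        if is_jump:
            return "indirect-jump"
        if is_call:
            return "indirect-call"
        return "unsupported"
    # The target is concrete, so this is a direct transfer.
    if is_jump:
        return "direct-jump"
    if is_call:
        return "direct-call"
    return "unsupported"


for target, kind in [(None, "Ijk_Ret"), (None, "Ijk_Boring"), (0x4023E0, "Ijk_Call")]:
    print(target, kind, "->", classify_exit(target, kind))
```

Only the two indirect cases lead to the resolver machinery described in the next part, which is where most of the time goes according to the measurements below.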
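0x7 Time consumption analysis
- Example: cgibin (D-Link DIR-815 router firmware)
- Total time: 32s (roughly 30s of it is spent building the CFG)
- _job_queue_empty takes about 1/3 of the total time (10s)
- scan_block takes about 2/3 of the total time (20s)
- The most expensive function is _create_jobs, at about 16s
- _create_jobs mainly handles direct and indirect calls; indirect calls account for nearly 100% of the time spent in _create_jobs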
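The text does not say how these timings were measured. One possible way to obtain a comparable per-function breakdown is Python's built-in cProfile module, sketched here on a toy workload; `profile_call` and `busy` are hypothetical names, and for the real measurement `busy` would be replaced by a function that runs CFGFast.

```python
import cProfile
import io
import pstats


def profile_call(func, *args, top=10):
    """Run func(*args) under cProfile and return the report as text."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        func(*args)
    finally:
        profiler.disable()
    out = io.StringIO()
    stats = pstats.Stats(profiler, stream=out)
    stats.sort_stats("cumulative").print_stats(top)  # most expensive first
    return out.getvalue()


def busy(n):
    """Toy workload standing in for the CFG construction."""
    return sum(i * i for i in range(n))


report = profile_call(busy, 10000)
print(report)
```

Sorting by cumulative time makes it easy to see which callee dominates a caller, which is the kind of comparison made in the list above.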
3. Resolving indirect jumps
0x1 process_unresolved_indirect_jumps
- Ijk_Call: an indirect call whose target address is passed in from a preceding basic block
- Ijk_Boring: a jump table
def _process_unresolved_indirect_jumps(self):
    """
    Resolve all unresolved indirect jumps found in previous scanning.

    Currently we support resolving the following types of indirect jumps:
    - Ijk_Call: indirect calls where the function address is passed in from a proceeding basic block
    - Ijk_Boring: jump tables
    - For an up-to-date list, see analyses/cfg/indirect_jump_resolvers

    :return:    A set of concrete indirect jump targets (ints).
    :rtype:     set
    """

    l.info("%d indirect jumps to resolve.", len(self._indirect_jumps_to_resolve))

    all_targets = set()
    for idx, jump in enumerate(self._indirect_jumps_to_resolve):  # type:int,IndirectJump
        if self._low_priority:
            self._release_gil(idx, 20, 0.0001)
        all_targets |= self._process_one_indirect_jump(jump)

    self._indirect_jumps_to_resolve.clear()

    return all_targets
0x2 process_one_indirect_jump
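Each resolver in self.indirect_jump_resolvers is tried in turn until one succeeds. A resolver such as angr.analyses.cfg.indirect_jump_resolvers.jumptable.JumpTableResolver is used to resolve the indirect jump.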
def _process_one_indirect_jump(self, jump):
    """
    Resolve a given indirect jump.

    :param IndirectJump jump:  The IndirectJump instance.
    :return:        A set of resolved indirect jump targets (ints).
    """

    resolved = False
    resolved_by = None
    targets = None

    block = self._lift(jump.addr, opt_level=1)

    for resolver in self.indirect_jump_resolvers:
        resolver.base_state = self._base_state

        if not resolver.filter(self, jump.addr, jump.func_addr, block, jump.jumpkind):
            continue

        resolved, targets = resolver.resolve(self, jump.addr, jump.func_addr, block, jump.jumpkind)
        if resolved:
            resolved_by = resolver
            break

    if resolved:
        self._indirect_jump_resolved(jump, jump.addr, resolved_by, targets)
    else:
        self._indirect_jump_unresolved(jump)

    return set() if targets is None else set(targets)
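The filter/resolve protocol used by the loop above is a chain-of-responsibility pattern. The sketch below models it with a hypothetical `TableResolver` class whose answers come from a precomputed dictionary; real angr resolvers take more arguments (the CFG, the function address, the lifted block) and compute their targets by analysis.

```python
class TableResolver:
    """Hypothetical resolver backed by a {jump address: targets} table."""

    def __init__(self, kinds, table):
        self.kinds = kinds
        self.table = table

    def filter(self, addr, jumpkind):
        # Only handle the jumpkinds this resolver was built for.
        return jumpkind in self.kinds

    def resolve(self, addr, jumpkind):
        targets = self.table.get(addr)
        return targets is not None, targets or []


def resolve_with_chain(resolvers, addr, jumpkind):
    """Try each resolver in order; return (winning resolver, set of targets)."""
    for resolver in resolvers:
        if not resolver.filter(addr, jumpkind):
            continue
        resolved, targets = resolver.resolve(addr, jumpkind)
        if resolved:
            return resolver, set(targets)
    return None, set()


chain = [
    TableResolver({"Ijk_Call"}, {0x100: [0x500]}),
    TableResolver({"Ijk_Boring"}, {0x200: [0x210, 0x220]}),
]
winner, targets = resolve_with_chain(chain, 0x200, "Ijk_Boring")
print(winner is chain[1], sorted(hex(t) for t in targets))
```

The first resolver is skipped by its filter, and the second one supplies the two jump table targets. A jump that no resolver can handle yields an empty set, which corresponds to the _indirect_jump_unresolved branch.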
0x3 indirect_jump_encountered
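Called when an indirect jump is encountered. It first tries to resolve the jump with the timeless (fast) indirect jump resolvers. If that fails, it checks whether this indirect jump has already been resolved before.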
def _indirect_jump_encountered(self, addr, cfg_node, irsb, func_addr, stmt_idx=DEFAULT_STATEMENT):
    """
    Called when we encounter an indirect jump. We will try to resolve this indirect jump using timeless (fast)
    indirect jump resolvers. If it cannot be resolved, we will see if this indirect jump has been resolved before.

    :param int addr:                Address of the block containing the indirect jump.
    :param cfg_node:                The CFGNode instance of the block that contains the indirect jump.
    :param pyvex.IRSB irsb:         The IRSB instance of the block that contains the indirect jump.
    :param int func_addr:           Address of the current function.
    :param int or str stmt_idx:     ID of the source statement.

    :return:    A 3-tuple of (whether it is resolved or not, all resolved targets, an IndirectJump object
                if there is one or None otherwise)
    :rtype:     tuple
    """

    jumpkind = irsb.jumpkind
    l.debug('IRSB %#x has an indirect jump (%s) as its default exit.', addr, jumpkind)

    # try resolving it fast
    resolved, resolved_targets = self._resolve_indirect_jump_timelessly(addr, irsb, func_addr, jumpkind)
    if resolved:
        l.debug("Indirect jump at block %#x is resolved by a timeless indirect jump resolver. "
                "%d targets found.", addr, len(resolved_targets))
        return True, resolved_targets, None

    l.debug("Indirect jump at block %#x cannot be resolved by a timeless indirect jump resolver.", addr)

    # Add it to our set. Will process it later if user allows.
    # Create an IndirectJump instance
    if addr not in self.indirect_jumps:
        if self.project.arch.branch_delay_slot:
            ins_addr = cfg_node.instruction_addrs[-2]
        else:
            ins_addr = cfg_node.instruction_addrs[-1]
        ij = IndirectJump(addr, ins_addr, func_addr, jumpkind, stmt_idx, resolved_targets=[])
        self.indirect_jumps[addr] = ij
        resolved = False
    else:
        ij = self.indirect_jumps[addr]  # type: IndirectJump
        resolved = len(ij.resolved_targets) > 0

    return resolved, ij.resolved_targets, ij
0x4 resolve_indirect_jump_timelessly
For a MIPS binary such as cgibin, this ends up calling angr.analyses.cfg.indirect_jump_resolvers.mips_elf_fast.MipsElfFastResolver to resolve the indirect jump.
def _resolve_indirect_jump_timelessly(self, addr, block, func_addr, jumpkind):
    """
    Checks if MIPS32 and calls MIPS32 check, otherwise false

    :param int addr: irsb address
    :param pyvex.IRSB block: irsb
    :param int func_addr: Function address
    :return: If it was resolved and targets alongside it
    :rtype: tuple
    """

    if block.statements is None:
        block = self.project.factory.block(block.addr, size=block.size).vex

    for res in self.timeless_indirect_jump_resolvers:
        if res.filter(self, addr, func_addr, block, jumpkind):
            r, resolved_targets = res.resolve(self, addr, func_addr, block, jumpkind)
            if r:
                return True, resolved_targets
    return False, [ ]