Angr 中的函数识别

Angr 中的函数识别

函数识别概述

​ 二进制文件通过IDA或者radare2这样的反汇编工具,能够识别二进制文件中的函数边界信息,并根据其调用关系生成整个程序的函数调用图。因此准确识别二进制文件中的函数边界对于进一步的分析二进制文件非常重要。二进制文件通常组织为 数据、代码、元数据的形式,在没有strip掉符号表的二进制文件中,关于函数的起始偏移、大小通常在元数据中可以直接找到,但是对于去除符号表的二进制文件,由于其符号表的缺失,通常需要采用别的以下方法进行函数识别:

  • 函数序言和尾声

  • 通过对call、jmp等指令所跳转的地址进行分析

  • 分析不可达代码位置

​ 但是采用以上方法同样会存在一些不准确的问题,比如依赖函数序言和尾声的检查,在使用不同编译器或者优化选项的情况下,采用这种硬编码的方式可能就不再适用;对于call或者jmp指令的分析,可能很多地址只有在运行时才能确定等等。

Angr中的函数识别

1. 初始化阶段

​ Angr中函数识别的过程是在CFGFast中构建CFG的时候进行的,在初始化阶段主要步骤如下:

  • 首先调用self._executable_memory_regions获取二进制文件有执行权限的region,将这些区间作为候选分析位置

  • 然后对得到的regions进行一些筛选和处理,同时对分析进行一些设置,比如设置是否有符号表、是否收集数据引用信息等。这里可以使用objdump命令查看文件的符号表信息:

    • 查看动态符号表信息
    nevv@ubuntu:~/angr/others$ objdump -T cgibin | grep "main"
    004023e0 g    DF .text    00000304  Base        main
    00409480 g    DF .text    000005ec  Base        hedwigcgi_main
    00408a78 g    DF .text    00000198  Base        captchacgi_main
    0040b930 g    DF .text    00000b28  Base        conntrackcgi_main
    00406b40 g    DF .text    00000008  Base        dlcfg_main
    0040a268 g    DF .text    000003d4  Base        servicecgi_main
    0040cb5c g    DF .text    00000604  Base        hnap_main
    0040a6f0 g    DF .text    00000218  Base        ssdpcgi_main
    0040b818 g    DF .text    00000110  Base        genacgi_main
    00407164 g    DF .text    000000e4  Base        fwupdater_main
    00409ae0 g    DF .text    00000620  Base        pigwidgeoncgi_main
    00406b48 g    DF .text    00000008  Base        fwup_main
    004195a0      DF *UND*    00000000              __uClibc_main
    00405284 g    DF .text    000002c8  Base        phpcgi_main
    0040ae50 g    DF .text    00000484  Base        soapcgi_main
    00408e10 g    DF .text    0000066c  Base        sessioncgi_main
    00406b50 g    DF .text    00000008  Base        seamacgi_main
    
    • 查看静态符号表信息:
    nevv@ubuntu:~/angr/others$ objdump -t cgibin
    
    cgibin:     file format elf32-tradlittlemips
    
    SYMBOL TABLE:
    no symbols
    

    两者的区别如下:

    • 静态链接中有一个专门的段叫符号表 -- “.symtab”(Symbol Table), 里面保存了所有关于该目标文件的符号的定义和引用。

    • 动态链接中同样有一个段叫 动态符号表 -- “.dynsym”(Dynamic Symbol) , 但.dynsym 相对于 .symtab 只保存了与动态链接相关的导入导出符号。so中同样有.symtab,其中保存着所有的符号。

  • 最后调用 self._analyze 进行前向分析

2. 分析阶段

_analyze方法很简单,主要就是self._pre_analysis()和根据self._graph_visitor 是否存在来调用对应的分析方法,对于还没有建立一个图结构的分析来说(就比如cfg恢复分析),第一次总是会调用self._analysis_core_baremetal() 方法。

    def _analyze(self):
        """
        The main analysis routine.

        :return: None
        """

        self._pre_analysis()

        if self._graph_visitor is None:
            # There is no base graph that we can rely on. The analysis itself should generate successors for the
            # current job.
            # An example is the CFG recovery.

            self._analysis_core_baremetal()

        else:
            # We have a base graph to follow. Just handle the current job.

            self._analysis_core_graph()

        self._post_analysis()
0x1 _pre_analysis

​ 这个函数的主要功能就是,初始化分析过程中需要用到的变量,使用符号表,将符号表中的函数起始位置作为分析的起始位置,使用函数序言搜索函数,并将搜索结果保存在 _function_prologue_addrs 中。

    def _pre_analysis(self):
        import pdb
        pdb.set_trace()
        # 初始化一些cfg相关的变量
        self._initialize_cfg()

        # Scan for __x86_return_thunk and friends
        self._known_thunks = self._find_thunks()
        """
        这里应该是会寻找一些特殊的字节序列
>>> print disasm('E807000000F3900FAEE8EBF948890424C3'.decode("hex"))
   0:   e8 07 00 00 00          call   0xc
   5:   f3 90                   pause  
   7:   0f ae e8                lfence 
   a:   eb f9                   jmp    0x5
   c:   48                      dec    eax
   d:   89 04 24                mov    DWORD PTR [esp],eax
  10:   c3                      ret
>>> print disasm('E807000000F3900FAEE8EBF9488D642408C3'.decode("hex"))
   0:   e8 07 00 00 00          call   0xc
   5:   f3 90                   pause  
   7:   0f ae e8                lfence 
   a:   eb f9                   jmp    0x5
   c:   48                      dec    eax
   d:   8d 64 24 08             lea    esp,[esp+0x8]
  11:   c3                      ret
>>>  
        """

        # 初始化一些分析时候需要用到的变量
        self._pending_jobs = PendingJobs(self.functions, self._deregister_analysis_job)
        self._traced_addresses = set()
        self._function_returns = defaultdict(set)

        # 不是所有的函数调用都使用call指令,因此需要记录下每一个单一函数的退出点,
        # 在需要函数调用的时候,在函数调用图上添加对应的边
        self._function_exits = defaultdict(set)

        # 创建一个初始化状态
        self._initial_state = self.project.factory.blank_state(mode="fastpath")
        initial_options = self._initial_state.options - {o.TRACK_CONSTRAINTS} - o.refs
        initial_options |= {o.SUPER_FASTPATH, o.SYMBOL_FILL_UNCONSTRAINED_REGISTERS, o.SYMBOL_FILL_UNCONSTRAINED_MEMORY}
        # initial_options.remove(o.COW_STATES)
        self._initial_state.options = initial_options

        starting_points = set()

        # clear all existing functions
        self.kb.functions.clear()

        if self._use_symbols:
            starting_points |= self._function_addresses_from_symbols
        # 根据符号表获取函数的地址,这里有190个函数
        """
        调试的时候不知道这里为什么有一个0
        """

        if self._extra_function_starts:
            starting_points |= set(self._extra_function_starts)

        # 对函数入口点进行排序
        starting_points = sorted(list(starting_points), reverse=True)

        if self._start_at_entry and self.project.entry is not None and self._inside_regions(self.project.entry) and \
                self.project.entry not in starting_points:
            # make sure self.project.entry is inserted
            starting_points += [ self.project.entry ]

        # 对于每一个起始位置,创建一个CFGJOB进行分析
        for sp in starting_points:
            job = CFGJob(sp, sp, 'Ijk_Boring')
            self._insert_job(job)
            # register the job to function `sp`
            self._register_analysis_job(sp, job)

        self._updated_nonreturning_functions = set()

        # 这里是使用函数序言进行函数查找,该例子找到了217个
        if self._use_function_prologues and self.project.concrete_target is None:
            self._function_prologue_addrs = sorted(self._func_addrs_from_prologues())
            # make a copy of those prologue addresses, so that we can pop from the list
            self._remaining_function_prologue_addrs = self._function_prologue_addrs[::]

            # make function_prologue_addrs a set for faster lookups
            self._function_prologue_addrs = set(self._function_prologue_addrs)

0x2 _analysis_core_baremetal

​ 这个函数的功能就是从刚才的队列中取出来添加的 job 并处理。主要就是以下三步:

  • _pre_job_handling

  • _process_job_and_get_successors

  • _intra_analysis

    该函数的源码如下:

def _analysis_core_baremetal(self):

    if not self._job_info_queue:
        self._job_queue_empty()

    while not self.should_abort:

        if self._status_callback is not None:
            self._status_callback(self)

        # should_abort might be changed by the status callback function
        if self.should_abort:
            return

        if not self._job_info_queue:
            self._job_queue_empty()   # 时间消耗 1/3

        if not self._job_info_queue:
            # still no job available
            break

        job_info = self._job_info_queue[0]

        try:
            self._pre_job_handling(job_info.job)
        except AngrDelayJobNotice:
            # delay the handling of this job
            continue
        except AngrSkipJobNotice:
            # consume and skip this job
            self._job_info_queue = self._job_info_queue[1:]
            self._job_map.pop(self._job_key(job_info.job), None)
            continue

        # remove the job info from the map
        self._job_map.pop(self._job_key(job_info.job), None)

        self._job_info_queue = self._job_info_queue[1:]

        self._process_job_and_get_successors(job_info)

        # Short-cut for aborting the analysis
        if self.should_abort:
            break

        self._intra_analysis()  
  • _job_queue_empty 占了构建cfg的总时间的1/3。其执行流程如下
    • 首先查找来自必须要返回的函数的job,如果有,添加到队列中
    • 然后将已经完成分析的函数添加到_completed_functions中
    • 迭代地分析所有更改的函数,更新它们的返回属性,直到达到一个定点(即没有找到新的返回/不返回函数)。
    • 尝试分析剩下的间接调用位置
    • 如果选择使用函数序言分析的话,就将之前使用函数序言找到的函数添加到job队列中
    • 如果选择使用完整分析的话,会获取下条指令封装为job添加到队列中
    def _job_queue_empty(self):

        if self._pending_jobs:
            # fastpath
            # look for a job that comes from a function that must return
            # if we can find one, just use it
            job = self._pop_pending_job(returning=True)
            if job is not None:
                self._insert_job(job)
                return

            self._clean_pending_exits()

        # did we finish analyzing any function?
        # fill in self._completed_functions
        self._make_completed_functions()

        # analyze function features, most importantly, whether each function returns or not
        self._analyze_all_function_features()

        # Clear _changed_functions set
        self._updated_nonreturning_functions = set()

        if self._pending_jobs:
            self._clean_pending_exits()

            job = self._pop_pending_job(returning=True)
            if job is not None:
                self._insert_job(job)
                return

            job = self._pop_pending_job(returning=False)
            if job is not None:
                self._insert_job(job)
                return

        # Try to see if there is any indirect jump left to be resolved
        if self._resolve_indirect_jumps and self._indirect_jumps_to_resolve:
            self._process_unresolved_indirect_jumps()

            if self._job_info_queue:
                return

        if self._use_function_prologues and self._remaining_function_prologue_addrs:
            while self._remaining_function_prologue_addrs:
                prolog_addr = self._remaining_function_prologue_addrs[0]
                self._remaining_function_prologue_addrs = self._remaining_function_prologue_addrs[1:]
                if self._seg_list.is_occupied(prolog_addr):
                    continue

                job = CFGJob(prolog_addr, prolog_addr, 'Ijk_Boring')
                self._insert_job(job)
                self._register_analysis_job(prolog_addr, job)
                return

        if self._force_complete_scan:
            addr = self._next_code_addr()
            if addr is None:
                l.debug("Force-scan jumping failed")
            else:
                l.debug("Force-scanning to %#x", addr)

            if addr is not None:
                job = CFGJob(addr, addr, "Ijk_Boring", last_addr=None, job_type=CFGJob.JOB_TYPE_COMPLETE_SCANNING)
                self._insert_job(job)
                self._register_analysis_job(addr, job)

  • _pre_job_handling

    一些简单的对 job 的预处理,比如对进度条的计算

  • _process_job_and_get_successors

    这个函数的主要目的是对 job 进行处理,并获取当前job的后继,把当前job的后继节点包装为 job,添加入队列。时间消耗约为2/3

    def _process_job_and_get_successors(self, job_info):
        """
        Process a job, get all successors of this job, and call _handle_successor() to handle each successor.

        :param JobInfo job_info: The JobInfo instance
        :return: None
        """

        job = job_info.job

        successors = self._get_successors(job)

        all_new_jobs = [ ]

        for successor in successors:
            new_jobs = self._handle_successor(job, successor, successors)
            # 在cfgfast中是直接把其所有的后继返回

            if new_jobs:
                all_new_jobs.extend(new_jobs)

                for new_job in new_jobs:
                    self._insert_job(new_job)

        self._post_job_handling(job, all_new_jobs, successors)

​ _get_successors 在子类 cfgFast 中实现,主要功能是从给定的地址在搜索一个基本块

0x3 _get_successors

​ 调用_scan_block函数,并将其后继基本块包装为job对象,并添加到待分析的队列中:

    def _get_successors(self, job):  # pylint:disable=arguments-differ

        # current_function_addr = job.func_addr
        # addr = job.addr

        # if current_function_addr != -1:
        #    l.debug("Tracing new exit %#x in function %#x", addr, current_function_addr)
        # else:
        #    l.debug("Tracing new exit %#x", addr)

        jobs = self._scan_block(job)

        # l.debug("... got %d jobs: %s", len(jobs), jobs)

        for job_ in jobs:  # type: CFGJob
            # register those jobs
            self._register_analysis_job(job_.func_addr, job_)

        return jobs
0x4 _scan_block

    def _scan_block(self, cfg_job):
        """
        Scan a basic block starting at a specific address

        :param CFGJob cfg_job: The CFGJob instance.
        :return: a list of successors
        :rtype: list
        """

        addr = cfg_job.addr
        current_func_addr = cfg_job.func_addr

        # Fix the function address
        # This is for rare cases where we cannot successfully determine the end boundary of a previous function, and
        # as a consequence, our analysis mistakenly thinks the previous function goes all the way across the boundary,
        # resulting the missing of the second function in function manager.
        if addr in self._function_addresses_from_symbols:
            current_func_addr = addr

        if self._addr_hooked_or_syscall(addr):
            entries = self._scan_procedure(cfg_job, current_func_addr)

        else:
            entries = self._scan_irsb(cfg_job, current_func_addr)

        return entries
0x5 _scan_irsb
    def _scan_irsb(self, cfg_job, current_func_addr):
        """
        Generate a list of successors (generating them each as entries) to IRSB.
        Updates previous CFG nodes with edges.

        :param CFGJob cfg_job: The CFGJob instance.
        :param int current_func_addr: Address of the current function
        :return: a list of successors
        :rtype: list
        """
        # 生成cfgnode
        addr, function_addr, cfg_node, irsb = self._generate_cfgnode(cfg_job, current_func_addr)
    
        # 添加函数内部指向该node的边
        cfg_job.apply_function_edges(self, clear=True)

        # function_addr and current_function_addr can be different. e.g. when tracing an optimized tail-call that jumps
        # into another function that has been identified before.

        if cfg_node is None:
            # exceptions occurred, or we cannot get a CFGNode for other reasons
            return [ ]
        
        # 为cfg添加相应的边
        self._graph_add_edge(cfg_node, cfg_job.src_node, cfg_job.jumpkind, cfg_job.src_ins_addr,
                             cfg_job.src_stmt_idx
                             )
        # 将对应的cfg添加到对应函数
        self._function_add_node(cfg_node, function_addr)

        if self.functions.get_by_addr(function_addr).returning is not True:
            self._updated_nonreturning_functions.add(function_addr)

        # If we have traced it before, don't trace it anymore
        real_addr = get_real_address_if_arm(self.project.arch, addr)
        if real_addr in self._traced_addresses:
            # the address has been traced before
            return [ ]
        else:
            # Mark the address as traced
            self._traced_addresses.add(real_addr)

        # irsb cannot be None here
        # assert irsb is not None

        # IRSB在每个CFGNode中只使用一次,因此在这里必须释放掉以节省内存
        cfg_node.irsb = None
        # 1/10 _scan_irsb 的时间消耗
        self._process_block_arch_specific(addr, irsb, function_addr)

        # Scan the basic block to collect data references
        if self._collect_data_ref:
            self._collect_data_references(irsb, addr)
        # 3/20 _scan_irsb 的时间消耗
        # Get all possible successors
        irsb_next, jumpkind = irsb.next, irsb.jumpkind
        successors = [ ]

        last_ins_addr = None
        ins_addr = addr
        if irsb.statements:
            for i, stmt in enumerate(irsb.statements):
                if isinstance(stmt, pyvex.IRStmt.Exit):
                    successors.append((i,
                                       last_ins_addr if self.project.arch.branch_delay_slot else ins_addr,
                                       stmt.dst,
                                       stmt.jumpkind
                                       )
                                      )
                elif isinstance(stmt, pyvex.IRStmt.IMark):
                    last_ins_addr = ins_addr
                    ins_addr = stmt.addr + stmt.delta
        else:
            for ins_addr, stmt_idx, exit_stmt in irsb.exit_statements:
                successors.append((
                    stmt_idx,
                    last_ins_addr if self.project.arch.branch_delay_slot else ins_addr,
                    exit_stmt.dst,
                    exit_stmt.jumpkind
                ))

        successors.append((DEFAULT_STATEMENT,
                           last_ins_addr if self.project.arch.branch_delay_slot else ins_addr, irsb_next, jumpkind)
                          )

        entries = [ ]

        # 如果是arm架构的话,就会做一些处理,然后再返回
        successors = self._post_process_successors(addr, irsb.size, successors)

        # Process each successor 这一部分时间消耗占用了 15/20
        for suc in successors:
            stmt_idx, ins_addr, target, jumpkind = suc

            entries += self._create_jobs(target, jumpkind, function_addr, irsb, addr, cfg_node, ins_addr,
                                         stmt_idx
                                         )

        return entries
0x6 _create_jobs

​ 给定一个node和其后继节点的一些信息,返回CFGJobs的list。这个函数的主要执行流程如下:

  • 根据当前cfg_node获取target_address

  • 如果target_address是None,说明跳转地址不是一个确定的值,此时根据jumpkind的类型进行判断:

    • 如果是ret类型,说明是函数结尾,否则说明是一个间接跳转,调用_indirect_jump_encountered尝试去解析这个间接跳转

    • 如果成功解析,则直接把其所有可能的地址包装为cfg_job并返回

    • 如果jumpkind 属于 ("Ijk_Boring", 'Ijk_InvalICache'),说明跳转的地址是一个plt表的地址,直接添加到对应plt函数的边,并创建一个CFGJob添加到jobs中

    • 否则就把这个间接跳转加入待解析的间接跳转工作队列中。

    def _create_jobs(self, target, jumpkind, current_function_addr, irsb, addr, cfg_node, ins_addr, stmt_idx):
        """
        Given a node and details of a successor, makes a list of CFGJobs
        and if it is a call or exit marks it appropriately so in the CFG

        :param int target:          Destination of the resultant job
        :param str jumpkind:        The jumpkind of the edge going to this node
        :param int current_function_addr: Address of the current function
        :param pyvex.IRSB irsb:     IRSB of the predecessor node
        :param int addr:            The predecessor address
        :param CFGNode cfg_node:    The CFGNode of the predecessor node
        :param int ins_addr:        Address of the source instruction.
        :param int stmt_idx:        ID of the source statement.
        :return:                    a list of CFGJobs
        :rtype:                     list
        """

        if type(target) is pyvex.IRExpr.Const:  # pylint: disable=unidiomatic-typecheck
            target_addr = target.con.value
        elif type(target) in (pyvex.IRConst.U8, pyvex.IRConst.U16, pyvex.IRConst.U32, pyvex.IRConst.U64):  # pylint: disable=unidiomatic-typecheck
            target_addr = target.value
        elif type(target) is int:  # pylint: disable=unidiomatic-typecheck
            target_addr = target
        else:
            target_addr = None

        if target_addr in self._known_thunks and jumpkind == 'Ijk_Boring':
            thunk_kind = self._known_thunks[target_addr][0]
            if thunk_kind == 'ret':
                jumpkind = 'Ijk_Ret'
                target_addr = None
            elif thunk_kind == 'jmp':
                pass # ummmmmm not sure about this one
            else:
                raise AngrCFGError("This shouldn't be possible")

        jobs = [ ]
        is_syscall = jumpkind.startswith("Ijk_Sys")

        # Special handling:
        # If a call instruction has a target that points to the immediate next instruction, we treat it as a boring jump
        if jumpkind == "Ijk_Call" and \
                not self.project.arch.call_pushes_ret and \
                cfg_node.instruction_addrs and \
                ins_addr == cfg_node.instruction_addrs[-1] and \
                target_addr == irsb.addr + irsb.size:
            jumpkind = "Ijk_Boring"

        if target_addr is None:
            # The target address is not a concrete value

            if jumpkind == "Ijk_Ret":
                # This block ends with a return instruction.
                if current_function_addr != -1:
                    self._function_exits[current_function_addr].add(addr)
                    self._function_add_return_site(addr, current_function_addr)
                    self.functions[current_function_addr].returning = True
                    self._pending_jobs.add_returning_function(current_function_addr)

                cfg_node.has_return = True

            elif self._resolve_indirect_jumps and \
                    (jumpkind in ('Ijk_Boring', 'Ijk_Call', 'Ijk_InvalICache') or jumpkind.startswith('Ijk_Sys')):
                # This is an indirect jump. Try to resolve it.
                # FIXME: in some cases, a statementless irsb will be missing its instr addresses
                # and this next part will fail. Use the real IRSB instead
                irsb = cfg_node.block.vex
                cfg_node.instruction_addrs = irsb.instruction_addresses
                resolved, resolved_targets, ij = self._indirect_jump_encountered(addr, cfg_node, irsb,
                                                                                 current_function_addr, stmt_idx)
                if resolved:
                    for resolved_target in resolved_targets:
                        if jumpkind == 'Ijk_Call':
                            jobs += self._create_job_call(cfg_node.addr, irsb, cfg_node, stmt_idx, ins_addr,
                                                          current_function_addr, resolved_target, jumpkind)
                        else:
                            edge = FunctionTransitionEdge(cfg_node, resolved_target, current_function_addr,
                                                          to_outside=False, stmt_idx=stmt_idx, ins_addr=ins_addr,
                                                          )
                            ce = CFGJob(resolved_target, current_function_addr, jumpkind,
                                        last_addr=resolved_target, src_node=cfg_node, src_stmt_idx=stmt_idx,
                                        src_ins_addr=ins_addr, func_edges=[ edge ],
                                        )
                            jobs.append(ce)
                    return jobs

                if jumpkind in ("Ijk_Boring", 'Ijk_InvalICache'):
                    resolved_as_plt = False

                    if irsb and self._heuristic_plt_resolving:
                        # Test it on the initial state. Does it jump to a valid location?
                        # It will be resolved only if this is a .plt entry
                        resolved_as_plt = self._resolve_plt(addr, irsb, ij)

                        if resolved_as_plt:
                            jump_target = next(iter(ij.resolved_targets))
                            target_func_addr = jump_target  # TODO: FIX THIS

                            edge = FunctionTransitionEdge(cfg_node, jump_target, current_function_addr,
                                                          to_outside=True, dst_func_addr=jump_target,
                                                          stmt_idx=stmt_idx, ins_addr=ins_addr,
                                                          )
                            ce = CFGJob(jump_target, target_func_addr, jumpkind, last_addr=jump_target,
                                        src_node=cfg_node, src_stmt_idx=stmt_idx, src_ins_addr=ins_addr,
                                        func_edges=[edge],
                                        )
                            jobs.append(ce)

                    if resolved_as_plt:
                        # has been resolved as a PLT entry. Remove it from indirect_jumps_to_resolve
                        if ij.addr in self._indirect_jumps_to_resolve:
                            self._indirect_jumps_to_resolve.remove(ij.addr)
                            self._deregister_analysis_job(current_function_addr, ij)
                    else:
                        # add it to indirect_jumps_to_resolve
                        self._indirect_jumps_to_resolve.add(ij)

                        # register it as a job for the current function
                        self._register_analysis_job(current_function_addr, ij)

                else:  # jumpkind == "Ijk_Call" or jumpkind.startswith('Ijk_Sys')
                    self._indirect_jumps_to_resolve.add(ij)
                    self._register_analysis_job(current_function_addr, ij)

                    jobs += self._create_job_call(addr, irsb, cfg_node, stmt_idx, ins_addr, current_function_addr, None,
                                                  jumpkind, is_syscall=is_syscall
                                                  )

        elif target_addr is not None:
            # This is a direct jump with a concrete target.

            # pylint: disable=too-many-nested-blocks
            if jumpkind in ('Ijk_Boring', 'Ijk_InvalICache'):
                # if the target address is at another section, it has to be jumping to a new function
                if not self._addrs_belong_to_same_section(addr, target_addr):
                    target_func_addr = target_addr
                    to_outside = True
                else:
                    # it might be a jumpout
                    target_func_addr = None
                    real_target_addr = get_real_address_if_arm(self.project.arch, target_addr)
                    if real_target_addr in self._traced_addresses:
                        node = self.model.get_any_node(target_addr)
                        if node is not None:
                            target_func_addr = node.function_address
                    if target_func_addr is None:
                        target_func_addr = current_function_addr

                    to_outside = not target_func_addr == current_function_addr

                edge = FunctionTransitionEdge(cfg_node, target_addr, current_function_addr,
                                              to_outside=to_outside,
                                              dst_func_addr=target_func_addr,
                                              ins_addr=ins_addr,
                                              stmt_idx=stmt_idx,
                                              )

                ce = CFGJob(target_addr, target_func_addr, jumpkind, last_addr=addr, src_node=cfg_node,
                            src_ins_addr=ins_addr, src_stmt_idx=stmt_idx, func_edges=[ edge ])
                jobs.append(ce)

            elif jumpkind == 'Ijk_Call' or jumpkind.startswith("Ijk_Sys"):
                jobs += self._create_job_call(addr, irsb, cfg_node, stmt_idx, ins_addr, current_function_addr,
                                              target_addr, jumpkind, is_syscall=is_syscall
                                              )

            else:
                # TODO: Support more jumpkinds
                l.debug("Unsupported jumpkind %s", jumpkind)
                l.debug("Instruction address: %#x", ins_addr)

        return jobs
0x7 时间消耗分析
  • 例子: cgibin (D-link DIR 815路由器固件)
  • 总时间消耗: 32s (基本上30s全是构建cfg的时间)
    • _job_queue_empty 大约占了总时间1/3的时间消耗(10s)
    • scan_block 大约占了总时间2/3的时间消耗(20s)
      • 其中最耗时的函数是_create_jobs函数,大约占用了16s
      • _create_jobs函数主要是处理直接调用和间接调用,间接调用几乎占了_create_jobs函数100%的时间消耗
3. 解析间接调用
0x1 process_unresolved_indirect_jumps
  • Ijk_Call 类型表示地址是由前一个基本块传递过来
  • Ijk_Boring类型表示是一个jump table类型的
    def _process_unresolved_indirect_jumps(self):
        """
        Resolve all unresolved indirect jumps found in previous scanning.

        Currently we support resolving the following types of indirect jumps:
        - Ijk_Call: indirect calls where the function address is passed in from a proceeding basic block
        - Ijk_Boring: jump tables
        - For an up-to-date list, see analyses/cfg/indirect_jump_resolvers

        :return:    A set of concrete indirect jump targets (ints).
        :rtype:     set
        """

        l.info("%d indirect jumps to resolve.", len(self._indirect_jumps_to_resolve))

        all_targets = set()
        for idx, jump in enumerate(self._indirect_jumps_to_resolve):  # type:int,IndirectJump
            if self._low_priority:
                self._release_gil(idx, 20, 0.0001)
            all_targets |= self._process_one_indirect_jump(jump)

        self._indirect_jumps_to_resolve.clear()

        return all_targets
0x2 process_one_indirect_jump

​ 使用angr.analyses.cfg.indirect_jump_resolvers.jumptable.JumpTableResolver求解器来求解间接调用。

def _process_one_indirect_jump(self, jump):
    """
    Resolve a given indirect jump.

    :param IndirectJump jump:  The IndirectJump instance.
    :return:        A set of resolved indirect jump targets (ints).
    """

    resolved = False
    resolved_by = None
    targets = None

    block = self._lift(jump.addr, opt_level=1)

    for resolver in self.indirect_jump_resolvers:
        resolver.base_state = self._base_state

        if not resolver.filter(self, jump.addr, jump.func_addr, block, jump.jumpkind):
            continue

        resolved, targets = resolver.resolve(self, jump.addr, jump.func_addr, block, jump.jumpkind)
        if resolved:
            resolved_by = resolver
            break

    if resolved:
        self._indirect_jump_resolved(jump, jump.addr, resolved_by, targets)
    else:
        self._indirect_jump_unresolved(jump)

    return set() if targets is None else set(targets)
0x3 indirect_jump_encountered

​ 当遇到间接跳转时调用。将尝试使用不受时间限制(快速)的间接跳转解析器来解决这个间接跳转。如果无法解决,将查看以前是否已经解决了这个间接跳转。

    def _indirect_jump_encountered(self, addr, cfg_node, irsb, func_addr, stmt_idx=DEFAULT_STATEMENT):
        """
        Called when we encounter an indirect jump. We will try to resolve this indirect jump using timeless (fast)
        indirect jump resolvers. If it cannot be resolved, we will see if this indirect jump has been resolved before.

        :param int addr:                Address of the block containing the indirect jump.
        :param cfg_node:                The CFGNode instance of the block that contains the indirect jump.
        :param pyvex.IRSB irsb:         The IRSB instance of the block that contains the indirect jump.
        :param int func_addr:           Address of the current function.
        :param int or str stmt_idx:     ID of the source statement.

        :return:    A 3-tuple of (whether it is resolved or not, all resolved targets, an IndirectJump object
                    if there is one or None otherwise)
        :rtype:     tuple
        """

        jumpkind = irsb.jumpkind
        l.debug('IRSB %#x has an indirect jump (%s) as its default exit.', addr, jumpkind)

        # try resolving it fast
        resolved, resolved_targets = self._resolve_indirect_jump_timelessly(addr, irsb, func_addr, jumpkind)
        if resolved:
            l.debug("Indirect jump at block %#x is resolved by a timeless indirect jump resolver. "
                    "%d targets found.", addr, len(resolved_targets))
            return True, resolved_targets, None

        l.debug("Indirect jump at block %#x cannot be resolved by a timeless indirect jump resolver.", addr)

        # Add it to our set. Will process it later if user allows.
        # Create an IndirectJump instance
        if addr not in self.indirect_jumps:
            if self.project.arch.branch_delay_slot:
                ins_addr = cfg_node.instruction_addrs[-2]
            else:
                ins_addr = cfg_node.instruction_addrs[-1]
            ij = IndirectJump(addr, ins_addr, func_addr, jumpkind, stmt_idx, resolved_targets=[])
            self.indirect_jumps[addr] = ij
            resolved = False
        else:
            ij = self.indirect_jumps[addr]  # type: IndirectJump
            resolved = len(ij.resolved_targets) > 0

        return resolved, ij.resolved_targets, ij
0x4 resolve_indirect_jump_timelessly

​ 会调用angr.analyses.cfg.indirect_jump_resolvers.mips_elf_fast.MipsElfFastResolver来求解间接调用。

    def _resolve_indirect_jump_timelessly(self, addr, block, func_addr, jumpkind):
        """
        Checks if MIPS32 and calls MIPS32 check, otherwise false

        :param int addr: irsb address
        :param pyvex.IRSB block: irsb
        :param int func_addr: Function address
        :return: If it was resolved and targets alongside it
        :rtype: tuple
        """

        if block.statements is None:
            block = self.project.factory.block(block.addr, size=block.size).vex

        for res in self.timeless_indirect_jump_resolvers:
            if res.filter(self, addr, func_addr, block, jumpkind):
                r, resolved_targets = res.resolve(self, addr, func_addr, block, jumpkind)
                if r:
                    return True, resolved_targets
        return False, [ ]
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,233评论 6 495
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,357评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,831评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,313评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,417评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,470评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,482评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,265评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,708评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,997评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,176评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,503评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,150评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,391评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,034评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,063评论 2 352

推荐阅读更多精彩内容