最近在看 rex,没有官方文档,已有的教程也有点过时,基本只能通过看源码学习。
本篇教程当作是学习的记录,也希望能帮助对 rex 感兴趣的同学对 rex 的架构和使用方式有个整体的认识。
Rex 是 Shellphish 团队开发的自动生成 exploit 的引擎,是 Mechaphish 中的一个模块,最初用于 CGC 竞赛。
Rex 基于硬件模拟器 QEMU 和 angr ,通过混合执行(Concolic Execution)复现崩溃路径,根据寄存器及内存信息对漏洞类型/可利用性进行判定等,并尝试应用多种漏洞利用技术自动生成利用脚本。
本篇文章会介绍 rex 安装/顶层接口/内部实现/相关依赖等内容。
推荐直接使用 shellphish/mechaphish
docker 镜像,比较方便
docker pull shellphish/mechaphish;
docker run -it shellphish/mechaphish
rex 基于 angr,关于 angr 的使用方式,可以查看我的另一篇教程。
首先测试一下 rex 是否安装成功,简单测试代码如下:
tg = archr.targets.LocalTarget(<path_to_binary>, target_os='cgc') crash = rex.Crash(tg, <input>)
首先需要创建 target ,类型是 archr.targets.Target
接下来通过 rex.Crash 接口,传递创建的 target 和可以触发 crash 的输入,我们可以获得 Crash 对象,便可以对 Crash 对象进行一系列分析,下面会涉及对 Crash 对象的操作。
这里 path_to_binary
为二进制文件路径,target_os 指定系统,cgc 或者 linux, 这里我们可以使用 cgc 的文件进行测试(可以在binaries 中找到)
t = archr.targets.LocalTarget(["/home/angr-dev/binaries/tests/defcon24/legit_00003"], target_os='cgc') crash = rex.Crash(t, b"\x00\x0b1\xc1\x00\x0c\xeb\xe4\xf1\xf1\x14\r\rM\r\xf3\x1b\r\r\r~\x7f\x1b\xe3\x0c`_222\r\rM\r\xf3\x1b\r\x7f\x002\x7f~\x7f\xe2\xff\x7f\xff\xff\x8b\xc7\xc9\x83\x8b\x0c\xeb\x80\x002\xac\xe2\xff\xff\x00t\x8bt\x8bt_o_\x00t\x8b\xc7\xdd\x83\xc2t~n~~\xac\xe2\xff\xff_k_\x00t\x8b\xc7\xdd\x83\xc2t~n~~\xac\xe2\xff\xff\x00t\x8bt\x8b\xac\xf1\x83\xc2t~c\x00\x00\x00~~\x7f\xe2\xff\xff\x00t\x9e\xac\xe2\xf1\xf2@\x83\xc3t")
rex 也提供了多种测试样例, 可以在 tests 目录查看, 测试使用的文件可以在 binaries 仓库 中找到.
使用 rex 通常步骤:
此外也可以对 state 添加约束进行求解等,自行探索。
- crash_types 返回 crash 的漏洞类型
- explorable() Determine if the crash can be explored with the 'crash explorer'.
- exploitable() Determine if the crash is exploitable.
- exploit() 返回一个 ExploitFactory 实例,用于管理和构建 exp
- explore() explore a crash further to find new bugs
- memory_control() determine what symbolic memory we control which is at a constant address
- stack_control() determine what symbolic memory we control on the stack.
- copy() 拷贝 crash 对象
- checkpoint() Save intermediate results (traced states, etc.) to a file
- checkpoint_restore()
通过 crash 的 exploit 方法我们可以获得 ExploitFactory 实例,用于管理和构建 exploit。
ExploitFactory 有一个重要的属性 arsenal,是一个字典,用来存储对应 technique 的 exploit, 关于 rex 中实现的 technique 后面会涉及。
rex 定义了如下几种漏洞:
IP_OVERWRITE = "ip_overwrite" PARTIAL_IP_OVERWRITE = "partial_ip_overwrite" UNCONTROLLED_IP_OVERWRITE = "uncontrolled_ip_overwrite" BP_OVERWRITE = "bp_overwrite" PARTIAL_BP_OVERWRITE = "partial_bp_overwrite" WRITE_WHAT_WHERE = "write_what_where" WRITE_X_WHERE = "write_x_where" UNCONTROLLED_WRITE = "uncontrolled_write" # a write where the destination address is uncontrolled ARBITRARY_READ = "arbitrary_read" NULL_DEREFERENCE = "null_dereference" ARBITRARY_TRANSMIT = "arbitrary_transmit" # transmit where the buf argument is completely controlled ARBITRARY_RECEIVE = "arbitrary_receive" # receive where the buf argument is completel controlled
Rex 内部实现主要包含三个模块:
可以简单理解成 crash + technique = exploit ,下面我们来看具体内容
导入 crash 后,首先对 crash 进行 trace、筛选内存写操作和 判定漏洞类型。对应的函数分别为 _trace / _filter_memory_writes / _triage_crash
函数: _trace
使用给定的输入,通过符号执行,重现路径,如果没有 Crash 会抛出 NonCrashingInput 异常.
首先使用用户输入获得具体的 trace,
# collect a concrete trace save_core = True if isinstance(self.tracer_bow, archr.arsenal.RRTracerBow): save_core = False r = self.tracer_bow.fire(testcase=test_case, channel=channel,save_core=save_core)
再进行符号化 trace
self._t = r.tracer_technique(keep_predecessors=2, copy_states=False, mode=TracingMode.Strict) simgr.use_technique(self._t) simgr.use_technique(angr.exploration_techniques.Oppologist())
结束 trace, 检查是否有 crash
# tracing completed # if there was no crash we'll have to use the previous path's state if 'crashed' in simgr.stashes: # the state at crash time self.state = simgr.crashed[0] # a path leading up to the crashing basic block self.prev = self._t.predecessors[-1] else: self.state = simgr.traced[0] self.prev = self.state
获得所有的写内存操作,并将分成符号内存( symbolic memory bytes )和 flag 内存( flag memory bytes )。flag memory 针对的是 cgc 格式文件,其他情况下为空。
def _filter_memory_writes(self): memory_writes = sorted(self.state.memory.mem.get_symbolic_addrs()) if self.is_cgc: # remove all memory writes that directly end up in the CGC flag page (0x4347c000 - 0x4347d000) memory_writes = [m for m in memory_writes if m // 0x1000 != 0x4347c] user_writes = [m for m in memory_writes if any("aeg_stdin" in v for v in self.state.memory.load(m, 1).variables)] if self.is_cgc: flag_writes = [m for m in memory_writes if any(v.startswith("cgc-flag") for v in self.state.memory.load(m, 1).variables)] else: flag_writes = [] l.debug("Finished filtering memory writes.") self.symbolic_mem = self._segment(user_writes) self.flag_mem = self._segment(flag_writes)
rex 中 _triage_crash
函数用于判断 crash 对应的漏洞类型,漏洞类型之后的可利用性判定
# 判断 ip 是否可控,bp 类似 if self.state.solver.symbolic(ip): # how much control of ip do we have? if self._symbolic_control(ip) >= self.state.arch.bits: l.info("detected ip overwrite vulnerability") self.crash_types.append(Vulnerability.IP_OVERWRITE) else: l.info("detected partial ip overwrite vulnerability") self.crash_types.append(Vulnerability.PARTIAL_IP_OVERWRITE) return
# 筛选出目的地址可控的操作 # grab the all actions in the last basic block symbolic_actions = [ ] if self._t is not None and self._t.last_state is not None: recent_actions = reversed(self._t.last_state.history.recent_actions) state = self._t.last_state # TODO: this is a dead assignment! what was this supposed to be? else: recent_actions = reversed(self.state.history.actions) state = self.state for a in recent_actions: if a.type == 'mem': if self.state.solver.symbolic(a.addr.ast): symbolic_actions.append(a)
#判断是内存读还是内存写,并判断数据是否可控,由此确定漏洞类型 for sym_action in symbolic_actions: if sym_action.action == "write": if self.state.solver.symbolic(sym_action.data): l.info("detected write-what-where vulnerability") self.crash_types.append(Vulnerability.WRITE_WHAT_WHERE) else: l.info("detected write-x-where vulnerability") self.crash_types.append(Vulnerability.WRITE_X_WHERE) self.violating_action = sym_action break if sym_action.action == "read": # special vulnerability type, if this is detected we can explore the crash further l.info("detected arbitrary-read vulnerability") self.crash_types.append(Vulnerability.ARBITRARY_READ) self.violating_action = sym_action break
完成漏洞类型判定后,我们会对 crash 进行一些判断如 explorable/leakable
,如 explore 目的是寻找一个更有价值的 crash, 方便漏洞利用。
首先判断 crash 是否可 explore, 可以 explore 的漏洞类型是: ARBITRARY_READ/WRITE_WHAT_WHERE/WRITE_X_WHERE
def explorable(self): explorables = [Vulnerability.ARBITRARY_READ, Vulnerability.WRITE_WHAT_WHERE, Vulnerability.WRITE_X_WHERE] return self.one_of(explorables)
explore 主要针对任意内存读写漏洞,对应两种实现:_explore_arbitrary_read
和 _explore_arbitrary_write
if self.one_of([Vulnerability.ARBITRARY_READ]): self._explore_arbitrary_read(path_file) elif self.one_of([Vulnerability.WRITE_WHAT_WHERE, Vulnerability.WRITE_X_WHERE]): self._explore_arbitrary_write(path_file) else: raise CannotExplore("unknown explorable crash type: %s" % self.crash_types)
_explore_arbitrary_read / _explore_arbitrary_write
进行路径探索,分别对应任意写和任意读漏洞,使读写的地址是符号化地址,即我们可控的 ( point the violating address at a symbolic memory region ),返回一个 crash 对象。
通过调用 exploitable 接口判断 crash 是否可利用,rex 会判断 Crash 的漏洞类型是否属于可 exploitable 漏洞之一 。
def exploitable(self): exploitables = [Vulnerability.IP_OVERWRITE, Vulnerability.PARTIAL_IP_OVERWRITE, Vulnerability.BP_OVERWRITE, Vulnerability.PARTIAL_BP_OVERWRITE, Vulnerability.WRITE_WHAT_WHERE, Vulnerability.WRITE_X_WHERE] return self.one_of(exploitables)
检查是否可以泄露信息, 判断 crash 类型是否属于 ARBITRARY_READ/ARBITRARY_TRANSMIT
每个 technique 都是 Technique 对象的子类, 主要对 check / apply 这两个接口进行重写. 同时 Technique 对象实现了一些通用的接口, 作为构造 exploit 的辅助函数.
下面介绍一下 check / apply
check: 检查对于给定的crash, 该技术能否应用到 binary 上,返回布尔值
apply : 在binary的崩溃状态点应用该技术,返回 Exploit 对象或抛出 CannotExploit 异常
apply 其实就是根据每个技术的不同,添加不同的约束。
每种包含 applicable_to 属性,表示可以应用的平台, unix 或者 cgc
以下是 technique 的基本信息, 基本通过名称就能知道攻击技术,就不一一介绍了,它们的实现也比较朴素。
名称 | 限定漏洞类型 | 其他条件 | 平台 |
call_jmp_sp_shellcode |
call_shellcode |
circumstantially_set_register | IP_OVERWRITE / PARTIAL_IP_OVERWRITE | cgc | |
rop_leak_memory |
rop_register_control |
rop_set_register |
rop_to_accept_system |
IP_OVERWRITE / PARTIAL_IP_OVERWRITE | 存在 accept& read函数 | unix |
rop_to_execl |
IP_OVERWRITE/PARTIAL_IP_OVERWRITE | 存在 execl&dup2 函数 | unix |
rop_to_system |
IP_OVERWRITE / PARTIAL_IP_OVERWRITE | 存在 system 函数 | unix |
rop_to_system_complicated |
IP_OVERWRITE / PARTIAL_IP_OVERWRITE | libc 被加载& system 函数 & plt | unix |
shellcode_leak_address |
shellcode_set_register |
可以在调用 exploit 时设置 blacklist_techniques
成功应用 Technique 会返回 Exploit 对象,接下来介绍 Exploit 对象。
An Exploit object represents the successful application of an exploit technique to a crash state.
rex 实现了 ExploitFactory
类,用于管理和构建 exploit,
调用 exploit() 方法时,ExploitFactory
会依次应用每一种利用技术, 尝试生成 exploit, 得到的 exploit 会以arsenal[<techinique_name>]
形式存储在 arsenal 属性中. 针对 CGC 实现了 CGCExploitFactory 类.
构建 exp:
def exploit(self, blacklist_symbolic_explore=True, **kwargs): """ Initialize an exploit factory, with which you can build exploits. :return: An initialized ExploitFactory instance. :rtype: ExploitFactory """ factory = self._prepare_exploit_factory(blacklist_symbolic_explore, **kwargs) factory.initialize() return factory
函数主要为 exploit 的生成做一些准备操作,比如设置 technique 的黑名单,判断输入类型等。
以下是分别对 cgc 和 linux 两种格式的测试样例
def test_legit_00003(): # Test exploration and exploitation of legit_00003. inp = b"1\n" + b"A" * 200 #设置输入内容 path = os.path.join(bin_location, "tests/defcon24/legit_00003") with archr.targets.LocalTarget([path], target_os='cgc') as target: crash = rex.Crash(target, inp, fast_mode=True, rop_cache_path=os.path.join(cache_location, 'legit_00003')) nose.tools.assert_true(crash.explorable()) #判断是否可以 explore nose.tools.assert_true(crash.one_of(Vulnerability.WRITE_WHAT_WHERE)) # 漏洞是否为任意写 crash.explore() #进行探索m arsenal = crash.exploit(blacklist_techniques={'rop_set_register', 'rop_leak_memory'}) nose.tools.assert_true(len(arsenal.register_setters) >= 2) nose.tools.assert_true(len(arsenal.leakers) >= 1) crash.project.loader.close() for reg_setter in arsenal.register_setters: nose.tools.assert_true(_do_pov_test(reg_setter)) for leaker in arsenal.leakers: nose.tools.assert_true(_do_pov_test(leaker))
def test_linux_stacksmash_32(): # Test exploiting a simple linux program with a stack buffer overflow. We should be able to exploit the test binary by # ropping to 'system', calling shellcode in the BSS and calling 'jmpsp' shellcode in the BSS. inp = b"A" * 227 lib_path = os.path.join(bin_location, "tests/i386") ld_path = os.path.join(lib_path, "ld-linux.so.2") path = os.path.join(lib_path, "vuln_stacksmash") with archr.targets.LocalTarget([ld_path, '--library-path', lib_path, path], path, target_arch='i386').build().start() as target: crash = rex.Crash(target, inp, fast_mode=True, rop_cache_path=os.path.join(cache_location, 'vuln_stacksmash')) exploit = crash.exploit(blacklist_techniques={'rop_leak_memory', 'rop_set_register'}) crash.project.loader.close() # make sure we're able to exploit it in all possible ways assert len(exploit.arsenal) == 3 assert 'rop_to_system' in exploit.arsenal assert 'call_shellcode' in exploit.arsenal assert 'call_jmp_sp_shellcode' in exploit.arsenal _check_arsenal_has_send(exploit.arsenal)
这里顺便介绍一些 rex 依赖的 archr 模块
前面提到,在使用 rex 前,需要使用 archr 创建 target 对象。我们可以指定 target_path / target_os(linux /cgc) / target_arch(linux , x86_64)
archr 模块实现了以 target 为中心的分析模型。(传统是以程序 program 为中心)
Targets: 包含 target 的说明,如何配置,如何启动以及如何交互。
Bows:明确 target 特定的分析动作,包括 tracing,符号执行(symbolic execution)等,为了实现目标,Bows 可能会注入 Arrows
, gdbserver
等)到 target 中。
archr 提供了两种 target:
: docker 镜像LocalTarget
:本地系统运行的 target提供了以下 Bows :
名称 | 描述 |
DataScoutBow |
获取进程启动时的内存映射,环境,属性 |
AngrProjectBow |
创建 angr Project |
AngrStateBow |
创建 angr State |
QEMUTraceBow |
执行 qemu tracing |
GDBServerBow |
在 gdbserver 中启动 target |
STraceBow |
strace 目标(即跟踪系统调用和信号) |
CoreBow |
启动target 并恢复 core |
InputFDBow |
确定用户输入的FD数目 |
对于自动化利用,rex 比较简陋,漏洞利用技术也比较简单,但是我们可以学习它的思路,对其进行改进。