wafw00f源码浅析
2021-05-06 10:51:16 Author: xz.aliyun.com(查看原文) 阅读量:120 收藏

https://github.com/EnableSecurity/wafw00f/
Wafw00f是一款知名的WAF识别工具,目前Github有2300的Star,由Python编写。简单测试了几个网站,发现识别效果不是那么好

整体

  • docs:sphinx自动生成文档的工作
  • wafw00f:代码
  • setup.py:安装脚本

wafw00f

  • bin:启动文件,执行这里的脚本即可启动程序
  • lib:asciiarts是logo,evillib是发请求的工具类,后续分析
  • plugins:指纹识别的规则,后续分析
  • main:核心代码
  • manager:管理规则的脚本
  • wafprio:指纹识别优先级规定

首先看一下plugins目录下的规则文件,每一个都是py文件,并且都有函数is_waf

def is_waf(self):
    schemes = [
        self.matchHeader(('aeSecure-code', '.+?')),
        self.matchContent(r'aesecure_denied\.png')
    ]
    if any(i for i in schemes):
        return True
    return False

比如以上这个规则,是匹配响应头和响应Body中的关键字

正是分析代码,按照一般的规则,先从main.py分析
一开始是常见的参数解析:
parser = OptionParser(usage='%prog url1 [url2 [url3 ... ]]\r\nexample: %prog http://www.victim.org/')

338行print(randomArt())调用asciiarts.py打印LOGO

如果是-l参数那么就打印所有可用的waf,从wafprio文件中读取:

if options.list:
    ......
    try:
        m = [i.replace(')', '').split(' (') for i in wafdetectionsprio]
        print(R+'  WAF Name'+' '*24+'Manufacturer\n  '+'-'*8+' '*24+'-'*12+'\n')
        max_len = max(len(str(x)) for k in m for x in k)
        ......

如果是-v参数那么就打印版本信息:

print('[+] The version of WAFW00F you have is %sv%s%s' % (B, __version__, E))

如果有自定义请求头,那么就尝试从指定的文件中解析请求头,getheaders函数较简单

extraheaders = getheaders(options.headers)

如果输入是一个完整的文件,尝试JSON解析:

with open(options.input) as f:
    try:
        urls = json.loads(f.read())
    except json.decoder.JSONDecodeError:
        log.critical("JSON file %s did not contain well-formed JSON", options.input)
        sys.exit(1)

另外也支持CSV文件和最基础的txt列表格式:

elif options.input.endswith('.csv'):
    columns = defaultdict(list)
    with open(options.input) as f:
        reader = csv.DictReader(f)
        for row in reader:
            for (k,v) in row.items():
                columns[k].append(v)
    targets = columns['url']
else:
    with open(options.input) as f:
        targets = [x for x in f.read().splitlines()]

然后进行了URL验证,比如是否HTTP开头,代理信息处理等操作,创建一个核心类WAF00F,并发请求:

attacker = WAFW00F(target, debuglevel=options.verbose, path=path,
            followredirect=options.followredirect, extraheaders=extraheaders,
                proxies=proxies)
global rq
rq = attacker.normalRequest()

WAF00F类在开头定义了五个Payload,为了触发WAF而设计:

xsstring = '<script>alert("XSS");</script>'
sqlistring = "UNION SELECT ALL FROM information_schema AND ' or SLEEP(5) or '"
lfistring = '../../../../etc/passwd'
rcestring = '/bin/cat /etc/passwd; ping 127.0.0.1; curl google.com'
xxestring = '<!ENTITY xxe SYSTEM "file:///etc/shadow">]><pwn>&hack;</pwn>'

上面调用的normalRequest函数使用了evillib文件,简单的requests库使用:

if not headers: 
    h = self.headers
else: h = headers
req = requests.get(self.target, proxies=self.proxies, headers=h, timeout=timeout,
        allow_redirects=self.allowredir, params=params, verify=False)

拿到响应后,就要进行WAF探测的内容了。options.findall是传入的参数-a,代表你想测试所有的WAF,不因为探测到某一种而停止,这里作为一个布尔参数传入:

waf = attacker.identwaf(options.findall)

identwaf开头又调用了performCheck函数,这个函数作用是执行它的参数(函数传参),所以需要关心的是centralAttack是什么:

try:
    self.attackres = self.performCheck(self.centralAttack)
except RequestBlocked:
    return detected

可以看到是发请求,参数是上面的payload,为了触发WAF:

return self.Request(path=self.path, params={'a': self.xsstring, 'b': self.sqlistring, 'c': self.lfistring})

继续回到上层,如果响应为空,会抛出异常,返回检测失败;如果返回正常,那么会从上文提到的指纹识别优先级规定文件wafprio中读取并逐个检测,如果有findall表示就会一直执行:

for wafvendor in self.checklist:
    self.log.info('Checking for %s' % wafvendor)
    if self.wafdetections[wafvendor](self):
        detected.append(wafvendor)
        if not findall:
            break
self.knowledge['wafname'] = detected
return detected

wafdetections的代码如下,load_plugins是上文提到manager.py文件中的函数,加载所有插件进行检测,每一个规则都有is_waf函数,记录了规则匹配方式;最后求差集确保添加到checklist中,也就是上文的指纹识别优先级规定文件:

wafdetections = dict()

plugin_dict = load_plugins()
result_dict = {}
for plugin_module in plugin_dict.values():
    wafdetections[plugin_module.NAME] = plugin_module.is_waf
# Check for prioritized ones first, then check those added externally
checklist = wafdetectionsprio
checklist += list(set(wafdetections.keys()) - set(checklist))

注意一个写法值得思考,这里的wafdetections不是一个字典吗?使用索引[]得到某一个item后,为什么要在后面加一个(self)呢?

if self.wafdetections[wafvendor](self):

要回答这个问题其实很简单,因为这个字典中保存的是key=string;value=function这样的数据,item的值就是函数,这时候可以使用wafdetections[key](param)的方式调用函数,而这个函数是什么呢?就是规则文件的is_waf函数:

def is_waf(self):
    schemes = [
        self.matchHeader(('aeSecure-code', '.+?')),
        self.matchContent(r'aesecure_denied\.png')
    ]
    if any(i for i in schemes):
        return True
    return False

而规则文件调用了self.matchHeaderself.matchContent函数,所以我们应该查看WAFW00F类的这两个函数,因为参数self就是WAFW00F类的this指针

matchHeader函数如下:解析响应头,处理Cookie,然后正则匹配,比较简单

def matchHeader(self, headermatch, attack=False):
    if attack:
        r = self.attackres
    else: r = rq
    if r is None:
        return
    header, match = headermatch
    headerval = r.headers.get(header)
    if headerval:
        # set-cookie can have multiple headers, python gives it to us
        # concatinated with a comma
        if header == 'Set-Cookie':
            headervals = headerval.split(', ')
        else:
            headervals = [headerval]
        for headerval in headervals:
            if re.search(match, headerval, re.I):
                return True
    return False

matchContent函数也是这样的原理,正则匹配响应Body:

def matchContent(self, regex, attack=True):
    if attack:
        r = self.attackres
    else: r = rq
    if r is None:
        return
    # We may need to match multiline context in response body
    if re.search(regex, r.text, re.I):
        return True
    return False

层层跳出,回到最开始的地方

waf = attacker.identwaf(options.findall)

当没有匹配到任何一个WAF的时候,会执行这样的代码:

if attacker.genericdetect():
    log.info('Generic Detection: %s' % attacker.knowledge['generic']['reason'])
    print('[*] The site %s seems to be behind a WAF or some sort of security solution' % target)
    print('[~] Reason: %s' % attacker.knowledge['generic']['reason'])
    results.append(buildResultRecord(target, 'generic'))
else:
    print('[-] No WAF detected by the generic detection')
    results.append(buildResultRecord(target, None))

观察genericdetect函数,先发一个常见的请求,然后发带有payload的请求,然后对比响应码,如果不相等,证明检测到了WAF。也就是说这个函数为了验证某个站点是否是具有WAF的(但是WAFW00F本身并没有识别出来)

resp1 = self.performCheck(self.normalRequest)
......
resp2 = self.performCheck(self.xssAttack)
if resp1.status_code != resp2.status_code:
    return True
......
resp2 = self.performCheck(self.lfiAttack)
......
resp2 = self.performCheck(self.sqliAttack)
......

后续的代码没有什么值得分析之处,将识别结果打印到命令行或者输出文件这样的功能

  • wafw00f有比较优秀的代码质量,丰富的输入输出(JSON、CSV、TXT)
  • 规则用py直接编写,能否改进为JSON,YML等文件
  • 作为一个拥有2300星的python项目,却没有使用任何高性能技术(多线程、多进程、协程)
  • 可以参考它的payload触发waf识别的原理,编写更完善的工具

参考wafw00f的源码,我打算用golang做一下更完善的版本
目前做了个开头:https://github.com/EmYiQing/go-wafw00f


文章来源: http://xz.aliyun.com/t/9497
如有侵权请联系:admin#unsafe.sh