~sircmpwn/meta.sr.ht

ref: 72548bd7545f78670878667674cc7645835a17bd meta.sr.ht/metasrht/blueprints/security.py -rw-r--r-- 4.9 KiB
72548bd7Drew DeVault API: Updates per core-go auth changes 1 year, 1 month ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from flask import Blueprint, render_template, request, redirect, abort, url_for
from metasrht.audit import audit_log
from metasrht.auth.builtin import hash_password
from metasrht.qrcode import gen_qr
from metasrht.totp import totp
from metasrht.types import User, UserAuthFactor, FactorType, AuditLogEntry
from prometheus_client import Counter
from srht.config import cfg
from srht.database import db
from srht.flask import session
from srht.oauth import current_user, loginrequired
from srht.validation import Validation, valid_url
from urllib.parse import quote
import base64
import os

security = Blueprint('security', __name__)

site_name = cfg("sr.ht", "site-name")

metrics = type("metrics", tuple(), {
    c.describe()[0].name: c
    for c in [
        Counter("meta_totp_enabled", "Number of times TOTP was disabled for a user"),
        Counter("meta_totp_disabled", "Number of times TOTP was enabled for a user"),
    ]
})

@security.route("/security")
@loginrequired
def security_GET():
    totp = (UserAuthFactor.query
        .filter(UserAuthFactor.user_id == current_user.id)
        .filter(UserAuthFactor.factor_type == FactorType.totp)).one_or_none()
    audit_log = (AuditLogEntry.query
        .filter(AuditLogEntry.user_id == current_user.id)
        .order_by(AuditLogEntry.created.desc())).limit(15)
    return render_template("security.html",
        audit_log=audit_log,
        totp=totp)

@security.route("/security/audit/log")
@loginrequired
def security_audit_log_GET():
    audit_log = (AuditLogEntry.query
        .filter(AuditLogEntry.user_id == current_user.id)
        .order_by(AuditLogEntry.created.desc())).all()
    return render_template("audit-log.html", audit_log=audit_log)

def totp_get_qrcode(secret):
    return gen_qr(otpauth_uri(secret))

def otpauth_uri(secret):
    return "otpauth://totp/{}:{}?secret={}&issuer={}".format(
        quote(site_name), quote("{} <{}>".format(current_user.username,
            current_user.email)), secret, quote(site_name))

@security.route("/security/totp/enable")
@loginrequired
def security_totp_enable_GET():
    secret = base64.b32encode(os.urandom(10)).decode('utf-8')
    return render_template("totp-enable.html",
        qrcode=totp_get_qrcode(secret),
        otpauth_uri=otpauth_uri(secret),
        secret=secret)

@security.route("/security/totp/enable", methods=["POST"])
@loginrequired
def security_totp_enable_POST():
    valid = Validation(request)

    secret = valid.require("secret")
    code = valid.require("code")

    if not valid.ok:
        return render_template("totp-enable.html",
            qrcode=totp_get_qrcode(secret),
            otpauth_uri=otpauth_uri(secret),
            secret=secret, valid=valid), 400
    code = code.replace(" ", "")
    try:
        code = int(code)
    except:
        valid.error(
                "This TOTP code is invalid (expected a number)", field="code")
    if not valid.ok:
        return render_template("totp-enable.html",
            qrcode=totp_get_qrcode(secret),
            otpauth_uri=otpauth_uri(secret),
            secret=secret, valid=valid), 400

    valid.expect(totp(secret, code),
            "The code you entered is incorrect.", field="code")

    if not valid.ok:
        return render_template("totp-enable.html",
            qrcode=totp_get_qrcode(secret),
            otpauth_uri=otpauth_uri(secret),
            secret=secret, valid=valid), 400

    factor = UserAuthFactor(current_user, FactorType.totp)
    factor.secret = secret.encode('utf-8')

    recovery_codes = []
    hashed_codes = []
    for _ in range(10):
        code = base64.b32encode(os.urandom(10)).decode('utf-8').rstrip("=")
        recovery_codes.append(code)
        hashed_codes.append(hash_password(code))

    factor.extra = hashed_codes

    db.session.add(factor)
    audit_log("Enable TOTP", details="Enabled two-factor authentication",
            email=True, subject=f"TOTP has been enabled for your {cfg('sr.ht', 'site-name')} account",
            email_details="2FA via TOTP was enabled")
    db.session.commit()
    metrics.meta_totp_enabled.inc()

    session["recovery-codes"] = recovery_codes
    return redirect("/security/totp/complete")

@security.route("/security/totp/complete")
@loginrequired
def security_totp_complete():
    codes = session.pop("recovery-codes", None)
    if not codes:
        return redirect("/security")
    return render_template("totp-enabled.html", codes=codes)

@security.route("/security/totp/disable", methods=["POST"])
@loginrequired
def security_totp_disable_POST():
    factor = (UserAuthFactor.query
        .filter(UserAuthFactor.user_id == current_user.id)
        .filter(UserAuthFactor.factor_type == FactorType.totp)).one_or_none()
    if not factor:
        return redirect("/security")

    session["extra_factors"] = [factor.id]
    session["authorized_user"] = current_user.id
    session["challenge_type"] = "disable_totp"
    session["return_to"] = "/security"
    return redirect(url_for("auth.totp_challenge_GET"))