~fluix/tilde

8f46b9d65448ed50e322f49ecdbad39737700a1b — Steven Guikal 7 months ago 8385d93
Add user account activation
2 files changed, 52 insertions(+), 23 deletions(-)

M auth/models.py
M auth/views.py
M auth/models.py => auth/models.py +16 -4
@@ 6,16 6,21 @@ from enum import Enum, auto

from flask import current_app
from flask_login.mixins import UserMixin
from itsdangerous import BadSignature
from itsdangerous.url_safe import URLSafeSerializer
from werkzeug.security import check_password_hash, generate_password_hash

db = current_app.config["db"]


class Status(Enum):
    PENDING = auto()
    ACTIVE = auto()
    BANNED = auto()
    DELETED = auto()
    PENDING = "User is pending verification. Login with the activation link provided in the email."
    ACTIVE = None
    BANNED = "User is banned."
    DELETED = "User is deleted."

    def __init__(self, error):
        self.error = error


class User(UserMixin, db.Model):


@@ 35,3 40,10 @@ class User(UserMixin, db.Model):

    def check_password(self, password):
        return check_password_hash(self.password_hash, password)

    def check_activation(self, code):
        signer = URLSafeSerializer(current_app.secret_key, salt="activate")
        try:
            return signer.loads(code) == [self.id, self.username]
        except BadSignature:
            return False

M auth/views.py => auth/views.py +36 -19
@@ 2,6 2,7 @@
#
# SPDX-License-Identifier: AGPL-3.0-only

import click
import json

from flask import (


@@ 15,6 16,7 @@ from flask import (
    url_for,
)
from flask_login import login_required, login_user, logout_user
from itsdangerous.url_safe import URLSafeSerializer
from sqlalchemy.exc import IntegrityError

from .forms import LoginForm, RegistrationForm


@@ 42,21 44,26 @@ def logout():
def login():
    form = LoginForm(request.form)
    if request.method == "POST" and form.validate():
        user = User.query.filter_by(username=form.username.data).first()
        if form.errors:
            return render_template("login.html", form=form)

        if not user or not user.check_password(form.password.data):
            form.username.errors.append("Username or password is incorrect.")
        user = User.query.filter_by(username=form.username.data).first()
        if not user:
            form.username.errors.append("User does not exist.")
            return render_template("login.html", form=form)
        if not user.check_password(form.password.data):
            form.password.errors.append("Incorrect password.")
            return render_template("login.html", form=form)

        # TODO: Move these on to the Enum itself
        status_errors = {
            Status.PENDING: "User is pending verification.",
            Status.DELETED: "User is deleted.",
            Status.BANNED: "User is banned.",
        }
        if status_error := status_errors.get(user.status):
            form.username.errors.append(status_error)
        if form.errors:
        if user.status == Status.PENDING:
            if not user.check_activation(request.args.get("activate", "")):
                form.username.errors.append(user.status.error)
                return render_template("login.html", form=form)
            user.status = Status.ACTIVE
            db.session.add(user)
            db.session.commit()
        elif user.status.error:
            form.username.errors.append(user.status.error)
            return render_template("login.html", form=form)

        login_user(user)


@@ 91,17 98,27 @@ def register():
@auth.cli.command("ergo")
def ergo():
    data = json.loads(input())
    status_errors = {
        Status.PENDING: "User is pending verification.",
        Status.DELETED: "User is deleted.",
        Status.BANNED: "User is banned.",
    }
    out = {"success": True}

    user = User.query.filter_by(username=data.get("accountName", "")).first()
    if not user or not user.check_password(data.get("passphrase")):
        out = {"success": False, "error": "Username or password is incorrect."}
    elif status_error := status_errors.get(user.status):
        out = {"success": False, "error": status_error}
    elif user.status.error:
        out = {"success": False, "error": user.status.error}

    print(json.dumps(out))


@auth.cli.command("activate")
@click.argument("username")
def activate(username):
    user = User.query.filter_by(username=username).first()
    if not user:
        print("User does not exist")
        exit(1)
    if user.status != Status.PENDING:
        print(f"User status is {user.status}, not {Status.PENDING}.")
        exit(1)
    signer = URLSafeSerializer(current_app.secret_key, salt="activate")
    activation_code = signer.dumps([user.id, user.username])
    print(f"{url_for('auth.login')}?activate={activation_code}")