~fluix/tilde

d3f4453731c038fdc1d5b588cfbece485406dd8c — Steven Guikal 7 months ago 36aec7f
Switch to SQLAlchemy database instead of Redis
7 files changed, 114 insertions(+), 103 deletions(-)

M .gitignore
M app.py
M auth/forms.py
M auth/models.py
M auth/views.py
M config.defaults
D core.py
M .gitignore => .gitignore +1 -1
@@ 5,5 5,5 @@
__pycache__/
env/
uploads/
dump.rdb
data.db
config

M app.py => app.py +19 -11
@@ 3,13 3,10 @@
# SPDX-License-Identifier: AGPL-3.0-only

import logging
import core
import redis

from flask import Flask, render_template
from flask_login import LoginManager
from auth.views import auth
from auth.models import User
from flatpages.views import flatpages
from flask_sqlalchemy import SQLAlchemy

logger = logging.getLogger(__name__)



@@ 27,10 24,20 @@ if not app.config["SECRET_KEY"]:
    logger.fatal("SECRET_KEY must defined in some configuration file.")
    exit(1)

core.redis = redis.Redis(**app.config.get("REDIS"))
app.config["db"] = SQLAlchemy(app)

with app.app_context():
    from auth.views import auth
    from flatpages.views import flatpages

    app.register_blueprint(flatpages)
    app.register_blueprint(auth, subdomain="auth")


@app.before_first_request
def create_tables():
    app.config["db"].create_all()

app.register_blueprint(flatpages)
app.register_blueprint(auth, subdomain="auth")

login_manager = LoginManager()
login_manager.login_view = "auth.login"


@@ 39,7 46,8 @@ login_manager.init_app(app)


@login_manager.user_loader
def load_user(username):
def load_user(id):
    """Check is_active property to log out users that are no longer active."""
    user = User.get(username)
    return user if user.is_active else None
    from auth.models import User

    return User.query.get(id)

M auth/forms.py => auth/forms.py +20 -9
@@ 1,23 1,34 @@
# SPDX-FileCopyrightText: 2021 Steven Guikal <void@fluix.one>
#
# SPDX-License-Identifier: AGPL-3.0-only

from wtforms import Form, PasswordField, StringField
from wtforms.validators import Email, Length, required, regexp
from wtforms.validators import Email, EqualTo, InputRequired, Length, Regexp
from wtforms.widgets import TextArea


class LoginForm(Form):
    username = StringField("Username", [required()])
    password = PasswordField("Password", [required()])
    username = StringField("Username", [InputRequired()])
    password = PasswordField("Password", [InputRequired()])


class RegistrationForm(Form):
    username = StringField(
        "Username",
        [
            required(),
            regexp("[a-zA-Z0-9]{2,20}", message="Input must be alphanumeric."),
            InputRequired(),
            Regexp("[a-zA-Z0-9]{2,20}", message="Input must be alphanumeric."),
            Length(min=2, max=20),
        ],
    )
    email = StringField("Email", [required(), Email()])
    password1 = PasswordField("Password", [required(), Length(min=16)])
    password2 = PasswordField("Password (confirm)", [required()])
    bio = StringField("Biography", [required()], widget=TextArea())
    email = StringField("Email", [InputRequired(), Email()])
    password1 = PasswordField(
        "Password",
        [
            InputRequired(),
            Length(min=16),
            EqualTo("password2", message="Passwords must match."),
        ],
    )
    password2 = PasswordField("Password (confirm)", [InputRequired()])
    bio = StringField("Biography", [InputRequired()], widget=TextArea())

M auth/models.py => auth/models.py +32 -46
@@ 1,51 1,37 @@
# SPDX-FileCopyrightText: 2021 Steven Guikal <void@fluix.one>
#
# SPDX-License-Identifier: AGPL-3.0-only

from enum import Enum, auto

from flask import current_app
from flask_login.mixins import UserMixin
import core


class User(UserMixin, object):
    PENDING = "pending"
    ACTIVE = "active"
    BANNED = "banned"
    DELETED = "deleted"

    def __init__(self, username, email, password, bio, status=PENDING):
        self.username = username
        self.email = email
        self.password = password
        self.bio = bio
        self.status = status

    @classmethod
    def register(cls, *args):
        user = cls(*args)
        if not user.save():
            raise ValueError("Username is taken.")

    @classmethod
    def get(cls, username):
        user_bytes = core.redis.hgetall(f"user:{username.lower()}")
        user = {k.decode(): v.decode() for k, v in user_bytes.items()}
        return cls(**user) if user else None

    def save(self):
        key = f"user:{self.username.lower()}"
        if not core.redis.hsetnx(key, "username", self.username):
            return False
        core.redis.hset(
            key,
            mapping={
                "username": self.username,
                "email": self.email,
                "password": self.password,
                "bio": self.bio,
                "status": self.status,
            },
        )
        return True
from werkzeug.security import check_password_hash, generate_password_hash

db = current_app.config["db"]


class Status(Enum):
    PENDING = auto()
    ACTIVE = auto()
    BANNED = auto()
    DELETED = auto()


class User(UserMixin, db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(20, collation="NOCASE"), unique=True)
    email = db.Column(db.String())
    password_hash = db.Column(db.String())
    status = db.Column(db.Enum(Status), default=Status.PENDING)
    bio = db.Column(db.Text())

    @property
    def is_active(self):
        return self.status == self.ACTIVE
        return self.status == Status.ACTIVE

    def set_password(self, password):
        self.password_hash = generate_password_hash(password)

    def get_id(self):
        return self.username
    def check_password(self, password):
        return check_password_hash(self.password_hash, password)

M auth/views.py => auth/views.py +40 -26
@@ 2,15 2,24 @@
#
# SPDX-License-Identifier: AGPL-3.0-only

from flask import Blueprint, flash, redirect, render_template, request, session, url_for
from flask import (
    Blueprint,
    current_app,
    flash,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from flask_login import login_required, login_user, logout_user
from werkzeug.security import check_password_hash, generate_password_hash
from sqlalchemy.exc import IntegrityError

from .forms import LoginForm, RegistrationForm
from .models import User

from .models import Status, User

auth = Blueprint("auth", __name__, template_folder="templates")
db = current_app.config["db"]


@auth.route("/")


@@ 24,30 33,32 @@ def account():
def logout():
    logout_user()
    flash("Logged out successfully.")
    return redirect(url_for("index"))
    return redirect(url_for("flatpages.index"))


@auth.route("/login", methods=["GET", "POST"])
def login():
    form = LoginForm(request.form)
    if request.method == "POST" and form.validate():
        user = User.get(form.username.data)
        if not user:
            form.username.errors.append("User doesn't exist.")
        if user.status == User.PENDING:
            form.username.errors.append("User is pending verification.")
        if user.status == User.DELETED:
            form.username.errors.append("User is deleted.")
        if user.status == User.BANNED:
            form.username.errors.append("User is banned.")
        if not check_password_hash(user.password, form.password.data):
            form.password.errors.append("Incorrect password.")
        user = User.query.filter_by(username=form.username.data).first()

        if not user or not user.check_password(form.password.data):
            form.username.errors.append("Username or password is incorrect.")
            return render_template("login.html", form=form)

        status_errors = {
            Status.PENDING: "User is pending verification.",
            Status.DELETED: "User is deleted.",
            Status.BANNED: "User is banned.",
        }
        if status_error := status_errors.get(user.status):
            form.username.errors.append(status_error)
        if form.errors:
            return render_template("login.html", form=form)

        login_user(user)
        flash(f"Login successful. Welcome ~{user.username}!")
        return redirect(session.get("next", url_for("index")))
        return redirect(session.get("next", url_for("flatpages.index")))
    return render_template("login.html", form=form)




@@ 55,17 66,20 @@ def login():
def register():
    form = RegistrationForm(request.form)
    if request.method == "POST" and form.validate():
        if form.password1.data != form.password2.data:
            form.password2.errors.append("Passwords do not match.")
            return render_template("register.html", form=form)

        password = generate_password_hash(form.password1.data)
        user = User(
            username=form.username.data,
            email=form.email.data,
            bio=form.bio.data,
        )
        user.set_password(form.password1.data)
        db.session.add(user)
        try:
            User.register(form.username.data, form.email.data, password, form.bio.data)
        except ValueError as e:
            form.username.errors.append(e)
            db.session.commit()
        except IntegrityError as e:
            db.session.rollback()
            form.username.errors.append("Username taken.")
            return render_template("register.html", form=form)

        flash("Account registration sent in. Please await a reply.")
        return redirect(url_for("index"))
        return redirect(url_for("flatpages.index"))
    return render_template("register.html", form=form)

M config.defaults => config.defaults +2 -6
@@ 21,9 21,5 @@ MEDIA_ROOT = "./uploads/"
# of checking safetyness.
USE_SESSION_FOR_NEXT = True

# Redis as a database
REDIS = {
    "host": "127.0.0.1",
    "port": 6379,
    "db": 0,
}
SQLALCHEMY_DATABASE_URI = "sqlite:///data.db"
SQLALCHEMY_TRACK_MODIFICATIONS = False

D core.py => core.py +0 -4
@@ 1,4 0,0 @@
# SPDX-FileCopyrightText: 2021 Steven Guikal <void@fluix.one>
#
# SPDX-License-Identifier: AGPL-3.0-only
redis = None