~sircmpwn/meta.sr.ht

ref: 72548bd7545f78670878667674cc7645835a17bd meta.sr.ht/metasrht/blueprints/users.py -rw-r--r-- 6.7 KiB
72548bd7Drew DeVault API: Updates per core-go auth changes 1 year, 1 month ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import socket
from datetime import datetime, timedelta
from flask import Blueprint, render_template, request, redirect, url_for, abort
from metasrht.decorators import adminrequired
from metasrht.types import Invoice
from metasrht.types import User, UserAuthFactor, FactorType, AuditLogEntry
from metasrht.types import UserNote, PaymentInterval
from metasrht.webhooks import UserWebhook, deliver_profile_update
from sqlalchemy import and_
from srht.database import db
from srht.flask import paginate_query
from srht.graphql import exec_gql, gql_time
from srht.oauth import UserType
from srht.search import search_by
from srht.validation import Validation

# Flask blueprint that the user-administration routes in this module attach to.
users = Blueprint("users", __name__)

@users.route("/users")
@adminrequired
def users_GET():
    """Render the paginated user listing, newest accounts first.

    The optional ``search`` query-string argument is handed to
    ``search_by`` together with the username and email columns. If
    ``search_by`` rejects the terms with a ValueError, the unfiltered
    listing is shown and the error text is passed to the template.
    """
    search_terms = request.args.get("search")
    query = User.query.order_by(User.created.desc())

    error_text = None
    try:
        query = search_by(query, search_terms, [User.username, User.email])
    except ValueError as err:
        # Fall through with the unfiltered query; only report the problem.
        error_text = str(err)

    page, pagination = paginate_query(query)
    return render_template(
            "users.html",
            users=page,
            search=search_terms,
            search_error=error_text,
            **pagination)

def _one_year_after(moment):
    """Return midnight on the same calendar date one year after *moment*.

    February 29 has no counterpart in the following year, so it is mapped
    to February 28 instead of letting datetime() raise ValueError.
    """
    try:
        return datetime(year=moment.year + 1,
                month=moment.month, day=moment.day)
    except ValueError:
        return datetime(year=moment.year + 1, month=2, day=28)

def render_user_template(user):
    """Render ``user.html``, the admin detail page for *user*.

    Gathers the user's TOTP factor (if any), the 15 most recent audit log
    entries with best-effort reverse DNS for their addresses, all invoices,
    and the personal access tokens / OAuth clients / OAuth grants reported
    by the meta.sr.ht GraphQL API.

    :param user: the User row to display
    :returns: the rendered template response
    """
    totp = (UserAuthFactor.query
        .filter(UserAuthFactor.user_id == user.id)
        .filter(UserAuthFactor.factor_type == FactorType.totp)).one_or_none()
    audit_log = (AuditLogEntry.query
        .filter(AuditLogEntry.user_id == user.id)
        .order_by(AuditLogEntry.created.desc())).limit(15)
    invoices = (Invoice.query
        .filter(Invoice.user_id == user.id)
        .order_by(Invoice.created.desc())).all()

    # Map each distinct audit-log address to its hostname. Addresses that
    # cannot be resolved are simply left out of the mapping.
    rdns = dict()
    for log in audit_log:
        addr = str(log.ip_address)
        if addr in rdns:
            continue
        try:
            host, _, _ = socket.gethostbyaddr(addr)
        except OSError:
            # socket.herror, socket.gaierror and timeouts are all OSError
            # subclasses; none of them should take the whole page down.
            continue
        rdns[addr] = host

    reset_expiry = user.reset_expiry
    reset_pending = bool(reset_expiry and reset_expiry > datetime.utcnow())

    one_year = _one_year_after(datetime.utcnow())

    dashboard_query = """
    query {
        personalAccessTokens { id, comment, issued, expires }
        oauthClients { id, uuid, name, url }
        oauthGrants {
            id
            issued
            expires
            tokenHash
            client {
                name
                url
                owner {
                    canonicalName
                }
            }
        }
    }
    """
    try:
        r = exec_gql("meta.sr.ht", dashboard_query, user=user)
        personal_tokens = r["personalAccessTokens"]
        for pt in personal_tokens:
            pt["issued"] = gql_time(pt["issued"])
            pt["expires"] = gql_time(pt["expires"])
        oauth_clients = r["oauthClients"]
        oauth_grants = r["oauthGrants"]
        for grant in oauth_grants:
            grant["issued"] = gql_time(grant["issued"])
            grant["expires"] = gql_time(grant["expires"])
    except Exception:
        # Token/grant data is best-effort: if the API call or the response
        # handling fails, show the page with empty lists. Exception (not a
        # bare except) so SystemExit/KeyboardInterrupt still propagate.
        personal_tokens = []
        oauth_clients = []
        oauth_grants = []
    return render_template("user.html", user=user,
            totp=totp, audit_log=audit_log, reset_pending=reset_pending,
            one_year=one_year, rdns=rdns,
            personal_tokens=personal_tokens,
            oauth_clients=oauth_clients,
            oauth_grants=oauth_grants,
            invoices=invoices)

@users.route("/users/~<username>")
@adminrequired
def user_by_username_GET(username):
    """Show the admin detail page for *username*; 404 if no such user."""
    by_name = User.query.filter(User.username == username)
    account = by_name.one_or_none()
    if not account:
        abort(404)
    return render_user_template(account)

@users.route("/users/~<username>/add-note", methods=["POST"])
@adminrequired
def user_add_note(username):
    """Attach a note to *username* from the ``notes`` form field.

    Responds 404 for an unknown user. When validation fails the detail
    page is re-rendered without saving; otherwise the note is committed
    and the client is redirected back to the detail page.
    """
    by_name = User.query.filter(User.username == username)
    account = by_name.one_or_none()
    if not account:
        abort(404)

    form = Validation(request)
    text = form.require("notes")
    if not form.ok:
        return render_user_template(account)

    entry = UserNote()
    entry.user_id, entry.note = account.id, text
    db.session.add(entry)
    db.session.commit()

    return redirect(url_for(".user_by_username_GET", username=username))

@users.route("/users/~<username>/disable-totp", methods=["POST"])
@adminrequired
def user_disable_totp(username):
    """Delete the auth factors stored for *username*, then redirect.

    Responds 404 for an unknown user. Note that the delete is filtered by
    user id only, so every UserAuthFactor row of the user is removed, not
    just rows whose factor type is TOTP.
    """
    by_name = User.query.filter(User.username == username)
    account = by_name.one_or_none()
    if not account:
        abort(404)

    factors = UserAuthFactor.query.filter(
            UserAuthFactor.user_id == account.id)
    factors.delete()
    db.session.commit()

    return redirect(url_for(".user_by_username_GET", username=username))

@users.route("/users/~<username>/set-type", methods=["POST"])
@adminrequired
def set_user_type(username):
    """Change the account type of *username* from the ``user_type`` field.

    Responds 404 for an unknown user. If the submitted value fails
    validation as a UserType nothing is changed; otherwise the new type
    is committed and a profile update is delivered. Either way the client
    is redirected back to the user's detail page.
    """
    by_name = User.query.filter(User.username == username)
    account = by_name.one_or_none()
    if not account:
        abort(404)

    form = Validation(request)
    new_type = form.require("user_type", cls=UserType)
    if form.ok:
        account.user_type = new_type
        db.session.commit()
        deliver_profile_update(account)

    return redirect(url_for(".user_by_username_GET", username=username))

@users.route("/users/~<username>/suspend", methods=["POST"])
@adminrequired
def user_suspend(username):
    """Suspend *username*, recording the optional ``reason`` form field.

    Responds 404 for an unknown user. Sets the account type to
    ``UserType.suspended``, stores the reason as the suspension notice,
    commits, delivers a profile update and redirects to the detail page.
    """
    by_name = User.query.filter(User.username == username)
    account = by_name.one_or_none()
    if not account:
        abort(404)

    form = Validation(request)
    notice = form.optional("reason")

    account.user_type = UserType.suspended
    account.suspension_notice = notice
    db.session.commit()

    deliver_profile_update(account)
    return redirect(url_for(".user_by_username_GET", username=username))

@users.route("/users/~<username>/invoice", methods=["POST"])
@adminrequired
def user_invoice(username):
    """Record a manual invoice for *username* and mark the account paying.

    Form fields (all required):
      amount      whole currency units; stored on the invoice as cents
      valid_thru  date in YYYY-MM-DD form; also becomes the payment due date
      source      stored verbatim on the invoice

    Responds 404 for an unknown user. If a field is missing, or amount /
    valid_thru cannot be parsed, nothing is stored and the client is
    redirected back to the user's detail page. On success the payment
    interval is set to yearly when valid_thru is more than 30 days away
    and to monthly otherwise.
    """
    user = User.query.filter(User.username == username).one_or_none()
    if not user:
        abort(404)
    valid = Validation(request)
    amount = valid.require("amount")
    valid_thru = valid.require("valid_thru")
    source = valid.require("source")
    if not valid.ok:
        return redirect(url_for(".user_by_username_GET", username=username))

    # Parse only after the required-field check, and treat malformed input
    # like any other validation failure instead of letting the exception
    # escape as a server error.
    try:
        amount = int(amount)
        valid_thru = datetime.strptime(valid_thru, "%Y-%m-%d")
    except (TypeError, ValueError):
        return redirect(url_for(".user_by_username_GET", username=username))

    invoice = Invoice()
    invoice.cents = amount * 100
    invoice.user_id = user.id
    invoice.valid_thru = valid_thru
    invoice.source = source
    db.session.add(invoice)

    user.payment_due = valid_thru
    if valid_thru > datetime.utcnow() + timedelta(days=30):
        user.payment_interval = PaymentInterval.yearly
    else:
        user.payment_interval = PaymentInterval.monthly

    user.user_type = UserType.active_paying

    db.session.commit()

    return redirect(url_for(".user_by_username_GET", username=username))