~sircmpwn/meta.sr.ht

ref: 72548bd7545f78670878667674cc7645835a17bd meta.sr.ht/metasrht/blueprints/oauth_web.py -rw-r--r-- 9.2 KiB
72548bd7Drew DeVault API: Updates per core-go auth changes 1 year, 1 month ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import html
import json
from datetime import datetime

from flask import Blueprint, render_template, request, redirect, abort

from metasrht.audit import audit_log
from metasrht.types import OAuthClient, OAuthToken, DelegatedScope
from metasrht.types import RevocationUrl
from srht.database import db
from srht.flask import session
from srht.oauth import current_user, loginrequired
from srht.validation import Validation, valid_url
from srht.webhook.celery import async_request

# Blueprint collecting the /oauth/* web views defined below: OAuth client
# registration and management, and token issuance and revocation.
oauth_web = Blueprint('oauth_web', __name__)

@oauth_web.route("/oauth")
@loginrequired
def oauth_GET():
    """Render the OAuth overview page for the logged-in user.

    The "oauth.html" template receives:

    - client_authorizations: the user's unexpired tokens attached to a
      client that is not preauthorized
    - personal_tokens: the user's unexpired tokens with no client, leaving
      out those whose token_partial is "internal"
    - client_tokens: a callable returning the number of tokens for a client
    """
    def client_tokens(client):
        # Counts every token row of the client; no user or expiry filter.
        issued = OAuthToken.query.filter(OAuthToken.client_id == client.id)
        return issued.count()

    # The "== False" / "!= None" / "== None" comparisons build SQL
    # expressions, so they must not be turned into "is" checks.
    third_party = OAuthToken.query.join(OAuthToken.client).filter(
            OAuthClient.preauthorized == False,
            OAuthToken.user_id == current_user.id,
            OAuthToken.expires > datetime.utcnow(),
            OAuthToken.client_id != None)
    client_authorizations = third_party.all()

    own_tokens = OAuthToken.query.filter(
            OAuthToken.user_id == current_user.id,
            OAuthToken.expires > datetime.utcnow(),
            OAuthToken.client_id == None,
            OAuthToken.token_partial != "internal")
    personal_tokens = own_tokens.all()

    return render_template("oauth.html",
            client_authorizations=client_authorizations,
            personal_tokens=personal_tokens,
            client_tokens=client_tokens)

@oauth_web.route("/oauth/register")
@loginrequired
def oauth_register_GET():
    """Serve the empty OAuth client registration form."""
    template = "oauth-register.html"
    return render_template(template)

@oauth_web.route("/oauth/register", methods=["POST"])
@loginrequired
def oauth_register_POST():
    """Register a new OAuth client from the submitted form.

    Requires the "client-name" and "redirect-uri" fields. Any redirect URI
    other than the out-of-band URN must pass valid_url. On a validation
    failure the form is re-rendered with the submitted values; otherwise the
    client is stored and the browser is redirected to /oauth/registered,
    which reads the new credentials from the session.
    """
    valid = Validation(request)
    client_name = valid.require("client-name")
    redirect_uri = valid.require("redirect-uri")

    out_of_band = "urn:ietf:wg:oauth:2.0:oob"
    if redirect_uri != out_of_band:
        # A missing URI was already reported by require(); skip it here.
        uri_acceptable = not redirect_uri or valid_url(redirect_uri)
        valid.expect(uri_acceptable,
                "Must be a valid HTTP or HTTPS URI", field="redirect-uri")

    if not valid.ok:
        form_state = {
            "client_name": client_name,
            "redirect_uri": redirect_uri,
            "valid": valid,
        }
        return render_template("oauth-register.html", **form_state)

    client = OAuthClient(current_user, client_name, redirect_uri)
    new_secret = client.gen_client_secret()

    # Hand the credentials to the /oauth/registered page via the session.
    session["client_id"] = client.client_id
    session["client_secret"] = new_secret
    session["client_event"] = "registered"

    db.session.add(client)
    audit_log("register oauth client",
            f"Registered OAuth client {client.client_id}")
    db.session.commit()
    return redirect("/oauth/registered")

@oauth_web.route("/oauth/registered")
@loginrequired
def oauth_registered():
    """Show client credentials that a previous request left in the session.

    Reads "client_id", "client_secret" and "client_event" from the session.
    If any of them is missing or empty the request is rejected with 400 and
    the session is left untouched. Otherwise all three keys are removed from
    the session before the page is rendered.
    """
    keys = ("client_id", "client_secret", "client_event")
    shown = {key: session.get(key) for key in keys}
    if not all(shown.values()):
        abort(400)
    for key in keys:
        del session[key]
    return render_template("oauth-registered.html", **shown)

@oauth_web.route("/oauth/client/<client_id>/settings")
@loginrequired
def client_settings_GET(client_id):
    """Render the settings page of an OAuth client owned by the current user.

    Responds with 404 when no client has this client_id or when the client
    belongs to a different user.
    """
    matches = OAuthClient.query.filter(OAuthClient.client_id == client_id)
    client = matches.first()
    # Unknown clients and other users' clients get the same 404.
    if not (client and client.user_id == current_user.id):
        abort(404)
    return render_template("client-settings.html", client=client)

@oauth_web.route("/oauth/client/<client_id>/security")
@loginrequired
def client_security_GET(client_id):
    """Render the security page of an OAuth client owned by the current user.

    Responds with 404 when no client has this client_id or when the client
    belongs to a different user.
    """
    matches = OAuthClient.query.filter(OAuthClient.client_id == client_id)
    client = matches.first()
    # Unknown clients and other users' clients get the same 404.
    if not (client and client.user_id == current_user.id):
        abort(404)
    return render_template("client-security.html", client=client)

@oauth_web.route("/oauth/reset-secret/<client_id>", methods=["POST"])
@loginrequired
def reset_secret(client_id):
    """Generate a new secret for an OAuth client owned by the current user.

    Responds with 404 when the client does not exist or belongs to someone
    else. The new secret is placed in the session so that the
    /oauth/registered page, which this view redirects to, can display it.
    """
    matches = OAuthClient.query.filter(OAuthClient.client_id == client_id)
    client = matches.first()
    if not (client and client.user_id == current_user.id):
        abort(404)

    new_secret = client.gen_client_secret()
    handoff = (
        ("client_id", client.client_id),
        ("client_secret", new_secret),
        ("client_event", "reset-secret"),
    )
    for key, value in handoff:
        session[key] = value

    audit_log("reset client secret",
            f"Reset OAuth client secret for {client.client_id}")
    db.session.commit()
    return redirect("/oauth/registered")

@oauth_web.route("/oauth/revoke-tokens/<client_id>")
@loginrequired
def revoke_tokens_GET(client_id):
    """Ask for confirmation before revoking every token of an OAuth client.

    Responds with 404 when the client does not exist or is not owned by the
    current user. The confirmation form posts back to this same URL.
    """
    matches = OAuthClient.query.filter(OAuthClient.client_id == client_id)
    client = matches.first()
    if not (client and client.user_id == current_user.id):
        abort(404)

    confirmation = {
        "blurb": f"revoke all OAuth tokens for client {client_id}",
        "action": f"/oauth/revoke-tokens/{client_id}",
        "cancel": "/oauth",
    }
    return render_template("are-you-sure.html", **confirmation)

@oauth_web.route("/oauth/revoke-tokens/<client_id>", methods=["POST"])
@loginrequired
def revoke_tokens_POST(client_id):
    """Delete every token issued for an OAuth client owned by the current user.

    Responds with 404 when the client does not exist or belongs to someone
    else. For each token, every registered revocation URL is sent the
    token's hash through async_request and then deleted, after which the
    token itself is deleted. Redirects to /oauth once committed.
    """
    matches = OAuthClient.query.filter(OAuthClient.client_id == client_id)
    client = matches.first()
    if not (client and client.user_id == current_user.id):
        abort(404)

    issued = OAuthToken.query.filter(OAuthToken.client_id == client.id).all()
    for token in issued:
        hooks = RevocationUrl.query.filter(
                RevocationUrl.token_id == token.id).all()
        for hook in hooks:
            payload = json.dumps({"token_hash": token.token_hash})
            async_request.delay(hook.url, payload,
                    {"Content-Type": "application/json"})
            db.session.delete(hook)
        # The revocation rows are removed before the token they refer to.
        db.session.delete(token)

    audit_log("revoked oauth tokens",
            f"Revoked all OAuth tokens for {client_id}")
    db.session.commit()
    return redirect("/oauth")

@oauth_web.route("/oauth/client/<client_id>/delete")
@loginrequired
def client_delete_GET(client_id):
    """Render the delete confirmation page for an OAuth client.

    Responds with 404 when no client has this client_id or when the client
    belongs to a different user.
    """
    matches = OAuthClient.query.filter(OAuthClient.client_id == client_id)
    client = matches.first()
    # Unknown clients and other users' clients get the same 404.
    if not (client and client.user_id == current_user.id):
        abort(404)
    return render_template("client-delete.html", client=client)

@oauth_web.route("/oauth/client/<client_id>/delete", methods=["POST"])
@loginrequired
def client_delete_POST(client_id):
    """Delete an OAuth client owned by the current user.

    Responds with 404 when the client does not exist or belongs to someone
    else. The deletion is written to the audit log, committed, and the
    browser is sent back to /oauth.
    """
    matches = OAuthClient.query.filter(OAuthClient.client_id == client_id)
    client = matches.first()
    if not (client and client.user_id == current_user.id):
        abort(404)

    audit_log("deleted oauth client", f"Deleted OAuth client {client_id}")
    db.session.delete(client)
    db.session.commit()
    return redirect("/oauth")

@oauth_web.route("/oauth/revoke-token/<token_id>")
@loginrequired
def revoke_token_GET(token_id):
    """Ask for confirmation before revoking a single token.

    Responds with 404 when no token has this id or when the token belongs
    to a different user. The wording of the confirmation depends on whether
    the token is attached to an OAuth client or is a personal access token.
    The confirmation form posts back to this same URL.
    """
    token = OAuthToken.query.filter(OAuthToken.id == token_id).first()
    if not token or token.user_id != current_user.id:
        abort(404)

    # The blurb carries inline HTML (<strong>), so anything interpolated into
    # it has to be HTML-escaped first. The client name in particular is
    # presumably free text chosen by whoever registered the client, which may
    # be a different user than the one viewing this page.
    # NOTE(review): assumes are-you-sure.html renders blurb as markup, which
    # the <strong> tags imply -- confirm against the template.
    if token.client:
        blurb = "revoke all access from <strong>{}</strong> to your account".format(
                html.escape(str(token.client.client_name)))
    else:
        blurb = "revoke personal access token <strong>{}...</strong>".format(
                html.escape(str(token.token_partial)))

    return render_template("are-you-sure.html",
            blurb=blurb,
            action="/oauth/revoke-token/{}".format(token_id),
            cancel="/oauth")

@oauth_web.route("/oauth/revoke-token/<token_id>", methods=["POST"])
@loginrequired
def revoke_token_POST(token_id):
    """Revoke a single token that belongs to the current user.

    Responds with 404 when no token has this id or when the token belongs
    to a different user. The revocation is written to the audit log, the
    token's expiry is set to the current time, and every revocation URL
    registered for the token is sent the token's hash through async_request
    and then deleted. Redirects to /oauth once committed.
    """
    token = OAuthToken.query.filter(OAuthToken.id == token_id).first()
    if not (token and token.user_id == current_user.id):
        abort(404)

    if token.client:
        event = "revoked oauth token"
        details = f"revoked access from {token.client.client_name}"
    else:
        event = "revoked personal access token"
        details = f"revoked {token.token_partial}..."
    audit_log(event, details)

    # The token row is kept; only its expiry is moved to the current time.
    token.expires = datetime.utcnow()

    hooks = RevocationUrl.query.filter(
            RevocationUrl.token_id == token.id).all()
    for hook in hooks:
        payload = json.dumps({"token_hash": token.token_hash})
        async_request.delay(hook.url, payload,
                {"Content-Type": "application/json"})
        db.session.delete(hook)

    db.session.commit()
    return redirect("/oauth")

@oauth_web.route("/oauth/personal-token")
@loginrequired
def personal_token_GET():
    """Serve the form used to request a personal access token."""
    template = "oauth-personal-token.html"
    return render_template(template)

@oauth_web.route("/oauth/personal-token", methods=["POST"])
@loginrequired
def personal_token_POST():
    """Issue a personal access token for the current user.

    Accepts an optional "comment" form field, which may be at most as long
    as the OAuthToken.comment column allows. On a validation failure the
    form is re-rendered with the validation state. Otherwise a token with
    no client and the "*" scope is created, audit-logged and committed, and
    the value returned by gen_token is passed to the template as "token".
    """
    valid = Validation(request)

    comment = valid.optional("comment")
    max_length = OAuthToken.comment.type.length
    # The bound is inclusive: a comment of exactly max_length characters
    # still fits the column, and the message only promises to reject
    # comments that are longer than the maximum.
    valid.expect(not comment or len(comment) <= max_length,
            f"Comment longer than max of {max_length} " +
            "characters.", "comment")
    if not valid.ok:
        return render_template("oauth-personal-token.html", **valid.kwargs)

    # A None client marks this as a personal token rather than a client one.
    oauth_token = OAuthToken(current_user, None)
    token = oauth_token.gen_token(comment=comment)
    oauth_token._scopes = "*"
    audit_log("issued oauth token", "issued personal access token {}…".format(
        oauth_token.token_partial))
    db.session.add(oauth_token)
    db.session.commit()
    return render_template("oauth-personal-token.html", token=token)