~sircmpwn/core.sr.ht

core.sr.ht/srht/oauth/decorator.py -rw-r--r-- 5.0 KiB
0e245224Ryan Gonzalez srht.Validation: Don't reject enums with 0 values 3 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from datetime import datetime, timedelta
from flask import current_app, request, g, abort
from functools import wraps
from srht.config import cfg, get_origin
from srht.crypto import encrypt_request_authorization
from srht.crypto import verify_encrypted_authorization
from srht.database import db
from srht.oauth import OAuthError, UserType
from srht.oauth.scope import OAuthScope
from srht.validation import Validation
import hashlib
import requests

metasrht = get_origin("meta.sr.ht")

def _internal_auth(f, auth, *args, **kwargs):
    # Used for authenticating internal API users, like other sr.ht sites.
    oauth_service = current_app.oauth_service
    OAuthClient = oauth_service.OAuthClient
    OAuthToken = oauth_service.OAuthToken
    User = oauth_service.User

    auth = verify_encrypted_authorization(auth)
    node_id = auth["node_id"]
    username = auth.get("name", auth.get("username"))
    assert username

    # Create a synthetic OAuthToken based on the client ID and username
    token = node_id
    token_hash = hashlib.sha512((token + username).encode()).hexdigest()
    oauth_token = OAuthToken.query.filter(
            OAuthToken.token_hash == token_hash).one_or_none()
    if not oauth_token:
        user = User.query.filter(User.username == username).one_or_none()
        if user == None:
            if current_app.site == "meta.sr.ht":
                # This is accepted for /api/user/profile for the purpose of
                # supporting srht.oauth.AbstractOAuthService.fetch_unknown_user
                assert request.path == "/api/user/profile",\
                    "Internal authorization token issued for unknown user"
                abort(404)
            profile = oauth_service.fetch_unknown_user(username)
            user = oauth_service.get_user(profile)
        oauth_token = OAuthToken()
        oauth_token.user_id = user.id
        # Note: the expiration is meaningless
        oauth_token.expires = datetime.utcnow() + timedelta(days=9999)
        oauth_token.token_hash = token_hash
        oauth_token.token_partial = "internal"
        oauth_token._scopes = "*"
        db.session.add(oauth_token)
        db.session.commit()

    g.current_oauth_token = oauth_token
    return f(*args, **kwargs)

def oauth(scopes):
    """
    Validates OAuth authorization for a wrapped function. Scopes should be a
    string-formatted list of required scopes, or None if no particular scopes
    are required.
    """
    def wrap(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            internal = request.headers.get('X-Srht-Authorization')
            if internal:
                return _internal_auth(f, internal, *args, **kwargs)

            token = request.headers.get('Authorization')
            valid = Validation(request)
            if not token or not (token.startswith('token ')
                    or token.startswith('Bearer ')
                    or token.startswith('Internal ')):
                return valid.error("No authorization supplied (expected an "
                    "OAuth token)", status=401)

            token = token.split(' ')
            if len(token) != 2:
                return valid.error("Invalid authorization supplied", status=401)

            if token[0] == "Internal":
                return _internal_auth(f, token[1], *args, **kwargs)

            token = token[1]
            token_hash = hashlib.sha512(token.encode()).hexdigest()

            try:
                if scopes:
                    required = OAuthScope(scopes)
                    required.client_id = current_app.oauth_service.client_id
                else:
                    required = None
            except OAuthError as err:
                return err.response

            try:
                oauth_token = current_app.oauth_service.get_token(
                        token, token_hash, required)
            except OAuthError as err:
                return err.response

            if not oauth_token:
                return valid.error("Invalid or expired OAuth token", status=401)

            g.current_oauth_token = oauth_token

            if oauth_token.scopes == "*" or scopes is None:
                return f(*args, **kwargs)

            applicable = [s for s in oauth_token.scopes if s.fulfills(required)]
            if not any(applicable):
                return valid.error("Your OAuth token is not permitted to use " +
                    "this endpoint (needs {})".format(required), status=403)

            if oauth_token.user.user_type == UserType.suspended:
                return valid.error("The authorized user's account has been " +
                    "suspended with the following notice: \n" +
                    oauth_token.user.suspension_notice + "\n" +
                    "Contact support: " + cfg("sr.ht", "owner-email"),
                    status=403)

            if oauth_token.user.user_type == UserType.unconfirmed:
                return valid.error("The authorized user's account has not " +
                    "been confirmed.", status=403)

            return f(*args, **kwargs)
        return wrapper
    return wrap