~homeworkprod/byceps

ref: 4237b3ec9496efe95dcce82bea3207ab9de4d520 byceps/byceps/blueprints/authentication/session.py -rw-r--r-- 2.5 KiB
4237b3ec — Jochen Kupperschmidt Move ticketing blueprint into `site` subpackage 1 year, 10 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""
byceps.blueprints.authentication.session
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2006-2020 Jochen Kupperschmidt
:License: Modified BSD, see LICENSE for details.
"""

from typing import Optional

from flask import session

from ...services.authentication.exceptions import AuthenticationFailed
from ...services.authentication.session import service as session_service
from ...services.user import service as user_service
from ...services.user.transfer.models import User
from ...typing import PartyID, UserID


KEY_USER_ID = 'user_id'
KEY_USER_AUTH_TOKEN = 'user_auth_token'


def start(user_id: UserID, auth_token: str, *, permanent: bool = False) -> None:
    """Initialize the user's session by putting the relevant data
    into the session cookie.
    """
    session[KEY_USER_ID] = str(user_id)
    session[KEY_USER_AUTH_TOKEN] = str(auth_token)
    session.permanent = permanent


def end() -> None:
    """End the user's session by deleting the session cookie."""
    session.pop(KEY_USER_ID, None)
    session.pop(KEY_USER_AUTH_TOKEN, None)
    session.permanent = False


def get_user(*, party_id: Optional[PartyID] = None) -> Optional[User]:
    """Return the current user if authenticated, `None` if not."""
    user_id = _get_user_id()
    auth_token = _get_auth_token()

    return _load_user(user_id, auth_token, party_id=party_id)


def _get_user_id() -> Optional[str]:
    """Return the current user's ID, or `None` if not available."""
    return session.get(KEY_USER_ID)


def _get_auth_token() -> Optional[str]:
    """Return the current user's auth token, or `None` if not available."""
    return session.get(KEY_USER_AUTH_TOKEN)


def _load_user(
    user_id: Optional[str],
    auth_token: Optional[str],
    *,
    party_id: Optional[PartyID] = None,
) -> Optional[User]:
    """Load the user with that ID.

    Return `None` if:
    - the ID is unknown.
    - the account is not enabled.
    - the auth token is invalid.
    """
    if user_id is None:
        return None

    user = user_service.find_active_user(
        user_id, include_avatar=True, include_orga_flag_for_party_id=party_id
    )

    if user is None:
        return None

    # Validate auth token.
    if (auth_token is None) or not _is_auth_token_valid(user.id, auth_token):
        # Bad auth token, not logging in.
        return None

    return user


def _is_auth_token_valid(user_id: UserID, auth_token: str) -> bool:
    try:
        session_service.authenticate_session(user_id, auth_token)
    except AuthenticationFailed:
        return False
    else:
        return True