~sirn/fanboi2

ref: a68873a108d20a6a49a1b6caf374bfb2cf6a94fe fanboi2/fanboi2/services/user.py -rw-r--r-- 8.6 KiB
a68873a1Kridsada Thanabulpong Coding style cleanups and setup pre-commit hooks (#42) 3 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
import datetime
import secrets

from passlib.context import CryptContext
from sqlalchemy.sql import and_, or_, func, desc
from sqlalchemy.orm import joinedload

from ..auth import SESSION_TOKEN_VALIDITY
from ..models import User, UserSession, Group


ARGON2_MEMORY_COST = 1024
ARGON2_PARALLELISM = 2
ARGON2_ROUNDS = 6


def _create_crypt_context():
    return CryptContext(
        schemes=["argon2"],
        deprecated=["auto"],
        truncate_error=True,
        argon2__memory_cost=ARGON2_MEMORY_COST,
        argon2__parallelism=ARGON2_PARALLELISM,
        argon2__rounds=ARGON2_ROUNDS,
    )


class UserCreateService(object):
    """User create service provides a service for creating user."""

    def __init__(self, dbsession, identity_svc):
        self.dbsession = dbsession
        self.identity_svc = identity_svc
        self.crypt_context = _create_crypt_context()

    def create(self, parent_id, username, password, name, groups=None):
        """Creates a user. :param:`parent_id` must be present for all users
        except the root user, usually the user who created this specific user.

        :param parent_id: An :type:`int` ID of the user who created this user.
        :param username: A username.
        :param password: A password.
        :param name: A default name to use when posted in board.
        :param groups: Group the user belongs to.
        """
        if not groups:
            groups = []

        ident_type = "ident"
        if "admin" in groups:
            ident_type = "ident_admin"

        user = User(
            username=username,
            name=name,
            ident_type=ident_type,
            ident=self.identity_svc.identity_for(username=username),
            encrypted_password=self.crypt_context.hash(password),
            parent_id=parent_id,
        )

        for g in groups:
            group = self.dbsession.query(Group).filter_by(name=g).first()
            if not group:
                group = Group(name=g)
            user.groups.append(group)

        self.dbsession.add(user)
        return user


class UserLoginService(object):
    """User login service provides a service for managing user logins."""

    def __init__(self, dbsession):
        self.dbsession = dbsession
        self.crypt_context = _create_crypt_context()
        self.sessions_map = {}

    def _generate_token(self):
        """Generates a secure random token."""
        return secrets.token_urlsafe(48)

    def authenticate(self, username, password):
        """Returns :type:`True` if the given username and password combination
        could be authenticated or :type:`False` otherwise.

        :param username: A username :type:`str` to authenticate.
        :param password: A password :type:`str` to authenticate.
        """
        user = (
            self.dbsession.query(User)
            .filter(
                and_(User.deactivated == False, User.username == username)  # noqa: E711
            )
            .first()
        )
        if not user:
            return False

        ok, new_hash = self.crypt_context.verify_and_update(
            password, user.encrypted_password
        )
        if not ok:
            return False

        if new_hash is not None:
            user.encrypted_password = new_hash
            self.dbsession.add(user)
        return True

    def _user_session_c(self, token, ip_address):
        """Internal method for querying user session object and cache
        it throughout the request lifecycle.

        :param token: A user login token :type:`str`.
        :param ip_address: IP address of the user.
        """
        if not (token, ip_address) in self.sessions_map:
            user_session = (
                self.dbsession.query(UserSession)
                .options(joinedload(UserSession.user))
                .filter(
                    and_(
                        UserSession.token == token,
                        UserSession.ip_address == ip_address,
                        or_(
                            UserSession.revoked_at == None,  # noqa: E711
                            UserSession.revoked_at >= func.now(),
                        ),
                    )
                )
                .first()
            )
            self.sessions_map[(token, ip_address)] = user_session
        return self.sessions_map[(token, ip_address)]

    def _user_c(self, token, ip_address):
        """Internal method for querying user object from a session
        and cache it throughout the request lifecycle.

        :param token: A user login token :type:`str`.
        :param ip_address: IP address of the user.
        """
        user_session = self._user_session_c(token, ip_address)
        if user_session is None:
            return None
        user = user_session.user
        if user.deactivated:
            return None
        return user

    def user_from_token(self, token, ip_address):
        """Returns a :class:`User` by looking up the given :param:`token`
        or :type:`None` if the token does not exists or has been revoked.

        :param token: A user login token :type:`str`.
        :param ip_address: IP address of the user.
        """
        return self._user_c(token, ip_address)

    def groups_from_token(self, token, ip_address):
        """Return list of group names by looking up the given :param:`token`
        or :type:`None` if the token does not exists or has been revoked.

        :param token: A user login token :type:`str`.
        :param ip_address: IP address of the user.
        """
        user = self._user_c(token, ip_address)
        if user is None:
            return None
        return [g.name for g in user.groups]

    def revoke_token(self, token, ip_address):
        """Revoke the given token. This method should be called when the user
        is logging out.

        :param token: A user login token :type:`str`.
        :param ip_address: IP address of the user.
        """
        user_session = self._user_session_c(token, ip_address)
        if user_session is None:
            return None
        user_session.revoked_at = datetime.datetime.now()
        self.dbsession.add(user_session)
        return user_session

    def mark_seen(self, token, ip_address, revocation=SESSION_TOKEN_VALIDITY):
        """Mark the given token as seen and extend the token validity period
        by the given :param:`revocation` seconds.

        :param token: A user login token :type:`str`.
        :param ip_address: IP address of the user.
        :param revocation: Number of seconds until the token is invalidated.
        """
        user_session = self._user_session_c(token, ip_address)
        if user_session is None:
            return None
        if user_session.user.deactivated:
            return None
        revoke_delta = datetime.timedelta(seconds=revocation)
        user_session.last_seen_at = datetime.datetime.now()
        user_session.revoked_at = datetime.datetime.now() + revoke_delta
        self.dbsession.add(user_session)
        return user_session

    def token_for(self, username, ip_address):
        """Create a new token for the given :param:`username`.

        :param username: A username to create token for.
        :param ip_address: IP address that used to retrieve this token.
        """
        user = (
            self.dbsession.query(User)
            .filter(
                and_(User.deactivated == False, User.username == username)  # noqa: E711
            )
            .one()
        )

        user_session = UserSession(
            user=user, ip_address=ip_address, token=self._generate_token()
        )

        self.dbsession.add(user_session)
        return user_session.token


class UserQueryService(object):
    """User query service provides a service for querying users."""

    def __init__(self, dbsession):
        self.dbsession = dbsession

    def user_from_id(self, id):
        """Returns a user matching ID. Raises :class:`NoResultFound` is
        user could not be found.

        :param id: A user `type`:int: id.
        """
        return self.dbsession.query(User).filter_by(id=id).one()


class UserSessionQueryService(object):
    """User session query service provides a service for querying
    user sessions.
    """

    def __init__(self, dbsession):
        self.dbsession = dbsession

    def list_recent_from_user_id(self, user_id):
        """Query recent sessions for the given user ID.

        :param user_id: A user `type`:int: id.
        """
        return list(
            self.dbsession.query(UserSession)
            .filter_by(user_id=user_id)
            .order_by(desc(UserSession.last_seen_at))
            .limit(5)
            .all()
        )