~homeworkprod/byceps

ref: 8d6c8fff423db3603a5b644db24c413ffbf05c89 byceps/byceps/services/user/creation_service.py -rw-r--r-- 5.6 KiB
8d6c8fff — Jochen Kupperschmidt Ignore invalid user ID in session 2 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""
byceps.services.user.creation_service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2006-2021 Jochen Kupperschmidt
:License: Revised BSD (see `LICENSE` file for details)
"""

from __future__ import annotations
from datetime import datetime
from typing import Optional

from flask import current_app

from ...database import db
from ...events.user import UserAccountCreated
from ...typing import UserID

from ..authentication.password import service as password_service
from ..consent import consent_service
from ..consent.transfer.models import Consent
from ..site.transfer.models import SiteID

from . import email_address_verification_service
from . import event_service
from .dbmodels.detail import UserDetail as DbUserDetail
from .dbmodels.user import User as DbUser
from . import service as user_service
from .transfer.models import User


class UserCreationFailed(Exception):
    """Indicate that a new user could not be committed to the database."""


def create_user(
    screen_name: str,
    email_address: str,
    password: str,
    first_names: Optional[str],
    last_name: Optional[str],
    site_id: SiteID,
    *,
    consents: Optional[set[Consent]] = None,
) -> tuple[User, UserAccountCreated]:
    """Create a user account together with its related records.

    The basic account (user, details, password) is created first. Then
    a consent object is built and added to the session for every given
    consent, the session is committed, and a confirmation e-mail for
    the address is requested.

    Return the new user and the `UserAccountCreated` event.
    """
    account, account_created = create_basic_user(
        screen_name,
        email_address,
        password,
        first_names=first_names,
        last_name=last_name,
        site_id=site_id,
    )

    # The given consents lack the ID of the user that was just created,
    # so a new object carrying that ID is built for each of them.
    for given in consents or ():
        db_consent = consent_service.build_consent(
            account.id,
            given.subject_id,
            given.expressed_at,
        )
        db.session.add(db_consent)

    db.session.commit()

    request_email_address_confirmation(account, email_address, site_id)

    return account, account_created


def create_basic_user(
    screen_name: str,
    email_address: Optional[str],
    password: str,
    *,
    first_names: Optional[str] = None,
    last_name: Optional[str] = None,
    creator_id: Optional[UserID] = None,
    site_id: Optional[SiteID] = None,
) -> tuple[User, UserAccountCreated]:
    """Create a user with details and store a hash of its password.

    Return the new user and the `UserAccountCreated` event.
    """
    optional_fields = {
        'first_names': first_names,
        'last_name': last_name,
        'creator_id': creator_id,
        'site_id': site_id,
    }

    # The user has to exist first; the password hash refers to its ID.
    new_user, creation_event = _create_user(
        screen_name, email_address, **optional_fields
    )

    password_service.create_password_hash(new_user.id, password)

    return new_user, creation_event


def _create_user(
    screen_name: Optional[str],
    email_address: Optional[str],
    *,
    first_names: Optional[str] = None,
    last_name: Optional[str] = None,
    creator_id: Optional[UserID] = None,
    site_id: Optional[SiteID] = None,
) -> tuple[User, UserAccountCreated]:
    """Persist a new user with details and record its creation.

    The user is built, added to the session and committed. Afterwards a
    'user-created' event is stored via the event service and a
    `UserAccountCreated` event object is assembled.

    If `creator_id` is given, the creator is looked up and referenced
    as initiator in both the stored event and the returned event.

    Return the user DTO and the `UserAccountCreated` event.

    Raise `UserCreationFailed` if committing the new user fails; the
    original error is attached as the exception's cause.
    """
    creator = (
        user_service.get_user(creator_id) if creator_id is not None else None
    )

    created_at = datetime.utcnow()

    user = build_user(created_at, screen_name, email_address)

    user.detail.first_names = first_names
    user.detail.last_name = last_name

    db.session.add(user)

    try:
        db.session.commit()
    except Exception as e:
        current_app.logger.error('User creation failed: %s', e)
        db.session.rollback()
        # Chain explicitly so callers and tracebacks see the underlying
        # commit error as the direct cause of the failure.
        raise UserCreationFailed() from e

    # Create event in separate step as user ID is not available earlier.
    event_data = {}
    if creator is not None:
        event_data['initiator_id'] = str(creator.id)
    if site_id is not None:
        event_data['site_id'] = site_id
    event_service.create_event(
        'user-created', user.id, event_data, occurred_at=created_at
    )

    user_dto = user_service._db_entity_to_user(user)

    event = UserAccountCreated(
        occurred_at=user.created_at,
        initiator_id=creator.id if creator else None,
        initiator_screen_name=creator.screen_name if creator else None,
        user_id=user.id,
        user_screen_name=user.screen_name,
        site_id=site_id,
    )

    return user_dto, event


def build_user(
    created_at: datetime,
    screen_name: Optional[str],
    email_address: Optional[str],
) -> DbUser:
    """Build a user entity, with a detail entity, from normalized data.

    Screen name and e-mail address are normalized unless they are
    `None`, in which case `None` is passed on unchanged. Nothing is
    added to the database session here.

    Raise `ValueError` if a given screen name or e-mail address is
    invalid.
    """
    normalized_screen_name = (
        _normalize_screen_name(screen_name)
        if screen_name is not None
        else None
    )
    normalized_email_address = (
        _normalize_email_address(email_address)
        if email_address is not None
        else None
    )

    user = DbUser(created_at, normalized_screen_name, normalized_email_address)

    # Only the construction matters; the instance needs no local name.
    # NOTE(review): presumably this links the detail to the user via the
    # ORM relationship (callers access `user.detail`) - confirm in the
    # DB models.
    DbUserDetail(user=user)

    return user


def request_email_address_confirmation(
    user: User, email_address: str, site_id: SiteID
) -> None:
    """Request confirmation of the e-mail address from the user.

    The address is normalized and then passed, together with the user's
    screen name and ID and the site ID, to the e-mail address
    verification service.

    Raise `ValueError` if the e-mail address is invalid.
    """
    recipient_address = _normalize_email_address(email_address)

    email_address_verification_service.send_email_address_confirmation_email(
        recipient_address,
        user.screen_name,
        user.id,
        site_id,
    )


def _normalize_screen_name(screen_name: str) -> str:
    """Return the screen name without surrounding whitespace.

    Raise `ValueError` if the stripped name is empty or contains a
    space or an `@` character.
    """
    candidate = screen_name.strip()

    is_valid = bool(candidate) and not any(
        forbidden in candidate for forbidden in (' ', '@')
    )
    if not is_valid:
        raise ValueError(f"Invalid screen name: '{screen_name}'")

    return candidate


def _normalize_email_address(email_address: str) -> str:
    """Return the e-mail address stripped and in lower case.

    Raise `ValueError` if the result is empty, contains a space, or
    lacks an `@` character.
    """
    candidate = email_address.strip().lower()

    if candidate and ' ' not in candidate and '@' in candidate:
        return candidate

    raise ValueError(f"Invalid email address: '{email_address}'")