~homeworkprod/byceps

ref: 4237b3ec9496efe95dcce82bea3207ab9de4d520 byceps/byceps/services/user_avatar/service.py -rw-r--r-- 3.9 KiB
4237b3ec — Jochen Kupperschmidt Move ticketing blueprint into `site` subpackage 1 year, 11 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
byceps.services.user_avatar.service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2006-2020 Jochen Kupperschmidt
:License: Modified BSD, see LICENSE for details.
"""

from typing import BinaryIO, Dict, List, Optional, Set

from ...database import db
from ...typing import UserID
from ...util.image import create_thumbnail
from ...util.image.models import Dimensions, ImageType
from ...util import upload

from ..image import service as image_service
from ..image.service import ImageTypeProhibited  # Provide to view functions.
from ..user.models.user import User as DbUser
from ..user import service as user_service

from .models import Avatar as DbAvatar, AvatarSelection as DbAvatarSelection
from .transfer.models import AvatarID, AvatarUpdate


MAXIMUM_DIMENSIONS = Dimensions(512, 512)


def update_avatar_image(
    user_id: UserID,
    stream: BinaryIO,
    allowed_types: Set[ImageType],
    *,
    maximum_dimensions: Dimensions = MAXIMUM_DIMENSIONS,
) -> AvatarID:
    """Set a new avatar image for the user.

    Raise `ImageTypeProhibited` if the stream data is not of one the
    allowed types.
    """
    user = user_service.find_active_db_user(user_id)
    if user is None:
        raise user_service.UserIdRejected(user_id)

    image_type = image_service.determine_image_type(stream, allowed_types)
    image_dimensions = image_service.determine_dimensions(stream)

    image_too_large = image_dimensions > maximum_dimensions
    if image_too_large or not image_dimensions.is_square:
        stream = create_thumbnail(
            stream, image_type.name, maximum_dimensions, force_square=True
        )

    avatar = DbAvatar(user.id, image_type)
    db.session.add(avatar)
    db.session.commit()

    # Might raise `FileExistsError`.
    upload.store(stream, avatar.path, create_parent_path_if_nonexistant=True)

    user.avatar = avatar
    db.session.commit()

    return avatar.id


def remove_avatar_image(user_id: UserID) -> None:
    """Remove the user's avatar image.

    The avatar will be unlinked from the user, but the database record
    as well as the image file itself won't be removed, though.
    """
    selection = DbAvatarSelection.query.get(user_id)

    if selection is None:
        raise ValueError(f'No avatar set for user ID {user_id}.')

    db.session.delete(selection)
    db.session.commit()


def get_avatars_uploaded_by_user(user_id: UserID) -> List[AvatarUpdate]:
    """Return the avatars uploaded by the user."""
    avatars = DbAvatar.query \
        .filter_by(creator_id=user_id) \
        .all()

    return [AvatarUpdate(avatar.created_at, avatar.url) for avatar in avatars]


def get_avatar_url_for_user(user_id: UserID) -> Optional[str]:
    """Return the URL of the user's current avatar, or `None` if not set."""
    avatar_urls_by_user_id = get_avatar_urls_for_users({user_id})
    return avatar_urls_by_user_id.get(user_id)


def get_avatar_urls_for_users(user_ids: Set[UserID]) -> Dict[UserID, str]:
    """Return the URLs of those users' current avatars."""
    if not user_ids:
        return {}

    user_ids_and_avatars = db.session.query(
            DbAvatarSelection.user_id,
            DbAvatar,
        ) \
        .join(DbAvatar) \
        .filter(DbAvatarSelection.user_id.in_(user_ids)) \
        .all()

    urls_by_user_id = {
        user_id: avatar.url for user_id, avatar in user_ids_and_avatars
    }

    # Include all user IDs in result.
    return {user_id: urls_by_user_id.get(user_id) for user_id in user_ids}


def get_avatar_url_for_md5_email_address_hash(md5_hash: str) -> Optional[str]:
    """Return the URL of the current avatar of the user with that hashed
    email address, or `None` if not set.
    """
    avatar = DbAvatar.query \
        .join(DbAvatarSelection) \
        .join(DbUser) \
        .filter(db.func.md5(DbUser.email_address) == md5_hash) \
        .one_or_none()

    if avatar is None:
        return None

    return avatar.url