~homeworkprod/byceps

ref: 4237b3ec9496efe95dcce82bea3207ab9de4d520 byceps/byceps/services/user_badge/awarding_service.py -rw-r--r-- 4.9 KiB
4237b3ec — Jochen Kupperschmidt Move ticketing blueprint into `site` subpackage 1 year, 11 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""
byceps.services.user_badge.awarding_service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2006-2020 Jochen Kupperschmidt
:License: Modified BSD, see LICENSE for details.
"""

from collections import defaultdict
from datetime import datetime
from typing import Dict, Optional, Set, Tuple

from ...database import db
from ...events.user_badge import UserBadgeAwarded
from ...typing import UserID

from ..user import event_service, service as user_service

from .badge_service import _db_entity_to_badge, find_badge, get_badges
from .models.awarding import BadgeAwarding as DbBadgeAwarding
from .models.badge import Badge as DbBadge
from .transfer.models import (
    Badge,
    BadgeAwarding,
    BadgeID,
    QuantifiedBadgeAwarding,
)


def award_badge_to_user(
    badge_id: BadgeID, user_id: UserID, *, initiator_id: Optional[UserID] = None
) -> Tuple[BadgeAwarding, UserBadgeAwarded]:
    """Award the badge to the user.

    Persist the awarding together with a matching user event entry and
    return the awarding as well as a `UserBadgeAwarded` event.

    Raise `ValueError` if no badge is found for the given badge ID.
    """
    badge = find_badge(badge_id)
    if badge is None:
        raise ValueError(f'Unknown badge ID "{badge_id}"')

    user = user_service.get_user(user_id)

    # Use one timestamp for the awarding, the stored event entry and the
    # returned event.
    now = datetime.utcnow()

    db_awarding = DbBadgeAwarding(badge_id, user_id, awarded_at=now)
    db.session.add(db_awarding)

    # The initiator is only recorded in the event data if one was given.
    db_event_data = {'badge_id': str(badge_id)}
    if initiator_id:
        db_event_data['initiator_id'] = str(initiator_id)
    db_event = event_service.build_event(
        'user-badge-awarded', user_id, db_event_data, occurred_at=now
    )
    db.session.add(db_event)

    # Awarding and event entry are written with a single commit.
    db.session.commit()

    awarding = _db_entity_to_badge_awarding(db_awarding)

    badge_awarded_event = UserBadgeAwarded(
        occurred_at=now,
        user_id=user_id,
        user_screen_name=user.screen_name,
        badge_id=badge_id,
        badge_label=badge.label,
        initiator_id=initiator_id,
    )

    return awarding, badge_awarded_event


def count_awardings() -> Dict[BadgeID, int]:
    """Return the number of awardings for each badge, indexed by badge ID.

    A badge can be awarded to the same user more than once, so the
    number of awardings is not the number of awardees.
    """
    awarding_count = db.func.count(DbBadgeAwarding.id)

    # The outer join keeps badges without any awardings in the result.
    query = (
        db.session.query(DbBadge.id, awarding_count)
        .outerjoin(DbBadgeAwarding)
        .group_by(DbBadge.id)
    )

    counts_by_badge_id = {}
    for badge_id, count in query.all():
        counts_by_badge_id[badge_id] = count

    return counts_by_badge_id


def get_awardings_of_badge(badge_id: BadgeID) -> Set[QuantifiedBadgeAwarding]:
    """Return the awardings of this badge.

    The result contains one entry per user the badge was awarded to,
    each carrying the number of times that user received it.
    """
    awarding_count = db.func.count(DbBadgeAwarding.badge_id)

    query = (
        db.session.query(
            DbBadgeAwarding.badge_id,
            DbBadgeAwarding.user_id,
            awarding_count,
        )
        .filter(DbBadgeAwarding.badge_id == badge_id)
        .group_by(DbBadgeAwarding.badge_id, DbBadgeAwarding.user_id)
    )

    quantified_awardings = set()
    for row_badge_id, row_user_id, row_quantity in query.all():
        quantified_awardings.add(
            QuantifiedBadgeAwarding(row_badge_id, row_user_id, row_quantity)
        )

    return quantified_awardings


def get_badges_awarded_to_user(user_id: UserID) -> Dict[Badge, int]:
    """Return the badges awarded to the user, each mapped to the number
    of times the user received it.
    """
    quantity_rows = (
        db.session.query(
            DbBadgeAwarding.badge_id,
            db.func.count(DbBadgeAwarding.badge_id),
        )
        .filter(DbBadgeAwarding.user_id == user_id)
        .group_by(DbBadgeAwarding.badge_id)
        .all()
    )

    quantities_by_badge_id = {}
    for row in quantity_rows:
        quantities_by_badge_id[row[0]] = row[1]

    # Without awardings there is nothing to look up; skip the badge query.
    if not quantities_by_badge_id:
        return {}

    awarded_badge_ids = set(quantities_by_badge_id)
    db_badges = (
        DbBadge.query
        .filter(DbBadge.id.in_(awarded_badge_ids))
        .all()
    )

    return {
        _db_entity_to_badge(db_badge): quantities_by_badge_id[db_badge.id]
        for db_badge in db_badges
    }


def get_badges_awarded_to_users(
    user_ids: Set[UserID], *, featured_only: bool = False
) -> Dict[UserID, Set[Badge]]:
    """Return the badges awarded to the given users, grouped by user ID.

    If `featured_only` is `True`, only return featured badges.

    Users for whom no badge ends up in the result are not included as
    keys.
    """
    if not user_ids:
        return {}

    db_awardings = (
        DbBadgeAwarding.query
        .filter(DbBadgeAwarding.user_id.in_(user_ids))
        .all()
    )

    awarded_badge_ids = {db_awarding.badge_id for db_awarding in db_awardings}
    badges_by_id = {
        badge.id: badge
        for badge in get_badges(awarded_badge_ids, featured_only=featured_only)
    }

    result: Dict[UserID, Set[Badge]] = {}
    for db_awarding in db_awardings:
        badge = badges_by_id.get(db_awarding.badge_id)
        if not badge:
            # Badge was not returned by `get_badges`; leave it out.
            continue
        result.setdefault(db_awarding.user_id, set()).add(badge)

    return result


def _db_entity_to_badge_awarding(entity: DbBadgeAwarding) -> BadgeAwarding:
    """Build a `BadgeAwarding` transfer object from the database entity."""
    awarding_id = entity.id
    badge_id = entity.badge_id
    user_id = entity.user_id
    awarded_at = entity.awarded_at

    return BadgeAwarding(awarding_id, badge_id, user_id, awarded_at)