~homeworkprod/byceps

ref: 87b31ba68669551f699c7d12cf18401663b2d043 byceps/byceps/services/orga/service.py -rw-r--r-- 3.0 KiB
87b31ba6 — Jochen Kupperschmidt Fix type errors 1 year, 1 month ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""
byceps.services.orga.service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2006-2021 Jochen Kupperschmidt
:License: Revised BSD (see `LICENSE` file for details)
"""

from __future__ import annotations
from typing import Optional, Sequence

from ...database import db
from ...typing import BrandID, UserID

from ..brand.dbmodels.brand import Brand as DbBrand
from ..brand import service as brand_service
from ..user import event_service as user_event_service
from ..user.dbmodels.user import User as DbUser

from .dbmodels import OrgaFlag as DbOrgaFlag


def get_person_count_by_brand_id() -> dict[BrandID, int]:
    """Map each brand ID to the number of orga flags set for that brand.

    Brands are outer-joined to their orga flags, so a brand without any
    flag still appears in the result, with a count of 0.
    """
    query = (
        db.session
        .query(DbBrand.id, db.func.count(DbOrgaFlag.brand_id))
        .outerjoin(DbOrgaFlag)
        .group_by(DbBrand.id)
    )

    return {brand_id: orga_count for brand_id, orga_count in query.all()}


def get_orgas_for_brand(brand_id: BrandID) -> Sequence[DbUser]:
    """Fetch every user that has an orga flag for the given brand.

    The `detail` relationship of each user is requested via a joined
    eager load as part of the same query.
    """
    query = (
        DbUser.query
        .join(DbOrgaFlag)
        .filter(DbOrgaFlag.brand_id == brand_id)
        .options(db.joinedload('detail'))
    )

    return query.all()


def count_orgas_for_brand(brand_id: BrandID) -> int:
    """Count the distinct users that have an orga flag for the brand."""
    query = (
        DbUser.query
        .distinct(DbUser.id)
        .join(DbOrgaFlag)
        .filter(DbOrgaFlag.brand_id == brand_id)
    )

    return query.count()


def add_orga_flag(
    brand_id: BrandID, user_id: UserID, initiator_id: UserID
) -> DbOrgaFlag:
    """Flag the user as organizer for the brand and return the new flag.

    An `orgaflag-added` user event naming the brand and the initiator is
    added to the session alongside the flag; both are persisted by a
    single commit.
    """
    new_flag = DbOrgaFlag(brand_id, user_id)
    db.session.add(new_flag)

    event_data = {
        'brand_id': str(brand_id),
        'initiator_id': str(initiator_id),
    }
    added_event = user_event_service.build_event(
        'orgaflag-added', user_id, event_data
    )
    db.session.add(added_event)

    db.session.commit()

    return new_flag


def remove_orga_flag(orga_flag: DbOrgaFlag, initiator_id: UserID) -> None:
    """Delete the orga flag and record who removed it.

    An `orgaflag-removed` user event for the flag's user, naming the
    brand and the initiator, is added to the session; deletion and event
    are persisted by a single commit.
    """
    db.session.delete(orga_flag)

    flagged_user_id = orga_flag.user_id
    event_data = {
        'brand_id': str(orga_flag.brand_id),
        'initiator_id': str(initiator_id),
    }
    removed_event = user_event_service.build_event(
        'orgaflag-removed', flagged_user_id, event_data
    )
    db.session.add(removed_event)

    db.session.commit()


def find_orga_flag(brand_id: BrandID, user_id: UserID) -> Optional[DbOrgaFlag]:
    """Look up the orga flag matching both brand and user.

    Returns the first match, or `None` if there is no such flag.
    """
    matching_flags = DbOrgaFlag.query.filter_by(
        brand_id=brand_id, user_id=user_id
    )

    return matching_flags.first()


def is_user_orga(user_id: UserID) -> bool:
    """Tell whether at least one orga flag exists for the user."""
    # Build an EXISTS clause over the user's orga flags, then select
    # just that clause so only a single scalar value is fetched.
    has_any_flag = (
        db.session
        .query(DbOrgaFlag)
        .filter_by(user_id=user_id)
        .exists()
    )

    return db.session.query(has_any_flag).scalar()