~homeworkprod/byceps

0feb9b72402be7bc3cc965dcb5ccf4aaef96efcf — Jochen Kupperschmidt 1 year, 3 months ago 3ce659e
Extract awarding-relating user badge service
M byceps/blueprints/admin/user/views.py => byceps/blueprints/admin/user/views.py +4 -2
@@ 19,7 19,7 @@ from ....services.user import command_service as user_command_service
from ....services.user import creation_service as user_creation_service
from ....services.user import service as user_service
from ....services.user import stats_service as user_stats_service
from ....services.user_badge import service as badge_service
from ....services.user_badge import awarding_service as badge_awarding_service
from ....util.framework.blueprint import create_blueprint
from ....util.framework.flash import flash_error, flash_success
from ....util.framework.templating import templated


@@ 113,7 113,9 @@ def view(user_id):

    attended_parties = service.get_attended_parties(user.id)

    badges_with_awarding_quantity = badge_service.get_badges_for_user(user.id)
    badges_with_awarding_quantity = badge_awarding_service.get_badges_awarded_to_user(
        user.id
    )

    return {
        'user': user,

M byceps/blueprints/admin/user_badge/views.py => byceps/blueprints/admin/user_badge/views.py +4 -3
@@ 11,6 11,7 @@ from flask import abort, g, request
from ....services.brand import service as brand_service
from ....services.user import service as user_service
from ....services.user_badge import (
    awarding_service as badge_awarding_service,
    command_service as badge_command_service,
    service as badge_service,
)


@@ 48,7 49,7 @@ def index():

        return brands_by_id[brand_id].title

    awarding_counts_by_badge_id = badge_service.count_awardings()
    awarding_counts_by_badge_id = badge_awarding_service.count_awardings()

    badges = [
        {


@@ 77,7 78,7 @@ def view(badge_id):
    if badge is None:
        abort(404)

    awardings = badge_service.get_awardings_of_badge(badge.id)
    awardings = badge_awarding_service.get_awardings_of_badge(badge.id)
    recipient_ids = [awarding.user_id for awarding in awardings]
    recipients = user_service.find_users(recipient_ids, include_avatars=True)



@@ 181,7 182,7 @@ def award(user_id):

    initiator_id = g.current_user.id

    _, event = badge_command_service.award_badge_to_user(
    _, event = badge_awarding_service.award_badge_to_user(
        badge_id, user_id, initiator_id=initiator_id
    )


M byceps/blueprints/api/v1/user_badge/views.py => byceps/blueprints/api/v1/user_badge/views.py +2 -2
@@ 11,7 11,7 @@ from marshmallow import ValidationError

from .....services.user import service as user_service
from .....services.user_badge import (
    command_service as badge_command_service,
    awarding_service as badge_awarding_service,
    service as badge_service,
)
from .....util.framework.blueprint import create_blueprint


@@ 53,7 53,7 @@ def award_badge_to_user():
    if not initiator:
        abort(400, 'Initiator ID unknown')

    _, event = badge_command_service.award_badge_to_user(
    _, event = badge_awarding_service.award_badge_to_user(
        badge.id, user.id, initiator_id=initiator.id
    )


M byceps/blueprints/board/service.py => byceps/blueprints/board/service.py +2 -2
@@ 23,7 23,7 @@ from ...services.party import service as party_service
from ...services.site import settings_service as site_settings_service
from ...services.ticketing import ticket_service
from ...services.user import service as user_service
from ...services.user_badge import service as badge_service
from ...services.user_badge import awarding_service as badge_awarding_service
from ...services.user_badge.transfer.models import Badge
from ...typing import BrandID, PartyID, UserID



@@ 118,7 118,7 @@ def _get_badges(
    user_ids: Set[UserID], brand_id: BrandID
) -> Dict[UserID, Set[Badge]]:
    """Fetch users' badges that are either global or belong to the brand."""
    badges_by_user_id = badge_service.get_badges_for_users(
    badges_by_user_id = badge_awarding_service.get_badges_awarded_to_users(
        user_ids, featured_only=True
    )


M byceps/blueprints/user/profile/views.py => byceps/blueprints/user/profile/views.py +4 -2
@@ 13,7 13,7 @@ from flask import abort, g
from ....services.orga_team import service as orga_team_service
from ....services.ticketing import attendance_service, ticket_service
from ....services.user import service as user_service
from ....services.user_badge import service as badge_service
from ....services.user_badge import awarding_service as badge_awarding_service
from ....util.framework.blueprint import create_blueprint
from ....util.framework.templating import templated



@@ 29,7 29,9 @@ def view(user_id):
    if user is None:
        abort(404)

    badges_with_awarding_quantity = badge_service.get_badges_for_user(user.id)
    badges_with_awarding_quantity = badge_awarding_service.get_badges_awarded_to_user(
        user.id
    )

    orga_team_membership = orga_team_service.find_membership_for_party(
        user.id, g.party_id

M byceps/blueprints/user_badge/views.py => byceps/blueprints/user_badge/views.py +5 -2
@@ 9,7 9,10 @@ byceps.blueprints.user_badge.views
from flask import abort, g

from ...services.user import service as user_service
from ...services.user_badge import service as badge_service
from ...services.user_badge import (
    awarding_service as badge_awarding_service,
    service as badge_service,
)
from ...util.framework.blueprint import create_blueprint
from ...util.framework.templating import templated



@@ 37,7 40,7 @@ def view(slug):
    if badge is None:
        abort(404)

    awardings = badge_service.get_awardings_of_badge(badge.id)
    awardings = badge_awarding_service.get_awardings_of_badge(badge.id)
    recipient_ids = [awarding.user_id for awarding in awardings]
    recipients = user_service.find_users(
        recipient_ids,

M byceps/services/shop/order/actions/award_badge.py => byceps/services/shop/order/actions/award_badge.py +2 -2
@@ 9,7 9,7 @@ byceps.services.shop.order.actions.award_badge
from .....typing import UserID

from ....user_badge import (
    command_service as badge_command_service,
    awarding_service as badge_awarding_service,
    service as badge_service,
)
from ....user_badge.transfer.models import BadgeAwarding, BadgeID


@@ 35,7 35,7 @@ def award_badge(
    _verify_badge_id(badge_id)

    for _ in range(quantity):
        awarding, _ = badge_command_service.award_badge_to_user(
        awarding, _ = badge_awarding_service.award_badge_to_user(
            badge_id, user_id
        )


A byceps/services/user_badge/awarding_service.py => byceps/services/user_badge/awarding_service.py +166 -0
@@ 0,0 1,166 @@
"""
byceps.services.user_badge.awarding_service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2006-2020 Jochen Kupperschmidt
:License: Modified BSD, see LICENSE for details.
"""

from collections import defaultdict
from datetime import datetime
from typing import Dict, Optional, Set, Tuple

from ...database import db
from ...events.user_badge import UserBadgeAwarded
from ...typing import UserID

from ..user import event_service

from .models.awarding import BadgeAwarding as DbBadgeAwarding
from .models.badge import Badge as DbBadge
from .service import _db_entity_to_badge
from .transfer.models import (
    Badge,
    BadgeAwarding,
    BadgeID,
    QuantifiedBadgeAwarding,
)


def award_badge_to_user(
    badge_id: BadgeID, user_id: UserID, *, initiator_id: Optional[UserID] = None
) -> Tuple[BadgeAwarding, UserBadgeAwarded]:
    """Award the badge to the user."""
    awarded_at = datetime.utcnow()

    awarding = DbBadgeAwarding(badge_id, user_id, awarded_at=awarded_at)
    db.session.add(awarding)

    event_data = {'badge_id': str(badge_id)}
    if initiator_id:
        event_data['initiator_id'] = str(initiator_id)
    event = event_service.build_event(
        'user-badge-awarded', user_id, event_data, occurred_at=awarded_at
    )
    db.session.add(event)

    db.session.commit()

    awarding_dto = _db_entity_to_badge_awarding(awarding)

    event = UserBadgeAwarded(
        occurred_at=awarded_at,
        user_id=user_id,
        badge_id=badge_id,
        initiator_id=initiator_id,
    )

    return awarding_dto, event


def count_awardings() -> Dict[BadgeID, int]:
    """Return the number of times each badge has been awarded.

    Because a badge can be awarded multiple times to a user, the number
    of awardings does not represent the number of awardees.
    """
    rows = db.session \
        .query(
            DbBadge.id,
            db.func.count(DbBadgeAwarding.id)
        ) \
        .outerjoin(DbBadgeAwarding) \
        .group_by(DbBadge.id) \
        .all()

    return {badge_id: count for badge_id, count in rows}


def get_awardings_of_badge(badge_id: BadgeID) -> Set[QuantifiedBadgeAwarding]:
    """Return the awardings of this badge."""
    rows = db.session \
        .query(
            DbBadgeAwarding.badge_id,
            DbBadgeAwarding.user_id,
            db.func.count(DbBadgeAwarding.badge_id)
        ) \
        .filter(DbBadgeAwarding.badge_id == badge_id) \
        .group_by(
            DbBadgeAwarding.badge_id,
            DbBadgeAwarding.user_id
        ) \
        .all()

    return {
        QuantifiedBadgeAwarding(badge_id, user_id, quantity)
        for badge_id, user_id, quantity in rows
    }


def get_badges_awarded_to_user(user_id: UserID) -> Dict[Badge, int]:
    """Return all badges that have been awarded to the user (and how often)."""
    rows = db.session \
        .query(
            DbBadgeAwarding.badge_id,
            db.func.count(DbBadgeAwarding.badge_id)
        ) \
        .filter(DbBadgeAwarding.user_id == user_id) \
        .group_by(
            DbBadgeAwarding.badge_id,
        ) \
        .all()

    badge_ids_with_awarding_quantity = {row[0]: row[1] for row in rows}

    badge_ids = set(badge_ids_with_awarding_quantity.keys())

    if badge_ids:
        badges = DbBadge.query \
            .filter(DbBadge.id.in_(badge_ids)) \
            .all()
    else:
        badges = []

    badges_with_awarding_quantity = {}
    for badge in badges:
        quantity = badge_ids_with_awarding_quantity[badge.id]
        badges_with_awarding_quantity[_db_entity_to_badge(badge)] = quantity

    return badges_with_awarding_quantity


def get_badges_awarded_to_users(
    user_ids: Set[UserID], *, featured_only: bool = False
) -> Dict[UserID, Set[Badge]]:
    """Return all badges that have been awarded to the users, indexed
    by user ID.

    If `featured_only` is `True`, only return featured badges.
    """
    if not user_ids:
        return {}

    awardings = DbBadgeAwarding.query \
        .filter(DbBadgeAwarding.user_id.in_(user_ids)) \
        .all()

    badge_ids = {awarding.badge_id for awarding in awardings}
    badges = get_badges(badge_ids, featured_only=featured_only)
    badges_by_id = {badge.id: badge for badge in badges}

    badges_by_user_id: Dict[UserID, Set[Badge]] = defaultdict(set)
    for awarding in awardings:
        badge = badges_by_id.get(awarding.badge_id)
        if badge:
            badges_by_user_id[awarding.user_id].add(badge)

    return dict(badges_by_user_id)


def _db_entity_to_badge_awarding(entity: DbBadgeAwarding) -> BadgeAwarding:
    return BadgeAwarding(
        entity.id,
        entity.badge_id,
        entity.user_id,
        entity.awarded_at
    )

M byceps/services/user_badge/command_service.py => byceps/services/user_badge/command_service.py +3 -47
@@ 6,18 6,14 @@ byceps.services.user_badge.command_service
:License: Modified BSD, see LICENSE for details.
"""

from datetime import datetime
from typing import Optional, Tuple
from typing import Optional

from ...database import db
from ...events.user_badge import UserBadgeAwarded
from ...typing import BrandID, UserID
from ...typing import BrandID

from ..user import event_service
from .models.awarding import BadgeAwarding as DbBadgeAwarding
from .models.badge import Badge as DbBadge
from .service import _db_entity_to_badge
from .transfer.models import Badge, BadgeAwarding, BadgeID
from .transfer.models import Badge, BadgeID


def create_badge(


@@ 78,43 74,3 @@ def delete_badge(badge_id: BadgeID) -> None:
        .delete()

    db.session.commit()


def award_badge_to_user(
    badge_id: BadgeID, user_id: UserID, *, initiator_id: Optional[UserID] = None
) -> Tuple[BadgeAwarding, UserBadgeAwarded]:
    """Award the badge to the user."""
    awarded_at = datetime.utcnow()

    awarding = DbBadgeAwarding(badge_id, user_id, awarded_at=awarded_at)
    db.session.add(awarding)

    event_data = {'badge_id': str(badge_id)}
    if initiator_id:
        event_data['initiator_id'] = str(initiator_id)
    event = event_service.build_event(
        'user-badge-awarded', user_id, event_data, occurred_at=awarded_at
    )
    db.session.add(event)

    db.session.commit()

    awarding_dto = _db_entity_to_badge_awarding(awarding)

    event = UserBadgeAwarded(
        occurred_at=awarded_at,
        user_id=user_id,
        badge_id=badge_id,
        initiator_id=initiator_id,
    )

    return awarding_dto, event


def _db_entity_to_badge_awarding(entity: DbBadgeAwarding) -> BadgeAwarding:
    return BadgeAwarding(
        entity.id,
        entity.badge_id,
        entity.user_id,
        entity.awarded_at
    )

M byceps/services/user_badge/service.py => byceps/services/user_badge/service.py +2 -106
@@ 6,15 6,10 @@ byceps.services.user_badge.service
:License: Modified BSD, see LICENSE for details.
"""

from collections import defaultdict
from typing import Dict, Optional, Set
from typing import Optional, Set

from ...database import db
from ...typing import UserID

from .models.awarding import BadgeAwarding as DbBadgeAwarding
from .models.badge import Badge as DbBadge
from .transfer.models import Badge, BadgeID, QuantifiedBadgeAwarding
from .transfer.models import Badge, BadgeID


def find_badge(badge_id: BadgeID) -> Optional[Badge]:


@@ 60,66 55,6 @@ def get_badges(
    return {_db_entity_to_badge(badge) for badge in badges}


def get_badges_for_user(user_id: UserID) -> Dict[Badge, int]:
    """Return all badges that have been awarded to the user (and how often)."""
    rows = db.session \
        .query(
            DbBadgeAwarding.badge_id,
            db.func.count(DbBadgeAwarding.badge_id)
        ) \
        .filter(DbBadgeAwarding.user_id == user_id) \
        .group_by(
            DbBadgeAwarding.badge_id,
        ) \
        .all()

    badge_ids_with_awarding_quantity = {row[0]: row[1] for row in rows}

    badge_ids = set(badge_ids_with_awarding_quantity.keys())

    if badge_ids:
        badges = DbBadge.query \
            .filter(DbBadge.id.in_(badge_ids)) \
            .all()
    else:
        badges = []

    badges_with_awarding_quantity = {}
    for badge in badges:
        quantity = badge_ids_with_awarding_quantity[badge.id]
        badges_with_awarding_quantity[_db_entity_to_badge(badge)] = quantity

    return badges_with_awarding_quantity


def get_badges_for_users(
    user_ids: Set[UserID], *, featured_only: bool = False
) -> Dict[UserID, Set[Badge]]:
    """Return all badges that have been awarded to the users, indexed
    by user ID.

    If `featured_only` is `True`, only return featured badges.
    """
    if not user_ids:
        return {}

    awardings = DbBadgeAwarding.query \
        .filter(DbBadgeAwarding.user_id.in_(user_ids)) \
        .all()

    badge_ids = {awarding.badge_id for awarding in awardings}
    badges = get_badges(badge_ids, featured_only=featured_only)
    badges_by_id = {badge.id: badge for badge in badges}

    badges_by_user_id: Dict[UserID, Set[Badge]] = defaultdict(set)
    for awarding in awardings:
        badge = badges_by_id.get(awarding.badge_id)
        if badge:
            badges_by_user_id[awarding.user_id].add(badge)

    return dict(badges_by_user_id)


def get_all_badges() -> Set[Badge]:
    """Return all badges."""
    badges = DbBadge.query.all()


@@ 127,45 62,6 @@ def get_all_badges() -> Set[Badge]:
    return {_db_entity_to_badge(badge) for badge in badges}


def count_awardings() -> Dict[BadgeID, int]:
    """Return the number of times each badge has been awarded.

    Because a badge can be awarded multiple times to a user, the number
    of awardings does not represent the number of awardees.
    """
    rows = db.session \
        .query(
            DbBadge.id,
            db.func.count(DbBadgeAwarding.id)
        ) \
        .outerjoin(DbBadgeAwarding) \
        .group_by(DbBadge.id) \
        .all()

    return {badge_id: count for badge_id, count in rows}


def get_awardings_of_badge(badge_id: BadgeID) -> Set[QuantifiedBadgeAwarding]:
    """Return the awardings of this badge."""
    rows = db.session \
        .query(
            DbBadgeAwarding.badge_id,
            DbBadgeAwarding.user_id,
            db.func.count(DbBadgeAwarding.badge_id)
        ) \
        .filter(DbBadgeAwarding.badge_id == badge_id) \
        .group_by(
            DbBadgeAwarding.badge_id,
            DbBadgeAwarding.user_id
        ) \
        .all()

    return {
        QuantifiedBadgeAwarding(badge_id, user_id, quantity)
        for badge_id, user_id, quantity in rows
    }


def _db_entity_to_badge(entity: DbBadge) -> Badge:
    image_url_path = f'/data/global/users/badges/{entity.image_filename}'


M tests/integration/announce/irc/test_user_badge.py => tests/integration/announce/irc/test_user_badge.py +7 -3
@@ 5,7 5,11 @@

from byceps.announce.irc import user_badge  # Load signal handlers.
from byceps.blueprints.user_badge import signals
from byceps.services.user_badge import command_service as badge_command_service
from byceps.services.user_badge import (
    awarding_service,
    command_service as badge_command_service,
)


from .helpers import assert_submitted_data, CHANNEL_ORGA_LOG, mocked_irc_bot



@@ 24,7 28,7 @@ def test_user_badge_awarding_announced_without_initiator(app, make_user):

    user = make_user('Erster')

    _, event = badge_command_service.award_badge_to_user(badge.id, user.id)
    _, event = awarding_service.award_badge_to_user(badge.id, user.id)

    with mocked_irc_bot() as mock:
        signals.user_badge_awarded.send(None, event=event)


@@ 45,7 49,7 @@ def test_user_badge_awarding_announced_with_initiator(

    user = make_user('PathFinder')

    _, event = badge_command_service.award_badge_to_user(
    _, event = awarding_service.award_badge_to_user(
        badge.id, user.id, initiator_id=admin_user.id
    )


M tests/integration/api/v1/user_badge/test_award_badge.py => tests/integration/api/v1/user_badge/test_award_badge.py +3 -3
@@ 4,8 4,8 @@
"""

from byceps.services.user_badge import (
    awarding_service,
    command_service as badge_command_service,
    service as badge_service,
)
from byceps.services.user_badge.transfer.models import QuantifiedBadgeAwarding



@@ 15,7 15,7 @@ def test_award_badge(api_client, api_client_authz_header, user, admin_user):
        'supporter', 'Supporter', 'supporter.svg'
    )

    before = badge_service.get_awardings_of_badge(badge.id)
    before = awarding_service.get_awardings_of_badge(badge.id)
    assert before == set()

    url = f'/api/v1/user_badges/awardings'


@@ 29,5 29,5 @@ def test_award_badge(api_client, api_client_authz_header, user, admin_user):
    response = api_client.post(url, headers=headers, json=json_data)
    assert response.status_code == 204

    actual = badge_service.get_awardings_of_badge(badge.id)
    actual = awarding_service.get_awardings_of_badge(badge.id)
    assert actual == {QuantifiedBadgeAwarding(badge.id, user.id, 1)}

M tests/integration/services/user_badge/test_service_awarding.py => tests/integration/services/user_badge/test_service_awarding.py +16 -15
@@ 9,6 9,7 @@ from byceps.database import db
from byceps.events.user_badge import UserBadgeAwarded
from byceps.services.user import event_service
from byceps.services.user_badge import (
    awarding_service,
    command_service as badge_command_service,
    service as badge_service,
)


@@ 71,7 72,7 @@ def test_award_badge_without_initiator(
    user_events_before = event_service.get_events_for_user(user.id)
    assert len(user_events_before) == 0

    _, event = badge_command_service.award_badge_to_user(badge.id, user.id)
    _, event = awarding_service.award_badge_to_user(badge.id, user.id)

    assert event.__class__ is UserBadgeAwarded
    assert event.user_id == user.id


@@ 96,7 97,7 @@ def test_award_badge_with_initiator(
    user_events_before = event_service.get_events_for_user(user.id)
    assert len(user_events_before) == 0

    _, event = badge_command_service.award_badge_to_user(
    _, event = awarding_service.award_badge_to_user(
        badge.id, user.id, initiator_id=admin_user.id
    )



@@ 119,14 120,14 @@ def test_award_badge_with_initiator(
def test_count_awardings(
    party_app, user1, user2, user3, badge1, badge2, badge3, awardings_scope,
):
    badge_command_service.award_badge_to_user(badge1.id, user1.id)
    badge_command_service.award_badge_to_user(badge1.id, user1.id)
    badge_command_service.award_badge_to_user(badge1.id, user2.id)
    badge_command_service.award_badge_to_user(badge1.id, user3.id)
    badge_command_service.award_badge_to_user(badge3.id, user2.id)
    badge_command_service.award_badge_to_user(badge3.id, user3.id)
    awarding_service.award_badge_to_user(badge1.id, user1.id)
    awarding_service.award_badge_to_user(badge1.id, user1.id)
    awarding_service.award_badge_to_user(badge1.id, user2.id)
    awarding_service.award_badge_to_user(badge1.id, user3.id)
    awarding_service.award_badge_to_user(badge3.id, user2.id)
    awarding_service.award_badge_to_user(badge3.id, user3.id)

    actual = badge_service.count_awardings()
    actual = awarding_service.count_awardings()

    # Remove counts for potential other badges.
    relevant_badge_ids = {badge1.id, badge2.id, badge3.id}


@@ 140,7 141,7 @@ def test_count_awardings(
def test_get_awardings_of_unknown_badge(party_app):
    unknown_badge_id = '00000000-0000-0000-0000-000000000000'

    actual = badge_service.get_awardings_of_badge(unknown_badge_id)
    actual = awarding_service.get_awardings_of_badge(unknown_badge_id)

    assert actual == set()



@@ 148,7 149,7 @@ def test_get_awardings_of_unknown_badge(party_app):
def test_get_awardings_of_unawarded_badge(party_app, badge3):
    badge = badge3

    actual = badge_service.get_awardings_of_badge(badge.id)
    actual = awarding_service.get_awardings_of_badge(badge.id)

    assert actual == set()



@@ 158,11 159,11 @@ def test_get_awardings_of_badge(
):
    badge = badge1

    badge_command_service.award_badge_to_user(badge.id, user1.id)
    badge_command_service.award_badge_to_user(badge.id, user1.id)
    badge_command_service.award_badge_to_user(badge.id, user2.id)
    awarding_service.award_badge_to_user(badge.id, user1.id)
    awarding_service.award_badge_to_user(badge.id, user1.id)
    awarding_service.award_badge_to_user(badge.id, user2.id)

    actual = badge_service.get_awardings_of_badge(badge.id)
    actual = awarding_service.get_awardings_of_badge(badge.id)

    assert actual == {
        QuantifiedBadgeAwarding(badge.id, user1.id, 2),