~homeworkprod/byceps

ref: 4237b3ec9496efe95dcce82bea3207ab9de4d520 byceps/byceps/services/orga_presence/service.py -rw-r--r-- 2.9 KiB
4237b3ec — Jochen Kupperschmidt Move ticketing blueprint into `site` subpackage 1 year, 11 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""
byceps.services.orga_presence.service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2006-2020 Jochen Kupperschmidt
:License: Modified BSD, see LICENSE for details.
"""

from datetime import date, datetime
from itertools import groupby
from typing import Iterator, List, Sequence, Tuple

import pendulum
from pendulum import DateTime

from ...database import db
from ...typing import PartyID
from ...util.datetime.range import create_adjacent_ranges, DateTimeRange

from .models import Presence, Task
from .transfer.models import PresenceTimeSlot, TaskTimeSlot, TimeSlot


def get_presences(party_id: PartyID) -> List[PresenceTimeSlot]:
    """Return all presences for that party."""
    presences = Presence.query \
        .for_party(party_id) \
        .options(db.joinedload('orga')) \
        .all()

    return [_presence_to_time_slot(presence) for presence in presences]


def get_tasks(party_id: PartyID) -> List[TaskTimeSlot]:
    """Return all tasks for that party."""
    tasks = Task.query \
        .for_party(party_id) \
        .all()

    return [_task_to_time_slot(task) for task in tasks]


def _presence_to_time_slot(presence: Presence) -> PresenceTimeSlot:
    return PresenceTimeSlot.from_(
        presence.orga, presence.starts_at, presence.ends_at,
    )


def _task_to_time_slot(task: Task) -> TaskTimeSlot:
    return TaskTimeSlot.from_(task.title, task.starts_at, task.ends_at)


# -------------------------------------------------------------------- #


def get_hour_ranges(time_slots: List[TimeSlot]) -> Iterator[DateTimeRange]:
    """Yield hour ranges based on the time slots."""
    time_slot_ranges = [time_slot.range for time_slot in time_slots]
    hour_starts = _get_hour_starts(time_slot_ranges)
    return create_adjacent_ranges(hour_starts)


def _get_hour_starts(dt_ranges: Sequence[DateTimeRange]) -> Iterator[datetime]:
    min_starts_at = _find_earliest_start(dt_ranges)
    max_ends_at = _find_latest_end(dt_ranges)

    period = pendulum.period(min_starts_at, max_ends_at)
    hour_starts = period.range('hours')

    return _to_datetimes_without_tzinfo(hour_starts)


def _find_earliest_start(dt_ranges: Sequence[DateTimeRange]) -> datetime:
    return min(dt_range.start for dt_range in dt_ranges)


def _find_latest_end(dt_ranges: Sequence[DateTimeRange]) -> datetime:
    return max(dt_range.end for dt_range in dt_ranges)


def _to_datetimes_without_tzinfo(dts: Sequence[DateTime]) -> Iterator[datetime]:
    for dt in dts:
        yield dt.replace(tzinfo=None)


def get_days_and_hour_totals(
    hour_ranges: Sequence[DateTimeRange],
) -> Iterator[Tuple[date, int]]:
    """Yield (day, relevant hours total) pairs."""

    def get_date(dt_range: DateTimeRange) -> date:
        return dt_range.start.date()

    for day, hour_ranges_for_day in groupby(hour_ranges, key=get_date):
        hour_total = len(list(hour_ranges_for_day))
        yield day, hour_total