~homeworkprod/byceps

ref: 4237b3ec9496efe95dcce82bea3207ab9de4d520 byceps/byceps/services/newsletter/service.py -rw-r--r-- 7.3 KiB
4237b3ec — Jochen Kupperschmidt Move ticketing blueprint into `site` subpackage 1 year, 11 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
"""
byceps.services.newsletter.service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2006-2020 Jochen Kupperschmidt
:License: Modified BSD, see LICENSE for details.
"""

from operator import itemgetter
from typing import (
    Dict,
    Iterable,
    Iterator,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
)

from ...database import BaseQuery, db
from ...typing import UserID

from ..user.models.user import User as DbUser

from .models import (
    List as DbList,
    Subscriber,
    SubscriptionUpdate as DbSubscriptionUpdate,
)
from .transfer.models import List, ListID
from .types import SubscriptionState


def find_list(list_id: ListID) -> Optional[List]:
    """Return the list with that ID, or `None` if not found."""
    list_ = DbList.query.get(list_id)

    if list_ is None:
        return None

    return _db_entity_to_list(list_)


def get_all_lists() -> Sequence[List]:
    """Return all lists."""
    lists = DbList.query.all()

    return [_db_entity_to_list(list_) for list_ in lists]


def count_subscribers_for_list(list_id: ListID) -> int:
    """Return the number of users that are currently subscribed to that list."""
    return _build_query_for_current_subscribers(list_id).count()


def get_subscribers(list_id: ListID) -> Iterable[Subscriber]:
    """Yield screen name and email address of the initialized users that
    are currently subscribed to the list.
    """
    subscriber_id_rows = _build_query_for_current_subscribers(list_id).all()

    subscriber_ids = set(map(itemgetter(0), subscriber_id_rows))

    return _get_subscriber_details(subscriber_ids)


def _build_query_for_current_subscribers(list_id: ListID) -> BaseQuery:
    """Build a query to return the most recent subscription state
    (grouped by user and list).

    The generated SQL should be equivalent to this:

        SELECT
          nso.user_id
        FROM newsletter_subscription_updates AS nso
          JOIN (
            SELECT
              user_id,
              list_id,
              MAX(expressed_at) AS latest_expressed_at
            FROM newsletter_subscription_updates
            GROUP BY
              user_id,
              list_id
          ) AS nsi
            ON nso.user_id = nsi.user_id
              AND nso.list_id = nsi.list_id
              AND nso.expressed_at = nsi.latest_expressed_at
        WHERE nso.state = 'requested'
          AND nso.list_id = <list_id>
    """
    subquery = _build_query_for_latest_expressed_at().subquery()

    return db.session \
        .query(
            DbSubscriptionUpdate.user_id
        ) \
        .join(subquery, db.and_(
            DbSubscriptionUpdate.user_id == subquery.c.user_id,
            DbSubscriptionUpdate.list_id == subquery.c.list_id,
            DbSubscriptionUpdate.expressed_at == subquery.c.latest_expressed_at
        )) \
        .filter(DbSubscriptionUpdate._state == SubscriptionState.requested.name) \
        .filter(DbSubscriptionUpdate.list_id == list_id)


def _get_subscriber_details(user_ids: Set[UserID]) -> Iterator[Subscriber]:
    """Yield screen name and email address of each eligible user."""
    if not user_ids:
        return []

    rows = db.session \
        .query(
            DbUser.screen_name,
            DbUser.email_address,
        ) \
        .filter(DbUser.id.in_(user_ids)) \
        .filter(DbUser.email_address != None) \
        .filter_by(initialized=True) \
        .filter_by(email_address_verified=True) \
        .filter_by(suspended=False) \
        .filter_by(deleted=False) \
        .all()

    for row in rows:
        yield Subscriber(row.screen_name, row.email_address)


def count_subscriptions_by_state(
    list_id: ListID,
) -> Dict[Union[SubscriptionState, str], int]:
    """Return the totals for each state as well as an overall total."""
    rows = _build_query_for_current_state(list_id) \
        .all()

    totals = {state: 0 for state in SubscriptionState}

    for state_name, count in rows:
        state = SubscriptionState[state_name]
        totals[state] = count

    totals['total'] = sum(totals.values())

    return totals


def _build_query_for_current_state(list_id: ListID) -> BaseQuery:
    """Build a query to return the number of currently requested and
    declined subscription states for that list.

    The generated SQL should be equivalent to this:

        SELECT
          nso.state,
          COUNT(nso.state)
        FROM newsletter_subscription_updates AS nso
          JOIN (
            SELECT
              user_id,
              list_id,
              MAX(expressed_at) AS latest_expressed_at
            FROM newsletter_subscription_updates
            GROUP BY
              user_id,
              list_id
          ) AS nsi
            ON nso.user_id = nsi.user_id
              AND nso.list_id = nsi.list_id
              AND nso.expressed_at = nsi.latest_expressed_at
        WHERE list_id = {list_id}
        GROUP BY
          list_id,
          state
    """
    subquery = _build_query_for_latest_expressed_at().subquery()

    return db.session \
        .query(
            DbSubscriptionUpdate._state,
            db.func.count(DbSubscriptionUpdate._state),
        ) \
        .join(subquery, db.and_(
            DbSubscriptionUpdate.user_id == subquery.c.user_id,
            DbSubscriptionUpdate.list_id == subquery.c.list_id,
            DbSubscriptionUpdate.expressed_at == subquery.c.latest_expressed_at
        )) \
        .filter_by(list_id=list_id) \
        .group_by(
            DbSubscriptionUpdate.list_id,
            DbSubscriptionUpdate._state,
        )


def _build_query_for_latest_expressed_at() -> BaseQuery:
    """Build a query to return the most recent time the subscription
    state was set (grouped by user and list).

    The generated SQL should be equivalent to this:

        SELECT user_id, list_id, MAX(expressed_at) AS latest_expressed_at
        FROM newsletter_subscription_updates
        GROUP BY user_id, list_id
    """
    return db.session \
        .query(
            DbSubscriptionUpdate.user_id,
            DbSubscriptionUpdate.list_id,
            db.func.max(DbSubscriptionUpdate.expressed_at)
                .label('latest_expressed_at')
        ) \
        .group_by(
            DbSubscriptionUpdate.user_id,
            DbSubscriptionUpdate.list_id
        )


def get_subscription_state(
    user_id: UserID, list_id: ListID
) -> SubscriptionState:
    """Return the user's current subscription state for that list."""
    current_subscription = DbSubscriptionUpdate.query \
        .filter_by(user_id=user_id) \
        .filter_by(list_id=list_id) \
        .order_by(DbSubscriptionUpdate.expressed_at.desc()) \
        .first()

    if current_subscription is None:
        return SubscriptionState.declined

    return current_subscription.state


def get_subscription_updates_for_user(
    user_id: UserID,
) -> Sequence[DbSubscriptionUpdate]:
    """Return subscription updates made by the user, for any list."""
    return DbSubscriptionUpdate.query \
        .filter_by(user_id=user_id) \
        .all()


def is_subscribed(user_id: UserID, list_id: ListID) -> bool:
    """Return if the user is subscribed to the list or not."""
    subscription_state = get_subscription_state(user_id, list_id)
    return subscription_state == SubscriptionState.requested


def _db_entity_to_list(list_: DbList) -> List:
    return List(
        list_.id,
        list_.title,
    )