~sircmpwn/lists.sr.ht

lists.sr.ht/listssrht/oauth.py -rw-r--r-- 2.2 KiB View raw
bbaf8ed2Drew DeVault Deliver list deletion webhook via UI 3 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from email.utils import parseaddr
from srht.config import cfg
from srht.database import db
from srht.oauth import AbstractOAuthService, DelegatedScope
from listssrht.types import Email, OAuthToken, Subscription, User
import sqlalchemy as sa

client_id = cfg("lists.sr.ht", "oauth-client-id")
client_secret = cfg("lists.sr.ht", "oauth-client-secret")

class ListsOAuthService(AbstractOAuthService):
    def __init__(self):
        super().__init__(client_id, client_secret, delegated_scopes=[
            DelegatedScope("lists", "mailing lists", True),
            DelegatedScope("email", "emails", False),
            DelegatedScope("patches", "patches", True),
            DelegatedScope("subs", "subscriptions", True),
        ], user_class=User, token_class=OAuthToken)

    def lookup_or_register(self, token, token_expires, scopes):
        user = super().lookup_or_register(token, token_expires, scopes)
        if not user.id:
            db.session.flush()
            # Rewire existing subscriptions
            for sub in Subscription.query.filter(
                    Subscription.email == user.email).all():
                sub.email = None
                sub.user_id = user.id
            # Rewire existing emails
            for email in (Email.query
                    .filter(Email.sender_id is None)
                    .filter(sa.cast(Email.headers["From"], sa.String)
                        .ilike("%" + user.email + "%"))):
                name, addr = parseaddr(email.headers["From"])
                if addr == user.email:
                    email.sender_id = user.id
        db.session.commit()
        return user

    def profile_update_hook(self, user, payload):
        if user.email != payload["email"]:
            # Adopt emails sent with this address
            for email in (Email.query
                    .filter(Email.sender_id is None)
                    .filter(sa.cast(Email.headers["From"], sa.String)
                        .ilike("%" + user.email + "%"))):
                name, addr = parseaddr(email.headers["From"])
                if addr == payload["email"]:
                    email.sender_id = user.id
        super().profile_update_hook(user, payload)