~sircmpwn/core.sr.ht

core.sr.ht/srht/api.py -rw-r--r-- 3.3 KiB
b695e020Drew DeVault Add "internal_anon" path for internal auth tokens 3 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import requests
from flask import current_app, request
from srht.crypto import encrypt_request_authorization
from werkzeug.local import LocalProxy

_default = 1

def paginated_response(id_col, query,
        order_by=_default, serialize=_default, per_page=50, **kwargs):
    """
    Builds the standard paginated response dict for a SQLAlchemy query.

    id_col is the column used as the pagination cursor (generally the primary
    key). When the request carries ?start=<n>, only rows with id_col <= n are
    returned.

    order_by defaults to the ID column, descending. Pass a SQLAlchemy
    expression (e.g. SomeType.some_col.desc()), a tuple of such expressions,
    or None to leave the query's ordering untouched.

    serialize is a function which turns one record into a dict. By default
    each record's to_dict() is called with any extra keyword arguments given
    to this function.

    per_page is the maximum number of results in one page. Default is 50.

    The returned dict has the keys "next" (the stringified id of the first
    record of the following page, or None on the last page), "results",
    "total" (row count of the query before the ?start filter is applied) and
    "results_per_page".
    """
    # Counted up front so that "total" covers the whole, unfiltered query.
    total = query.count()

    raw_start = request.args.get('start')
    if not raw_start:
        start = -1
    elif isinstance(raw_start, str):
        start = int(raw_start)
    else:
        start = raw_start
    # -1 doubles as "no cursor given": the query is left unfiltered.
    if start != -1:
        query = query.filter(id_col <= start)

    ordering = id_col.desc() if order_by is _default else order_by
    if ordering is not None:
        criteria = ordering if isinstance(ordering, tuple) else (ordering,)
        query = query.order_by(*criteria)

    # Fetch one row beyond the page size; its presence tells us that another
    # page exists, and its id becomes the cursor for that page.
    page = query.limit(per_page + 1).all()
    has_more = len(page) == per_page + 1
    next_id = str(page[-1].id) if has_more else None
    if has_more:
        page = page[:per_page]

    render = serialize
    if render is _default:
        def render(record):
            return record.to_dict(**kwargs)

    return {
        "next": next_id,
        "results": list(map(render, page)),
        "total": total,
        "results_per_page": per_page,
    }

def get_authorization(user):
    """
    Returns the authorization for requests made on behalf of the given user,
    as produced by srht.crypto.encrypt_request_authorization. Within this
    module the result is passed as the headers argument of requests calls.
    """
    authorization = encrypt_request_authorization(user)
    return authorization

def get_results(url, user):
    """
    Generator which yields every result from a paginated API endpoint,
    following the "next" cursor of each response until it is None.

    url is the endpoint to fetch and user is the user whose authorization
    (see get_authorization) is sent with each request. The first page is
    requested with start=-1.

    Raises an Exception carrying the response body if any page is answered
    with a status other than 200.
    """
    next_id = -1
    while next_id is not None:
        # Let requests build the query string: this escapes the cursor and
        # appends correctly even when url already carries a query string,
        # where a hand-built f"{url}?start=..." would yield a malformed URL.
        r = requests.get(url, params={"start": next_id},
                headers=get_authorization(user))
        if r.status_code != 200:
            raise Exception(r.text)
        response = r.json()
        yield from response["results"]
        next_id = response.get("next")

def ensure_webhooks(user, baseurl, webhooks):
    """
    Ensures that the specified webhooks are rigged up. Webhooks should be a
    dict whose key is the webhook URL and whose values are the list of events
    to send to that URL, or None to unconfigure this webhook.

    baseurl is the webhook collection endpoint: it is listed with
    get_results, existing webhooks are deleted at {baseurl}/{id}, and new
    ones are created by POSTing to it. All requests are authorized as user.

    The webhooks dict passed in is not modified.

    Raises an Exception if removing a webhook does not return 204 or creating
    one does not return 201.
    """
    # Entries are dropped from this mapping as they are found to be settled.
    # Work on a shallow copy so the caller's dict is left intact; otherwise a
    # dict reused for several calls would lose entries after the first one.
    pending = dict(webhooks)
    for webhook in get_results(baseurl, user):
        url = webhook["url"]
        if url not in pending:
            continue
        if webhook["events"] == pending[url]:
            del pending[url]
            continue # This webhook already configured
        # This webhook is set up incorrectly, delete it
        webhook_url = f"{baseurl}/{webhook['id']}"
        r = requests.delete(webhook_url,
                headers=get_authorization(user))
        if r.status_code != 204:
            raise Exception(f"Failed to remove invalid webhook: {r.text}")
        # None means "unconfigure": once deleted there is nothing to create.
        if pending[url] is None:
            del pending[url]
    # Whatever is left is either missing remotely or was just deleted for
    # having the wrong events; create it (skipping None/empty event lists).
    for url, events in pending.items():
        if not events:
            continue
        r = requests.post(baseurl,
                headers=get_authorization(user),
                json={"events": events, "url": url})
        if r.status_code != 201:
            raise Exception(f"Failed to create webhook: {r.text}")