~sircmpwn/core.sr.ht

core.sr.ht/srht/webhook/celery.py -rw-r--r-- 2.8 KiB
0e245224Ryan Gonzalez srht.Validation: Don't reject enums with 0 values 19 hours ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""
Import this module only after configuring your database.
"""

from celery import Celery
from srht.database import db
from srht.webhook import Webhook
from werkzeug.local import LocalProxy
import requests

_async_request = None
async_request = LocalProxy(lambda: _async_request)

def make_worker(broker='redis://'):
    worker = Celery('webhooks', broker=broker)

    def task(func):
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as ex:
                mail_exception(ex, context=f"webhook process")
                try:
                    db.session.rollback()
                except:
                    pass
                return
        wrapper.__name__ = func.__name__
        return worker.task(wrapper)

    @task
    def async_request(url, payload, headers,
            delivery_table=None, delivery_id=None, timeout=5):
        """
        Performs an HTTP POST asyncronously, and updates the delivery row if a
        table & id is specified.
        """
        r = None
        try:
            r = requests.post(url, data=payload, timeout=timeout, headers=headers)
            response = r.text
            response_status = r.status_code
            response_headers = "\n".join(
                    f"{key}: {value}" for key, value in r.headers.items())
        except requests.exceptions.ReadTimeout:
            response = "Request timed out after 5 seconds."
            response_status = -1
            response_headers = None
        if delivery_table and delivery_id:
            db.session.execute(
                f"""
                UPDATE {delivery_table}
                SET response = :response,
                    response_status = :status,
                    response_headers = :headers
                WHERE id = :delivery_id
                """, {
                    "response": response,
                    "status": response_status,
                    "headers": response_headers,
                    "delivery_id": delivery_id
                })
            db.session.commit()
        return r

    global _async_request
    _async_request = async_request
    return worker

class CeleryWebhook(Webhook):
    def process_delivery(cls, delivery, headers, delay=True, timeout=5):
        """
        Delivers the webhook via celery (or immediately if delay=True).
        """
        Delivery = cls.Delivery
        if delay:
            async_request.delay(delivery.url, delivery.payload, headers,
                    timeout=timeout, delivery_table=Delivery.__tablename__,
                    delivery_id=delivery.id)
        else:
            return async_request(delivery.url, delivery.payload, headers,
                    timeout=timeout, delivery_table=Delivery.__tablename__,
                    delivery_id=delivery.id)