~sirn/fanboi2

ref: 025ad9d8716d5c1a8786a4f7dd6c0e7bd8907745 fanboi2/fanboi2/tasks.py -rw-r--r-- 2.8 KiB
025ad9d8Kridsada Thanabulpong Merge branch 'develop' into feature/experiment-view2 7 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import transaction
from celery import Celery
from sqlalchemy.exc import IntegrityError
from .models import DBSession, Post, Topic, Board
from .utils import akismet, dnsbl

# Module-level Celery application. The ``@celery.task`` decorators further
# down in this module register their tasks on this instance. It is created
# without arguments; no configuration is applied to it in this module.
celery = Celery()


def configure_celery(settings):  # pragma: no cover
    """Build the Celery configuration mapping from application settings.

    The value stored under ``celery.broker`` is used for both the broker
    URL and the result backend, and every serializer option is set to
    JSON. A :class:`KeyError` propagates if either ``celery.broker`` or
    ``app.timezone`` is missing from the settings.

    :param settings: A settings :type:`dict` holding at least the
                     ``celery.broker`` and ``app.timezone`` keys.

    :type settings: dict
    :rtype: dict
    """
    broker_url = settings['celery.broker']
    timezone = settings['app.timezone']
    json_format = 'json'

    return {
        'BROKER_URL': broker_url,
        'CELERY_RESULT_BACKEND': broker_url,
        'CELERY_ACCEPT_CONTENT': [json_format],
        'CELERY_TASK_SERIALIZER': json_format,
        'CELERY_RESULT_SERIALIZER': json_format,
        'CELERY_EVENT_SERIALIZER': json_format,
        'CELERY_TIMEZONE': timezone,
    }


@celery.task()
def add_topic(request, board_id, title, body):
    """Create a new topic, along with its first post, inside a board.

    Two checks run before anything is written, in this order: the body
    is passed to ``akismet.spam`` and the remote address is passed to
    ``dnsbl.listed``. A truthy answer from either one ends the task with
    a failure tuple. Otherwise the post and its topic are added to
    ``DBSession`` and flushed inside a ``transaction.manager`` block.

    :param request: A serialized request :type:`dict` returned from
                    :meth:`fanboi2.utils.serialize_request`. Its
                    ``remote_addr`` entry is read here.
    :param board_id: An :type:`int` referencing board ID.
    :param title: A :type:`str` topic title.
    :param body: A :type:`str` topic body, stored as the first post.

    :type request: dict
    :type board_id: int
    :type title: str
    :type body: str
    :rtype: tuple
    :returns: ``('failure', 'spam')`` or ``('failure', 'dnsbl')`` when a
              check rejects the submission, otherwise
              ``('topic', topic_id)``.
    """
    if akismet.spam(request, body):
        return 'failure', 'spam'

    remote_addr = request['remote_addr']
    if dnsbl.listed(remote_addr):
        return 'failure', 'dnsbl'

    with transaction.manager:
        parent_board = DBSession.query(Board).get(board_id)

        opening_post = Post(body=body, ip_address=remote_addr)
        opening_post.topic = Topic(board=parent_board, title=title)
        DBSession.add(opening_post)

        # Flush before reading ``topic_id`` -- presumably so that the
        # database-generated ID is populated on the post; confirm against
        # the model definitions.
        DBSession.flush()
        return 'topic', opening_post.topic_id


@celery.task(bind=True, max_retries=4)  # 5 total.
def add_post(self, request, topic_id, body, bumped):
    """Append a post to an existing topic.

    The body is first passed to ``akismet.spam``; a truthy answer ends
    the task before the database is touched. The topic is then loaded
    inside a ``transaction.manager`` block and the post is only inserted
    when the topic status equals ``"open"``. An
    :class:`sqlalchemy.exc.IntegrityError` raised while adding or
    flushing the post is handed to :meth:`celery.Task.retry`.

    :param self: A :class:`celery.Task` object.
    :param request: A serialized request :type:`dict` returned from
                    :meth:`fanboi2.utils.serialize_request`. Its
                    ``remote_addr`` entry is read here.
    :param topic_id: An :type:`int` referencing topic ID.
    :param body: A :type:`str` post body.
    :param bumped: A :type:`bool` specifying bump status.

    :type self: celery.Task
    :type request: dict
    :type topic_id: int
    :type body: str
    :type bumped: bool
    :rtype: tuple
    :returns: ``('failure', 'spam')`` when the spam check rejects the
              body, ``('failure', status)`` when the topic status is not
              ``"open"``, otherwise ``('post', post_id)``.
    """
    if akismet.spam(request, body):
        return 'failure', 'spam'

    with transaction.manager:
        parent_topic = DBSession.query(Topic).get(topic_id)
        topic_status = parent_topic.status
        if topic_status != "open":
            return 'failure', topic_status

        reply = Post(
            topic=parent_topic,
            body=body,
            bumped=bumped,
            ip_address=request['remote_addr'],
        )

        try:
            DBSession.add(reply)
            DBSession.flush()
        except IntegrityError as err:
            # Hand the failure to Celery so the task is attempted again,
            # bounded by ``max_retries`` on the decorator.
            raise self.retry(exc=err)
        else:
            return 'post', reply.id