~sirn/fanboi2

ref: b60470797be426937be7e9b5a2435138c1863881 fanboi2/fanboi2/utils.py -rw-r--r-- 4.3 KiB
b6047079Kridsada Thanabulpong Use Vagrant cloud for VM box. 7 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import datetime
import hashlib
import requests
from pyramid.renderers import JSON
from sqlalchemy.orm import Query
from .models import redis_conn
from .version import __VERSION__


def serialize_request(request):
    """Reduce a request to a plain :type:`dict` of the fields used here.

    A :type:`dict` argument is treated as already serialized and is returned
    unchanged (the very same object). Any other object has its
    ``application_url``, ``remote_addr``, ``user_agent``, ``referrer`` and
    ``url`` attributes copied into a new :type:`dict` under the same names.

    :param request: A :class:`pyramid.request.Request` object to serialize,
                    or a :type:`dict` produced by an earlier call.

    :type request: pyramid.request.Request or dict
    :rtype: dict
    """
    if not isinstance(request, dict):
        field_names = (
            'application_url',
            'remote_addr',
            'user_agent',
            'referrer',
            'url',
        )
        return {name: getattr(request, name) for name in field_names}

    return request


class Akismet(object):
    """Basic integration between Pyramid and Akismet.

    An instance starts without an API key; until :meth:`configure_key` is
    called with a non-empty key, :meth:`spam` never contacts Akismet and
    always returns :type:`False`.
    """

    # Seconds to wait for Akismet before giving up. Without a timeout
    # ``requests`` waits indefinitely, so an unresponsive API would block
    # the caller forever.
    timeout = 10

    def __init__(self):
        self.key = None

    def configure_key(self, key):
        """Configure this :class:`Akismet` instance with the provided key.

        :param key: A :type:`str` Akismet API key.

        :type key: str
        :rtype: None
        """
        self.key = key

    def _api_post(self, name, data=None):
        """Make a request to Akismet API and return the response.

        The configured key is used as the subdomain of the API host and
        `name` as the last path segment. The request is abandoned after
        :attr:`timeout` seconds.

        :param name: A :type:`str` of API method name to request.
        :param data: A :type:`dict` payload.

        :type name: str
        :type data: dict
        :rtype: requests.models.Response
        :raises requests.exceptions.Timeout: if Akismet does not respond
                                             within :attr:`timeout` seconds.
        """
        return requests.post(
            'https://%s.rest.akismet.com/1.1/%s' % (self.key, name),
            headers={'User-Agent': "Fanboi2/%s | Akismet/0.1" % __VERSION__},
            data=data,
            timeout=self.timeout)

    def spam(self, request, message):
        """Returns :type:`True` if `message` is spam. Always returns
        :type:`False` if Akismet key is not set.

        :param request: A :class:`pyramid.request.Request` object.
        :param message: A :type:`str` to identify.

        :type request: pyramid.request.Request or dict
        :type message: str
        :rtype: bool
        """
        if not self.key:
            return False

        request = serialize_request(request)
        response = self._api_post('comment-check', data={
            'blog': request['application_url'],
            'user_ip': request['remote_addr'],
            'user_agent': request['user_agent'],
            'referrer': request['referrer'],
            'permalink': request['url'],
            'comment_type': 'comment',
            'comment_content': message,
        })
        # Only a response body of exactly ``true`` counts as spam; any
        # other body is treated as not spam.
        return response.content == b'true'


# Module-level instance. It is created without an API key, so its ``spam``
# method returns False until ``configure_key`` has been called on it.
akismet = Akismet()


class RateLimiter(object):
    """Rate limit to throttle content posting to every specific seconds.

    The throttle state is a single key in ``redis_conn`` named
    ``rate:<namespace>:<md5 hex digest of the remote address>``; the key
    existing means the address is currently throttled.

    :param request: A :class:`pyramid.request.Request` object, or its
                    serialized :type:`dict` form.
    :param namespace: A value formatted into the key to keep separate
                      limiters apart.

    :type request: pyramid.request.Request or dict
    :type namespace: str
    """

    def __init__(self, request, namespace=None):
        request = serialize_request(request)
        self.key = "rate:%s:%s" % (
            namespace,
            hashlib.md5(request['remote_addr'].encode('utf8')).hexdigest(),
        )

    def limit(self, seconds=10):
        """Mark user as rate limited for `seconds`.

        :param seconds: A number of seconds :type:`int` to rate limited for.

        :type seconds: int
        :rtype: None
        """
        # Queue SET and EXPIRE in one transactional pipeline so the key can
        # never be left behind without an expiry (which would throttle the
        # address permanently) if something fails between the two commands.
        # NOTE(review): assumes ``redis_conn`` exposes redis-py's
        # ``pipeline()`` -- confirm against ``.models``.
        pipe = redis_conn.pipeline()
        pipe.set(self.key, 1)
        pipe.expire(self.key, seconds)
        pipe.execute()

    def limited(self):
        """Returns true if content should be limited from posting.
        :rtype: bool
        """
        # Coerce explicitly: depending on the client version ``exists`` may
        # return an integer count rather than a boolean.
        return bool(redis_conn.exists(self.key))

    def timeleft(self):
        """Returns seconds left until user is no longer throttled.

        The value is whatever ``redis_conn.ttl`` reports for the key.

        :rtype: int
        """
        return redis_conn.ttl(self.key)


# Renderer instance on which this module registers its type adapters
# (see the ``add_adapter`` calls further down).
json_renderer = JSON()


def _datetime_adapter(obj, request):
    """Serialize :type:`datetime.datetime` object into a string.

    :param obj: A :class:`datetime.datetime` object.
    :param request: A :class:`pyramid.request.Request` object.

    :type obj: datetime.datetime
    :type request: pyramid.request.Request
    """
    return obj.isoformat()


def _sqlalchemy_query_adapter(obj, request):
    """Serialize SQLAlchemy query into a list.

    :param obj: An iterable SQLAlchemy's :class:`sqlalchemy.orm.Query` object.
    :param request: A :class:`pyramid.request.Request` object.

    :type obj: sqlalchemy.orm.Query
    :type request: pyramid.request.Request
    """
    return [item for item in obj]


# Register the adapters on ``json_renderer``: ``datetime.datetime`` values go
# through ``_datetime_adapter`` and SQLAlchemy ``Query`` objects go through
# ``_sqlalchemy_query_adapter``.
json_renderer.add_adapter(datetime.datetime, _datetime_adapter)
json_renderer.add_adapter(Query, _sqlalchemy_query_adapter)