~sirn/fanboi2

ref: 025ad9d8716d5c1a8786a4f7dd6c0e7bd8907745 fanboi2/fanboi2/utils.py -rw-r--r-- 5.5 KiB
025ad9d8Kridsada Thanabulpong Merge branch 'develop' into feature/experiment-view2 7 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import datetime
import hashlib
import requests
import socket
from IPy import IP
from pyramid.renderers import JSON
from sqlalchemy.orm import Query
from .models import redis_conn
from .version import __VERSION__


def serialize_request(request):
    """Serialize :class:`pyramid.request.Request` into a :type:`dict`.

    :param request: A :class:`pyramid.request.Request` object to serialize.

    :type request: pyramid.response.Request or dict
    :rtype: dict
    """

    if isinstance(request, dict):
        return request

    return {
        'application_url': request.application_url,
        'remote_addr': request.remote_addr,
        'user_agent': request.user_agent,
        'referrer': request.referrer,
        'url': request.url,
    }


class Dnsbl(object) :
    """Utility class for checking IP address against DNSBL providers."""

    def __init__(self):
        self.providers = []

    def configure_providers(self, providers):
        if isinstance(providers, str):
            providers = providers.split()
        self.providers = providers

    def listed(self, ip_address):
        """Returns :type:`True` if the given IP address is listed in the
        DNSBL providers. Returns :type:`False` if not listed or no DNSBL
        providers present.
        """
        if self.providers:
            for provider in self.providers:
                try:
                    check = '.'.join(reversed(ip_address.split('.')))
                    res = socket.gethostbyname("%s.%s." % (check, provider))
                    if IP(res).make_net('255.0.0.0') == IP('127.0.0.0/8'):
                        return True
                except (socket.gaierror, ValueError):
                    continue
        return False


dnsbl = Dnsbl()


class Akismet(object):
    """Basic integration between Pyramid and Akismet."""

    def __init__(self):
        self.key = None

    def configure_key(self, key):
        """Configure this :class:`Akismet` instance with the provided key.

        :param key: A :type:`str` Akismet API key.

        :type key: str
        :rtype: None
        """
        self.key = key

    def _api_post(self, name, data=None):
        """Make a request to Akismet API and return the response.

        :param name: A :type:`str` of API method name to request.
        :param data: A :type:`dict` payload.

        :type name: str
        :type data: dict
        :rtype: requests.models.Response
        """
        return requests.post(
            'https://%s.rest.akismet.com/1.1/%s' % (self.key, name),
            headers={'User-Agent': "Fanboi2/%s | Akismet/0.1" % __VERSION__},
            data=data,
            timeout=2)

    def spam(self, request, message):
        """Returns :type:`True` if `message` is spam. Always returns
        :type:`False` if Akismet key is not set or the request to Akismet
        was timed out.

        :param request: A :class:`pyramid.request.Request` object.
        :param message: A :type:`str` to identify.

        :type request: pyramid.request.Request or dict
        :type message: str
        :rtype: bool
        """
        if self.key:
            request = serialize_request(request)
            try:
                return self._api_post('comment-check', data={
                    'blog': request['application_url'],
                    'user_ip': request['remote_addr'],
                    'user_agent': request['user_agent'],
                    'referrer': request['referrer'],
                    'permalink': request['url'],
                    'comment_type': 'comment',
                    'comment_content': message,
                }).content == b'true'
            except requests.Timeout:
                return False
        return False


akismet = Akismet()


class RateLimiter(object):
    """Rate limit to throttle content posting to every specific seconds."""

    def __init__(self, request, namespace=None):
        request = serialize_request(request)
        self.key = "rate:%s:%s" % (
            namespace,
            hashlib.md5(request['remote_addr'].encode('utf8')).hexdigest(),
        )

    def limit(self, seconds=10):
        """Mark user as rate limited for `seconds`.

        :param seconds: A number of seconds :type:`int` to rate limited for.

        :type seconds: int
        :rtype: None
        """
        redis_conn.set(self.key, 1)
        redis_conn.expire(self.key, seconds)

    def limited(self):
        """Returns true if content should be limited from posting.
        :rtype: bool
        """
        return redis_conn.exists(self.key)

    def timeleft(self):
        """Returns seconds left until user is no longer throttled.
        :rtype: int
        """
        return redis_conn.ttl(self.key)


json_renderer = JSON()


def _datetime_adapter(obj, request):
    """Serialize :type:`datetime.datetime` object into a string.

    :param obj: A :class:`datetime.datetime` object.
    :param request: A :class:`pyramid.request.Request` object.

    :type obj: datetime.datetime
    :type request: pyramid.request.Request
    """
    return obj.isoformat()


def _sqlalchemy_query_adapter(obj, request):
    """Serialize SQLAlchemy query into a list.

    :param obj: An iterable SQLAlchemy's :class:`sqlalchemy.orm.Query` object.
    :param request: A :class:`pyramid.request.Request` object.

    :type obj: sqlalchemy.orm.Query
    :type request: pyramid.request.Request
    """
    return [item for item in obj]


json_renderer.add_adapter(datetime.datetime, _datetime_adapter)
json_renderer.add_adapter(Query, _sqlalchemy_query_adapter)