~sirn/fanboi2

ref: 8b56ac65160b4b491c3a1bf0864cb47f9e996cd1 fanboi2/fanboi2/utils.py -rw-r--r-- 3.6 KiB
8b56ac65Kridsada Thanabulpong Fix broken error report due to Celery no longer raise original exceptions. 7 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import hashlib
import requests
import socket
from IPy import IP
from .models import redis_conn
from .version import __VERSION__


def serialize_request(request):
    """Convert a request object into a plain :type:`dict`.

    A :type:`dict` argument is treated as already serialized and is handed
    back untouched. Anything else has the attributes listed below read off
    it and stored under keys of the same name.
    """
    if not isinstance(request, dict):
        fields = (
            'application_url',
            'remote_addr',
            'user_agent',
            'referrer',
            'url',
        )
        return {field: getattr(request, field) for field in fields}

    return request


class Dnsbl(object):
    """Utility class for checking IP address against DNSBL providers."""

    def __init__(self):
        # Starts with no providers, so :meth:`listed` returns False for
        # every address until :meth:`configure_providers` is called.
        self.providers = []

    def configure_providers(self, providers):
        """Set the DNSBL providers to query.

        :param providers: Either a whitespace-separated :type:`str` of
                          provider names, which is split into a list, or
                          any other value, which is stored as given.
        """
        if isinstance(providers, str):
            providers = providers.split()
        self.providers = providers

    def listed(self, ip_address):
        """Returns :type:`True` if the given IP address is listed in the
        DNSBL providers. Returns :type:`False` if not listed or no DNSBL
        providers present.

        :param ip_address: Dotted address :type:`str`; its dot-separated
                           parts are reversed to build the lookup name.
        """
        if not self.providers:
            return False

        # The reversed-address prefix depends only on ``ip_address``, so it
        # is built once instead of once per provider.
        check = '.'.join(reversed(ip_address.split('.')))

        for provider in self.providers:
            try:
                # Trailing dot makes the name fully qualified.
                res = socket.gethostbyname("%s.%s." % (check, provider))
                # Any answer inside 127.0.0.0/8 counts as "listed".
                if IP(res).make_net('255.0.0.0') == IP('127.0.0.0/8'):
                    return True
            except (socket.gaierror, ValueError):
                # Lookup failed (or, presumably, the answer could not be
                # parsed as an address): try the next provider.
                continue
        return False


# Shared module-level instance. It starts with an empty provider list, so
# ``listed()`` returns False until ``configure_providers()`` is called.
dnsbl = Dnsbl()


class Akismet(object):
    """Small Akismet client used to check posted content for spam."""

    def __init__(self):
        self.key = None

    def configure_key(self, key):
        self.key = key

    def _api_post(self, name, data=None):
        """POST `data` to the Akismet endpoint `name` using the configured
        key and return the response object.
        """
        endpoint = 'https://%s.rest.akismet.com/1.1/%s' % (self.key, name)
        user_agent = "Fanboi2/%s | Akismet/0.1" % __VERSION__
        return requests.post(
            endpoint,
            headers={'User-Agent': user_agent},
            data=data,
            timeout=2)

    def spam(self, request, message):
        """Tell whether Akismet considers `message` to be spam.

        Returns :type:`True` only when the response body is exactly
        ``true``. Returns :type:`False` when no key is configured or when
        the request to Akismet times out.
        """
        if not self.key:
            return False

        request = serialize_request(request)
        try:
            payload = {
                'blog': request['application_url'],
                'user_ip': request['remote_addr'],
                'user_agent': request['user_agent'],
                'referrer': request['referrer'],
                'permalink': request['url'],
                'comment_type': 'comment',
                'comment_content': message,
            }
            response = self._api_post('comment-check', data=payload)
            return response.content == b'true'
        except requests.Timeout:
            return False


class RateLimiter(object):
    """Rate limit to throttle content posting to every specific seconds.

    The throttle state lives in a single Redis key derived from
    `namespace` and the MD5 hex digest of the request's remote address.

    :param request: Request object or an already serialized :type:`dict`
                    (see :func:`serialize_request`).
    :param namespace: Prefix that separates independent rate limits.
    """

    def __init__(self, request, namespace=None):
        request = serialize_request(request)
        self.key = "rate:%s:%s" % (
            namespace,
            hashlib.md5(request['remote_addr'].encode('utf8')).hexdigest(),
        )

    def limit(self, seconds=10):
        """Mark user as rate limited for `seconds`."""
        # SET and EXPIRE are queued and sent as one MULTI/EXEC transaction.
        # Issued separately, a failure between the two commands would leave
        # the key without a TTL and throttle the user indefinitely.
        # NOTE(review): assumes ``redis_conn`` exposes the redis-py client
        # API (``pipeline()``) -- confirm against ``.models``.
        with redis_conn.pipeline() as pipe:
            pipe.set(self.key, 1)
            pipe.expire(self.key, seconds)
            pipe.execute()

    def limited(self):
        """Returns :type:`True` if content should be limited from posting."""
        # Normalised to a bool in case the client reports EXISTS as a count.
        return bool(redis_conn.exists(self.key))

    def timeleft(self):
        """Returns seconds left until user is no longer throttled."""
        return redis_conn.ttl(self.key)


# Shared module-level instance. The key starts as None, so ``spam()``
# returns False until ``configure_key()`` is given a key.
akismet = Akismet()