~sirn/fanboi2

ref: 834edf0edc5dd633c0ecea16231b6ed2d728476d fanboi2/fanboi2/filters/proxy.py -rw-r--r-- 4.7 KiB
834edf0eKridsada Thanabulpong Massive cleanup in preparation for 0.30 (#25) 3 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import requests

from ..version import __VERSION__
from . import register_filter


class BlackBoxProxyDetector(object):
    """Provides integration with Black Block Proxy Block service."""

    def __init__(self, **kwargs):
        self.url = kwargs.get('url')
        if not self.url:
            self.url = 'http://proxy.mind-media.com/block/proxycheck.php'

    def check(self, ip_address):
        """Request for IP evaluation and return raw results. Return the
        response as-is if return code is 200 and evaluation result is not
        an error code returned from Black Box Proxy Block.

        :param ip_address: An :type:`str` IP address.
        """
        try:
            result = requests.get(
                self.url,
                headers={'User-Agent': "fanboi2/%s" % __VERSION__},
                params={'ip': ip_address},
                timeout=2)
        except requests.Timeout:
            return
        if result.status_code == 200 and result.content != b'X':
            return result.content

    def evaluate(self, result):
        """Evaluate result returned from the evaluation request. Return
        :type:`True` if evaluation result is 'Y', i.e. a proxy.

        :param result: A result from evaluation request.
        """
        if result == b'Y':
            return True
        return False


class GetIPIntelProxyDetector(object):
    """Provides integration with GetIPIntel proxy detection service."""

    def __init__(self, **kwargs):
        self.url = kwargs.get('url')
        self.flags = kwargs.get('flags')
        self.email = kwargs.get('email')
        if not self.url:
            self.url = 'http://check.getipintel.net/check.php'
        if not self.email:
            raise ValueError('GetIPIntel require an email to be present.')

    def check(self, ip_address):
        """Request for IP evaluation and return raw results. Return the
        response as-is if return code is 200 and evaluation result is
        positive.

        :param ip_address: An :type:`str` IP address.
        """
        params = {'contact': self.email, 'ip': ip_address}
        if self.flags:
            params['flags'] = self.flags
        try:
            result = requests.get(
                self.url,
                headers={'User-Agent': "fanboi2/%s" % __VERSION__},
                params=params,
                timeout=5)
        except requests.Timeout:
            return
        if result.status_code == 200 and float(result.content) >= 0:
            return result.content

    def evaluate(self, result):
        """Evaluate result returned from the evaluation request. Return
        :type:`True` if evaluation result is likely to be a proxy, with
        probability higher than ``0.99`` (for example, ``0.994120``).

        :param result: A result from evaluation request.
        """
        if float(result) > 0.99:
            return True
        return False


DETECTOR_PROVIDERS = {
    'blackbox': BlackBoxProxyDetector,
    'getipintel': GetIPIntelProxyDetector,
}


@register_filter(name='proxy')
class ProxyDetector(object):
    """Base class for dispatching proxy detection into multiple providers."""

    __use_services__ = ('cache',)
    __default_settings__ = {
        'blackbox': {
            'enabled': False,
            'url': 'http://proxy.mind-media.com/block/proxycheck.php',
        },
        'getipintel': {
            'enabled': False,
            'url':  'http://check.getipintel.net/check.php',
            'email': None,
            'flags': None,
        },
    }

    def __init__(self, settings=None, services={}):
        if not settings:
            settings = {}
        self.settings = settings
        self.cache_region = services['cache']

    def _get_cache_key(self, provider, ip_address):
        return 'filters.proxy:provider=%s,ip_address=%s' % (
            provider,
            ip_address)

    def should_reject(self, payload):
        """Returns :type:`True` if the given IP address is identified as
        proxy by one of the providers. Returns :type:`False` if the IP
        address was not identified as a proxy or no providers configured.

        :param payload: A filter payload.
        """
        for provider, settings in self.settings.items():
            if settings['enabled']:
                detector = DETECTOR_PROVIDERS[provider](**settings)
                result = self.cache_region.get_or_create(
                    self._get_cache_key(provider, payload['ip_address']),
                    lambda: detector.check(payload['ip_address']),
                    should_cache_fn=lambda v: v is not None,
                    expiration_time=21600)
                if result is not None and detector.evaluate(result):
                    return True
        return False