~sirn/fanboi2

ref: 98796226802a26b82c888365ad5b9cd331006792 fanboi2/fanboi2/utils/proxy.py -rw-r--r-- 5.2 KiB
98796226Kridsada Thanabulpong Bump copyright year. 3 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import requests
from ..version import __VERSION__
from ..cache import cache_region as cache_region_


class BlackBoxProxyDetector(object):
    """Provides integration with Black Block Proxy Block service."""

    def __init__(self, config):
        self.url = config.get('url')
        if not self.url:
            self.url = 'http://www.shroomery.org/ythan/proxycheck.php'

    def check(self, ip_address):
        """Request for IP evaluation and return raw results. Return the
        response as-is if return code is 200 and evaluation result is not
        an error code returned from Black Box Proxy Block.

        :param ip_address: An :type:`str` IP address.

        :type ip_address: str
        :rtype: str or None
        """
        try:
            result = requests.get(
                self.url,
                headers={'User-Agent': "Fanboi2/%s" % __VERSION__},
                params={'ip': ip_address},
                timeout=2)
        except requests.Timeout:
            return
        if result.status_code == 200 and result.content != b'X':
            return result.content

    def evaluate(self, result):
        """Evaluate result returned from the evaluation request. Return
        :type:`True` if evaluation result is 'Y', i.e. a proxy.

        :param result: A result from evaluation request.

        :type result: str
        :rtype: bool
        """
        if result == b'Y':
            return True
        return False


class GetIPIntelProxyDetector(object):
    """Provides integration with GetIPIntel proxy detection service."""

    def __init__(self, config):
        self.url = config.get('url')
        self.flags = config.get('flags')
        self.email = config.get('email')
        if not self.url:
            self.url = 'http://check.getipintel.net/check.php'
        if not self.email:
            raise ValueError('GetIPIntel require an email to be present.')

    def check(self, ip_address):
        """Request for IP evaluation and return raw results. Return the
        response as-is if return code is 200 and evaluation result is
        positive.

        :param ip_address: An :type:`str` IP address.

        :type ip_address: str
        :rtype: str or None
        """
        params = {'contact': self.email, 'ip': ip_address}
        if self.flags:
            params['flags'] = self.flags
        try:
            result = requests.get(
                self.url,
                headers={'User-Agent': "Fanboi2/%s" % __VERSION__},
                params=params,
                timeout=5)
        except requests.Timeout:
            return
        if result.status_code == 200 and float(result.content) >= 0:
            return result.content

    def evaluate(self, result):
        """Evaluate result returned from the evaluation request. Return
        :type:`True` if evaluation result is likely to be a proxy, with
        probability higher than ``0.99`` (for example, ``0.994120``).

        :param result: A result from evaluation request.

        :type result: str
        :rtype: bool
        """
        if float(result) > 0.99:
            return True
        return False


DETECTOR_PROVIDERS = {
    'blackbox': BlackBoxProxyDetector,
    'getipintel': GetIPIntelProxyDetector,
}


class ProxyDetector(object):
    """Base class for dispatching proxy detection into multiple providers."""

    def __init__(self, cache_region=cache_region_):
        self.providers = []
        self.instances = {}
        self.cache_region = cache_region

    def configure_from_config(self, config, key=None):
        """Configure and initialize proxy detectors. The configuration dict
        may contains provider-specific configuration using the same dotted-
        name as the provider itself, for example ``getipintel.url``.

        If the configuration key is prefixed with other dotted names, ``key``
        may be given to extract from that prefix.

        :param config: Configuration :type:`dict`.
        :param key: Key prefix to extract configuration.
        """
        if key is None:
            key = ''
        self.providers = config.get('%sproviders' % (key,), [])
        for provider in self.providers:
            class_ = DETECTOR_PROVIDERS[provider]
            provider_key = "%s%s." % (key, provider)
            provider_config = {}
            for k, v in config.items():
                if k.startswith(provider_key):
                    provider_config[k[len(provider_key):]] = v
            self.instances[provider] = class_(provider_config)

    def detect(self, ip_address):
        """Detect if the given ``ip_address`` is a proxy using providers
        configured via :meth:``configure_from_config``.

        :param ip_address: An IP address to perform a proxy check against.
        :type ip_address: str
        :rtype: bool
        """
        for provider in self.providers:
            detector = self.instances[provider]
            result = self.cache_region.get_or_create(
                'proxy:%s:%s' % (provider, ip_address),
                lambda: detector.check(ip_address),
                should_cache_fn=lambda v: v is not None,
                expiration_time=21600)
            if result is not None and detector.evaluate(result):
                return True
        return False