~sirn/fanboi2

b6d9420ce840ac1633539fc78daea1634cf0ffa8 — Kridsada Thanabulpong 3 months ago 071b526
filters/proxy: skip proxy check for ipv6
2 files changed, 42 insertions(+), 11 deletions(-)

M fanboi2/filters/proxy.py
M fanboi2/tests/test_filters.py
M fanboi2/filters/proxy.py => fanboi2/filters/proxy.py +30 -11
@@ 1,5 1,6 @@
import math
import requests
from ipaddress import ip_address

from ..version import __VERSION__
from . import register_filter


@@ 13,18 14,26 @@ class BlackBoxProxyDetector(object):
        if not self.url:
            self.url = "http://proxy.mind-media.com/block/proxycheck.php"

    def check(self, ip_address):
    def can_check(self, ipaddr):
        """Returns :type:`True` if this detector can check the given
        IP address.

        :param ipaddr: An :type:`str` IP address.
        """
        return ip_address(ipaddr).version == 4

    def check(self, ipaddr):
        """Request for IP evaluation and return raw results. Return the
        response as-is if return code is 200 and evaluation result is not
        an error code returned from Black Box Proxy Block.

        :param ip_address: An :type:`str` IP address.
        :param ipaddr: An :type:`str` IP address.
        """
        try:
            result = requests.get(
                self.url,
                headers={"User-Agent": "fanboi2/%s" % __VERSION__},
                params={"ip": ip_address},
                params={"ip": ipaddr},
                timeout=2,
            )
        except requests.Timeout:


@@ 56,14 65,22 @@ class GetIPIntelProxyDetector(object):
        if not self.email:
            raise ValueError("GetIPIntel require an email to be present.")

    def check(self, ip_address):
    def can_check(self, ipaddr):
        """Returns :type:`True` if this detector can check the given
        IP address.

        :param ipaddr: An :type:`str` IP address.
        """
        return ip_address(ipaddr).version == 4

    def check(self, ipaddr):
        """Request for IP evaluation and return raw results. Return the
        response as-is if return code is 200 and evaluation result is
        positive.

        :param ip_address: An :type:`str` IP address.
        :param ipaddr: An :type:`str` IP address.
        """
        params = {"contact": self.email, "ip": ip_address}
        params = {"contact": self.email, "ip": ipaddr}
        if self.flags:
            params["flags"] = self.flags
        try:


@@ 116,8 133,8 @@ class ProxyDetector(object):
        self.settings = settings
        self.cache_region = services["cache"]

    def _get_cache_key(self, provider, ip_address):
        return "filters.proxy:provider=%s,ip_address=%s" % (provider, ip_address)
    def _get_cache_key(self, provider, ipaddr):
        return "filters.proxy:provider=%s,ip_address=%s" % (provider, ipaddr)

    def should_reject(self, payload):
        """Returns :type:`True` if the given IP address is identified as


@@ 129,10 146,12 @@ class ProxyDetector(object):
        for provider, settings in self.settings.items():
            if settings["enabled"]:
                detector = DETECTOR_PROVIDERS[provider](**settings)
                ip_address = payload["ip_address"]
                ipaddr = payload["ip_address"]
                if not detector.can_check(ipaddr):
                    return False
                result = self.cache_region.get_or_create(
                    self._get_cache_key(provider, ip_address),
                    lambda: detector.check(ip_address),
                    self._get_cache_key(provider, ipaddr),
                    lambda: detector.check(ipaddr),
                    should_cache_fn=lambda v: v is not None,
                    expiration_time=21600,
                )

M fanboi2/tests/test_filters.py => fanboi2/tests/test_filters.py +12 -0
@@ 394,6 394,18 @@ class TestProxyDetector(unittest.TestCase):

    @unittest.mock.patch("fanboi2.filters.proxy.BlackBoxProxyDetector.check")
    @unittest.mock.patch("fanboi2.filters.proxy.GetIPIntelProxyDetector.check")
    def test_check_ipv6(self, getipintel_check, blackbox_check):
        from . import make_cache_region

        settings = self._make_config()
        cache_svc = make_cache_region({})
        proxy_detector = self._make_one(settings, cache_svc)
        self.assertFalse(proxy_detector.should_reject({"ip_address": "fe80:c9cd::1"}))
        assert not blackbox_check.called
        assert not getipintel_check.called

    @unittest.mock.patch("fanboi2.filters.proxy.BlackBoxProxyDetector.check")
    @unittest.mock.patch("fanboi2.filters.proxy.GetIPIntelProxyDetector.check")
    def test_check_cached(self, getipintel_check, blackbox_check):
        from . import make_cache_region