~sirn/fanboi2

071b5268d2d6b9d57e16f6e17f9a2f027e308155 — Kridsada Thanabulpong 3 months ago 722f75d
filters/dnsbl: support ipv6
2 files changed, 16 insertions(+), 5 deletions(-)

M fanboi2/filters/dnsbl.py
M fanboi2/tests/test_filters.py
M fanboi2/filters/dnsbl.py => fanboi2/filters/dnsbl.py +6 -5
@@ 1,5 1,5 @@
import socket
from ipaddress import ip_interface, ip_network
from ipaddress import ip_address, ip_interface, ip_network

from . import register_filter



@@ 21,11 21,12 @@ class DNSBL(object):
        :param payload: A filter payload.
        """
        for provider in self.providers:
            ipaddr = ip_address(payload["ip_address"])
            lookup = ".".join(ipaddr.reverse_pointer.split(".")[:-2])
            try:
                check = ".".join(reversed(payload["ip_address"].split(".")))
                res = socket.gethostbyname("%s.%s." % (check, provider))
                ipaddr = ip_interface("%s/255.0.0.0" % (res,))
                if ipaddr.network == ip_network("127.0.0.0/8"):
                res = socket.gethostbyname("%s.%s." % (lookup, provider))
                result = ip_interface("%s/255.0.0.0" % (res,))
                if result.network == ip_network("127.0.0.0/8"):
                    return True
            except (socket.gaierror, ValueError):
                continue

M fanboi2/tests/test_filters.py => fanboi2/tests/test_filters.py +10 -0
@@ 132,6 132,16 @@ class TestDNSBL(unittest.TestCase):
        lookup_call.assert_called_with("254.100.0.10.xbl.spamhaus.org.")

    @unittest.mock.patch("socket.gethostbyname")
    def test_should_reject_ipv6(self, lookup_call):
        lookup_call.return_value = "127.0.0.2"
        dnsbl = self._make_one()
        self.assertTrue(dnsbl.should_reject({"ip_address": "fe80:c9cd::1"}))
        lookup_call.assert_called_with(
            "1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0."
            + "d.c.9.c.0.8.e.f.xbl.spamhaus.org."
        )

    @unittest.mock.patch("socket.gethostbyname")
    def test_should_reject_false(self, lookup_call):
        import socket