~sirn/fanboi2

910c0ab8f63aeac33916d118dbbcdccffe6b53d3 — Kridsada Thanabulpong 4 years ago 5969b0e
Allow post filter to be configured per country.
M CHANGES.rst => CHANGES.rst +6 -1
@@ 1,5 1,10 @@
Next
====

- [Add] Allow post filter to be configured per country.

0.10.1
======
------

- [Add] A ``board`` query string for expanding board from topic API.
- [Add] A ``topic`` query string for expanding post from post API.

M examples/development.ini.sample => examples/development.ini.sample +2 -0
@@ 36,6 36,8 @@ app.proxy_detect.blackbox.url =
app.proxy_detect.getipintel.url =
app.proxy_detect.getipintel.email =
app.proxy_detect.getipintel.flags =
app.geoip2_database =
app.checklist = *:*

[server:main]
use = egg:waitress#main

M examples/production.ini.sample => examples/production.ini.sample +2 -0
@@ 32,6 32,8 @@ app.proxy_detect.blackbox.url =
app.proxy_detect.getipintel.url =
app.proxy_detect.getipintel.email =
app.proxy_detect.getipintel.flags =
app.geoip2_database =
app.checklist = *:*

[server:main]
use = egg:waitress#main

M fanboi2/__init__.py => fanboi2/__init__.py +11 -1
@@ 10,7 10,7 @@ from sqlalchemy.engine import engine_from_config
from fanboi2.cache import cache_region
from fanboi2.models import DBSession, Base, redis_conn, identity
from fanboi2.tasks import celery, configure_celery
from fanboi2.utils import akismet, dnsbl, proxy_detector
from fanboi2.utils import akismet, dnsbl, geoip, proxy_detector, checklist


def remote_addr(request):


@@ 129,12 129,18 @@ def normalize_settings(settings, _environ=os.environ):
        'APP_PROXY_DETECT_GETIPINTEL_FLAGS',
        'app.proxy_detect.getipintel.flags')

    app_geoip2_database = _cget('APP_GEOIP2_DATABASE', 'app.geoip2_database')
    app_checklist = _cget('APP_CHECKLIST', 'app.checklist')

    if app_dnsbl_providers is not None:
        app_dnsbl_providers = aslist(app_dnsbl_providers)

    if app_proxy_detect_providers is not None:
        app_proxy_detect_providers = aslist(app_proxy_detect_providers)

    if app_checklist is not None:
        app_checklist = aslist(app_checklist)

    _settings = copy.deepcopy(settings)
    _settings.update({
        'sqlalchemy.url': sqlalchemy_url,


@@ 152,6 158,8 @@ def normalize_settings(settings, _environ=os.environ):
        'app.proxy_detect.getipintel.url': app_proxy_detect_getipintel_url,
        'app.proxy_detect.getipintel.email': app_proxy_detect_getipintel_email,
        'app.proxy_detect.getipintel.flags': app_proxy_detect_getipintel_flags,
        'app.geoip2_database': app_geoip2_database,
        'app.checklist': app_checklist,
    })

    return _settings


@@ 181,6 189,8 @@ def main(global_config, **settings):  # pragma: no cover
    identity.configure_tz(config.registry.settings['app.timezone'])
    akismet.configure_key(config.registry.settings['app.akismet_key'])
    dnsbl.configure_providers(config.registry.settings['app.dnsbl_providers'])
    geoip.configure_geoip2(config.registry.settings['app.geoip2_database'])
    checklist.configure_checklist(config.registry.settings['app.checklist'])
    proxy_detector.configure_from_config(
        config.registry.settings,
        'app.proxy_detect.')

M fanboi2/tasks.py => fanboi2/tasks.py +29 -15
@@ 4,7 4,7 @@ from sqlalchemy.exc import IntegrityError
from fanboi2.errors import serialize_error
from fanboi2.models import DBSession, Post, Topic, Board, \
    RuleBan, RuleOverride, serialize_model
from fanboi2.utils import akismet, dnsbl, proxy_detector
from fanboi2.utils import akismet, dnsbl, proxy_detector, geoip, checklist

celery = Celery()



@@ 86,18 86,22 @@ def add_topic(request, board_id, title, body):
    :type body: str
    :rtype: tuple
    """
    ip_address = request['remote_addr']
    country_code = geoip.country_code(ip_address)
    country_scope = 'country:%s' % (str(country_code).lower())

    with transaction.manager:
        board = DBSession.query(Board).get(board_id)
        scope = 'board:%s' % (board.slug,)
        board_scope = 'board:%s' % (board.slug,)

        if DBSession.query(RuleBan).\
           filter(RuleBan.listed(request['remote_addr'], scopes=(scope,))).\
           filter(RuleBan.listed(ip_address, scopes=(board_scope,))).\
           count() > 0:
            return 'failure', 'ban_rejected'

        override = {}
        rule_override = DBSession.query(RuleOverride).filter(
            RuleOverride.listed(request['remote_addr'], scopes=(scope,))).\
            RuleOverride.listed(ip_address, scopes=(board_scope,))).\
            first()

        if rule_override is not None:


@@ 107,16 111,19 @@ def add_topic(request, board_id, title, body):
        if board_status != 'open':
            return 'failure', 'status_rejected', board_status

        if akismet.spam(request, body):
        if checklist.enabled(country_scope, 'akismet') and \
           akismet.spam(request, body):
            return 'failure', 'spam_rejected'

        if dnsbl.listed(request['remote_addr']):
        if checklist.enabled(country_scope, 'dnsbl') and \
           dnsbl.listed(ip_address):
            return 'failure', 'dnsbl_rejected'

        if proxy_detector.detect(request['remote_addr']):
        if checklist.enabled(country_scope, 'proxy_detect') and \
           proxy_detector.detect(ip_address):
            return 'failure', 'proxy_rejected'

        post = Post(body=body, ip_address=request['remote_addr'])
        post = Post(body=body, ip_address=ip_address)
        post.topic = Topic(board=board, title=title)
        DBSession.add(post)
        DBSession.flush()


@@ 141,13 148,17 @@ def add_post(self, request, topic_id, body, bumped):
    :type bumped: bool
    :rtype: tuple
    """
    ip_address = request['remote_addr']
    country_code = geoip.country_code(ip_address)
    country_scope = 'country:%s' % (str(country_code).lower())

    with transaction.manager:
        topic = DBSession.query(Topic).get(topic_id)
        board = topic.board
        scope = 'board:%s' % (board.slug,)
        board_scope = 'board:%s' % (board.slug,)

        if DBSession.query(RuleBan).\
           filter(RuleBan.listed(request['remote_addr'], scopes=(scope,))).\
           filter(RuleBan.listed(ip_address, scopes=(board_scope,))).\
           count() > 0:
            return 'failure', 'ban_rejected'



@@ 156,7 167,7 @@ def add_post(self, request, topic_id, body, bumped):

        override = {}
        rule_override = DBSession.query(RuleOverride).filter(
            RuleOverride.listed(request['remote_addr'], scopes=(scope,))).\
            RuleOverride.listed(ip_address, scopes=(board_scope,))).\
            first()

        if rule_override is not None:


@@ 166,20 177,23 @@ def add_post(self, request, topic_id, body, bumped):
        if not board_status in ('open', 'restricted'):
            return 'failure', 'status_rejected', board_status

        if akismet.spam(request, body):
        if checklist.enabled(country_scope, 'akismet') and \
           akismet.spam(request, body):
            return 'failure', 'spam_rejected'

        if dnsbl.listed(request['remote_addr']):
        if checklist.enabled(country_scope, 'dnsbl') and \
           dnsbl.listed(ip_address):
            return 'failure', 'dnsbl_rejected'

        if proxy_detector.detect(request['remote_addr']):
        if checklist.enabled(country_scope, 'proxy_detect') and \
           proxy_detector.detect(ip_address):
            return 'failure', 'proxy_rejected'

        post = Post(
            topic=topic,
            body=body,
            bumped=bumped,
            ip_address=request['remote_addr'])
            ip_address=ip_address)

        try:
            DBSession.add(post)

M fanboi2/tests/test_app.py => fanboi2/tests/test_app.py +31 -12
@@ 184,6 184,8 @@ class TestNormalizeSettings(unittest.TestCase):
        self.assertEqual(result['app.proxy_detect.getipintel.url'], '')
        self.assertEqual(result['app.proxy_detect.getipintel.email'], '')
        self.assertEqual(result['app.proxy_detect.getipintel.flags'], '')
        self.assertEqual(result['app.geoip2_database'], '')
        self.assertEqual(result['app.checklist'], [])

    def test_settings(self):
        r = self._makeOne({


@@ 202,6 204,8 @@ class TestNormalizeSettings(unittest.TestCase):
            'app.proxy_detect.getipintel.url': 'http://www.example.com/bar',
            'app.proxy_detect.getipintel.email': 'test@example.com',
            'app.proxy_detect.getipintel.flags': 'm',
            'app.geoip2_database': '/var/geoip2/database',
            'app.checklist': 'country:th/\ncountry:jp/proxy_detect */*',
        })

        self.assertEqual(r['sqlalchemy.url'], 'postgresql://localhost:5432/foo')


@@ 234,10 238,13 @@ class TestNormalizeSettings(unittest.TestCase):
            r['app.proxy_detect.getipintel.email'],
            'test@example.com'
        )
        self.assertEqual(
            r['app.proxy_detect.getipintel.flags'],
            'm'
        )
        self.assertEqual(r['app.proxy_detect.getipintel.flags'], 'm')
        self.assertEqual(r['app.geoip2_database'], '/var/geoip2/database')
        self.assertEqual(r['app.checklist'], [
            'country:th/',
            'country:jp/proxy_detect',
            '*/*',
        ])

    def test_environ(self):
        r = self._makeOne({}, environ={


@@ 256,6 263,8 @@ class TestNormalizeSettings(unittest.TestCase):
            'APP_PROXY_DETECT_GETIPINTEL_URL': 'http://www.example.com/bar',
            'APP_PROXY_DETECT_GETIPINTEL_EMAIL': 'test@example.com',
            'APP_PROXY_DETECT_GETIPINTEL_FLAGS': 'm',
            'APP_GEOIP2_DATABASE': '/var/geoip2/database',
            'APP_CHECKLIST': 'country:th/\ncountry:jp/proxy_detect */*',
        })

        self.assertEqual(r['sqlalchemy.url'], 'postgresql://localhost:5432/foo')


@@ 288,10 297,13 @@ class TestNormalizeSettings(unittest.TestCase):
            r['app.proxy_detect.getipintel.email'],
            'test@example.com'
        )
        self.assertEqual(
            r['app.proxy_detect.getipintel.flags'],
            'm'
        )
        self.assertEqual(r['app.proxy_detect.getipintel.flags'], 'm')
        self.assertEqual(r['app.geoip2_database'], '/var/geoip2/database')
        self.assertEqual(r['app.checklist'], [
            'country:th/',
            'country:jp/proxy_detect',
            '*/*',
        ])

    def test_override(self):
        r = self._makeOne({


@@ 310,6 322,8 @@ class TestNormalizeSettings(unittest.TestCase):
            'app.proxy_detect.getipintel.url': 'http://www.example.com/bar',
            'app.proxy_detect.getipintel.email': 'test@example.com',
            'app.proxy_detect.getipintel.flags': 'm',
            'app.geoip2_database': '/var/geoip2/database1',
            'app.checklist': '*/*',
        }, environ={
            'SQLALCHEMY_URL': 'postgresql://localhost:5432/baz',
            'REDIS_URL': 'redis://127.0.0.2:6379/0',


@@ 326,6 340,8 @@ class TestNormalizeSettings(unittest.TestCase):
            'APP_PROXY_DETECT_GETIPINTEL_URL': 'http://www.example.com/buz',
            'APP_PROXY_DETECT_GETIPINTEL_EMAIL': 'fuzz@example.com',
            'APP_PROXY_DETECT_GETIPINTEL_FLAGS': 'f',
            'APP_GEOIP2_DATABASE': '/var/geoip2/database2',
            'APP_CHECKLIST': 'country:th/\ncountry:jp/proxy_detect */*',
        })

        self.assertEqual(r['sqlalchemy.url'], 'postgresql://localhost:5432/baz')


@@ 358,7 374,10 @@ class TestNormalizeSettings(unittest.TestCase):
            r['app.proxy_detect.getipintel.email'],
            'fuzz@example.com'
        )
        self.assertEqual(
            r['app.proxy_detect.getipintel.flags'],
            'f'
        )
        self.assertEqual(r['app.proxy_detect.getipintel.flags'], 'f')
        self.assertEqual(r['app.geoip2_database'], '/var/geoip2/database2')
        self.assertEqual(r['app.checklist'], [
            'country:th/',
            'country:jp/proxy_detect',
            '*/*',
        ])

M fanboi2/tests/test_tasks.py => fanboi2/tests/test_tasks.py +113 -1
@@ 242,6 242,24 @@ class TestAddTopicTask(TaskMixin, ModelMixin, unittest.TestCase):
        self.assertEqual(DBSession.query(Topic).count(), 0)
        self.assertEqual(result.result, ('failure', 'spam_rejected'))

    @unittest.mock.patch('fanboi2.utils.Akismet.spam')
    @unittest.mock.patch('fanboi2.utils.Checklist.enabled')
    def test_add_topic_spam_disabled(self, checklist, akismet):
        import transaction
        from fanboi2.models import Topic
        def disable_akismet(scope, target):
            return target != 'akismet'
        checklist.side_effect = disable_akismet
        akismet.return_value = True
        request = {'remote_addr': '127.0.0.1'}
        with transaction.manager:
            board = self._makeBoard(title='Foobar', slug='foobar')
            board_id = board.id  # board is not bound outside transaction!
        result = self._makeOne(request, board_id, 'Foobar', 'Hello, world!')
        self.assertTrue(result.successful())
        self.assertEqual(DBSession.query(Topic).count(), 1)
        akismet.assert_not_called()

    @unittest.mock.patch('fanboi2.utils.Dnsbl.listed')
    def test_add_topic_dnsbl(self, dnsbl):
        from fanboi2.models import Topic


@@ 255,6 273,24 @@ class TestAddTopicTask(TaskMixin, ModelMixin, unittest.TestCase):
        self.assertEqual(DBSession.query(Topic).count(), 0)
        self.assertEqual(result.result, ('failure', 'dnsbl_rejected'))

    @unittest.mock.patch('fanboi2.utils.Dnsbl.listed')
    @unittest.mock.patch('fanboi2.utils.Checklist.enabled')
    def test_add_topic_dnsbl_disabled(self, checklist, dnsbl):
        import transaction
        from fanboi2.models import Topic
        def disable_dnsbl(scope, target):
            return target != 'dnsbl'
        checklist.side_effect = disable_dnsbl
        dnsbl.return_value = True
        request = {'remote_addr': '127.0.0.1'}
        with transaction.manager:
            board = self._makeBoard(title='Foobar', slug='foobar')
            board_id = board.id  # board is not bound outside transaction!
        result = self._makeOne(request, board_id, 'Foobar', 'Hello, world!')
        self.assertTrue(result.successful())
        self.assertEqual(DBSession.query(Topic).count(), 1)
        dnsbl.assert_not_called()

    @unittest.mock.patch('fanboi2.utils.ProxyDetector.detect')
    def test_add_topic_proxy(self, proxy):
        from fanboi2.models import Topic


@@ 268,6 304,24 @@ class TestAddTopicTask(TaskMixin, ModelMixin, unittest.TestCase):
        self.assertEqual(DBSession.query(Topic).count(), 0)
        self.assertEqual(result.result, ('failure', 'proxy_rejected'))

    @unittest.mock.patch('fanboi2.utils.ProxyDetector.detect')
    @unittest.mock.patch('fanboi2.utils.Checklist.enabled')
    def test_add_topic_proxy_disabled(self, checklist, proxy):
        import transaction
        from fanboi2.models import Topic
        def disable_proxy_detect(scope, target):
            return target != 'proxy_detect'
        checklist.side_effect = disable_proxy_detect
        proxy.return_value = True
        request = {'remote_addr': '127.0.0.1'}
        with transaction.manager:
            board = self._makeBoard(title='Foobar', slug='foobar')
            board_id = board.id  # board is not bound outside transaction!
        result = self._makeOne(request, board_id, 'Foobar', 'Hello, world!')
        self.assertTrue(result.successful())
        self.assertEqual(DBSession.query(Topic).count(), 1)
        proxy.assert_not_called()


class TestAddPostTask(TaskMixin, ModelMixin, unittest.TestCase):



@@ 485,8 539,28 @@ class TestAddPostTask(TaskMixin, ModelMixin, unittest.TestCase):
        self.assertEqual(DBSession.query(Post).count(), 0)
        self.assertEqual(result.result, ('failure', 'spam_rejected'))

    @unittest.mock.patch('fanboi2.utils.Akismet.spam')
    @unittest.mock.patch('fanboi2.utils.Checklist.enabled')
    def test_add_post_spam_disabled(self, checklist, akismet):
        import transaction
        from fanboi2.models import Post
        def disable_akismet(scope, target):
            return target != 'akismet'
        checklist.side_effect = disable_akismet
        akismet.return_value = True
        request = {'remote_addr': '127.0.0.1'}
        with transaction.manager:
            board = self._makeBoard(title='Foobar', slug='foobar')
            topic = self._makeTopic(board=board, title='Hello, world!')
            topic_id = topic.id  # topic is not bound outside transaction!
        result = self._makeOne(request, topic_id, 'Hi!', True)
        self.assertTrue(result.successful())
        self.assertEqual(DBSession.query(Post).count(), 1)
        akismet.assert_not_called()

    @unittest.mock.patch('fanboi2.utils.Dnsbl.listed')
    def test_add_post_dnsbl(self, dnsbl):
    @unittest.mock.patch('fanboi2.utils.Checklist.enabled')
    def test_add_post_dnsbl(self, checklist, dnsbl):
        import transaction
        from fanboi2.models import Post
        dnsbl.return_value = True


@@ 500,6 574,25 @@ class TestAddPostTask(TaskMixin, ModelMixin, unittest.TestCase):
        self.assertEqual(DBSession.query(Post).count(), 0)
        self.assertEqual(result.result, ('failure', 'dnsbl_rejected'))

    @unittest.mock.patch('fanboi2.utils.Dnsbl.listed')
    @unittest.mock.patch('fanboi2.utils.Checklist.enabled')
    def test_add_post_dnsbl_disabled(self, checklist, dnsbl):
        import transaction
        from fanboi2.models import Post
        def disable_dnsbl(scope, target):
            return target != 'dnsbl'
        checklist.side_effect = disable_dnsbl
        dnsbl.return_value = True
        request = {'remote_addr': '127.0.0.1'}
        with transaction.manager:
            board = self._makeBoard(title='Foobar', slug='foobar')
            topic = self._makeTopic(board=board, title='Hello, world!')
            topic_id = topic.id  # topic is not bound outside transaction!
        result = self._makeOne(request, topic_id, 'Hi!', True)
        self.assertTrue(result.successful())
        self.assertEqual(DBSession.query(Post).count(), 1)
        dnsbl.assert_not_called()

    @unittest.mock.patch('fanboi2.utils.ProxyDetector.detect')
    def test_add_post_proxy(self, proxy):
        import transaction


@@ 515,6 608,25 @@ class TestAddPostTask(TaskMixin, ModelMixin, unittest.TestCase):
        self.assertEqual(DBSession.query(Post).count(), 0)
        self.assertEqual(result.result, ('failure', 'proxy_rejected'))

    @unittest.mock.patch('fanboi2.utils.ProxyDetector.detect')
    @unittest.mock.patch('fanboi2.utils.Checklist.enabled')
    def test_add_post_proxy_disabled(self, checklist, proxy):
        import transaction
        from fanboi2.models import Post
        def disable_proxy_detect(scope, target):
            return target != 'proxy_detect'
        checklist.side_effect = disable_proxy_detect
        proxy.return_value = True
        request = {'remote_addr': '127.0.0.1'}
        with transaction.manager:
            board = self._makeBoard(title='Foobar', slug='foobar')
            topic = self._makeTopic(board=board, title='Hello, world!')
            topic_id = topic.id  # topic is not bound outside transaction!
        result = self._makeOne(request, topic_id, 'Hi!', True)
        self.assertTrue(result.successful())
        self.assertEqual(DBSession.query(Post).count(), 1)
        proxy.assert_not_called()

    def test_add_post_retry(self):
        import transaction
        from sqlalchemy.exc import IntegrityError

M fanboi2/tests/test_utils.py => fanboi2/tests/test_utils.py +139 -0
@@ 84,6 84,145 @@ class TestDnsBl(unittest.TestCase):
        self.assertEqual(dnsbl.listed('10.0.100.2'), False)


class TestGeoIP(unittest.TestCase):

    def _makeOne(self, providers=None):
        from fanboi2.utils import GeoIP
        geoip = GeoIP()
        return geoip

    def _makeGeoIP2(self, country_code=None):
        class MockGeoIP2Response(object):
            @property
            def country_code(self):
                return country_code

        class MockGeoIP2(object):
            def country(self, ip_address):
                return MockGeoIP2Response()

        return MockGeoIP2()

    @unittest.mock.patch('geoip2.database.Reader')
    def test_init(self, reader):
        reader.return_value = self._makeGeoIP2()
        geoip = self._makeOne()
        geoip.configure_geoip2('/tmp/some/path')

    @unittest.mock.patch('geoip2.database.Reader')
    def test_init_no_path(self, reader):
        geoip = self._makeOne()
        geoip.configure_geoip2(None)
        assert not reader.called

    @unittest.mock.patch('geoip2.database.Reader')
    def test_init_file_not_found(self, reader):
        reader.side_effect = FileNotFoundError
        geoip = self._makeOne()
        geoip.configure_geoip2('/tmp/some/path')
        assert not geoip.geoip2

    @unittest.mock.patch('geoip2.database.Reader')
    def test_init_invalid_database(self, reader):
        from maxminddb.errors import InvalidDatabaseError
        reader.side_effect = InvalidDatabaseError
        geoip = self._makeOne()
        geoip.configure_geoip2('/tmp/some/path')
        assert not geoip.geoip2

    @unittest.mock.patch('geoip2.database.Reader')
    def test_country_code(self, reader):
        reader.return_value = self._makeGeoIP2(country_code='TH')
        geoip = self._makeOne()
        geoip.configure_geoip2('/tmp/some/path')
        self.assertEqual(geoip.country_code('127.0.0.1'), 'TH')

    @unittest.mock.patch('geoip2.database.Reader')
    def test_country_code_not_inited(self, reader):
        reader.return_value = self._makeGeoIP2()
        geoip = self._makeOne()
        self.assertIsNone(geoip.country_code('127.0.0.1'))

    @unittest.mock.patch('geoip2.database.Reader')
    def test_country_code_address_not_found(self, reader):
        from geoip2.errors import AddressNotFoundError
        class MockNotFoundGeoIP2(object):
            def country(self, ip_address):
                raise AddressNotFoundError
        reader.return_value = MockNotFoundGeoIP2()
        geoip = self._makeOne()
        geoip.configure_geoip2('/tmp/some/path')
        self.assertIsNone(geoip.country_code('127.0.0.1'))

    @unittest.mock.patch('geoip2.database.Reader')
    def test_country_code_not_found(self, reader):
        reader.return_value = self._makeGeoIP2()
        geoip = self._makeOne()
        geoip.configure_geoip2('/tmp/some/path')
        self.assertIsNone(geoip.country_code('127.0.0.1'))


class TestChecklist(unittest.TestCase):

    def _makeOne(self, data=None):
        from fanboi2.utils import Checklist
        checklist = Checklist()
        checklist.configure_checklist(data)
        return checklist

    def test_init(self):
        checklist = self._makeOne(['scope1/', 'scope2/foo,bar', '*/*'])
        self.assertDictEqual(checklist.data, {
            'scope1': [],
            'scope2': ['foo', 'bar'],
            '*': ['*'],
        })

    def test_init_none(self):
        checklist = self._makeOne(None)
        self.assertDictEqual(checklist.data, {})

    def test_init_empty(self):
        checklist = self._makeOne('')
        self.assertDictEqual(checklist.data, {})

    def test_fetch(self):
        checklist = self._makeOne(['scope1/', 'scope2/foo,bar', '*/*'])
        self.assertListEqual(checklist.fetch('scope1'), [])
        self.assertListEqual(checklist.fetch('scope2'), ['foo', 'bar'])
        self.assertListEqual(checklist.fetch('scope3'), ['*'])

    def test_fetch_empty(self):
        checklist = self._makeOne([])
        self.assertListEqual(checklist.fetch('scope1'), ['*'])
        self.assertListEqual(checklist.fetch('scope2'), ['*'])
        self.assertListEqual(checklist.fetch('scope3'), ['*'])

    def test_enabled(self):
        checklist = self._makeOne(['scope1/', 'scope2/foo,bar', '*/*'])
        self.assertFalse(checklist.enabled('scope1', 'foo'))
        self.assertFalse(checklist.enabled('scope1', 'bar'))
        self.assertFalse(checklist.enabled('scope1', 'baz'))
        self.assertTrue(checklist.enabled('scope2', 'foo'))
        self.assertTrue(checklist.enabled('scope2', 'bar'))
        self.assertFalse(checklist.enabled('scope2', 'baz'))
        self.assertTrue(checklist.enabled('scope3', 'foo'))
        self.assertTrue(checklist.enabled('scope3', 'bar'))
        self.assertTrue(checklist.enabled('scope3', 'baz'))

    def test_enabled_empty(self):
        checklist = self._makeOne([])
        self.assertTrue(checklist.enabled('scope1', 'foo'))
        self.assertTrue(checklist.enabled('scope1', 'bar'))
        self.assertTrue(checklist.enabled('scope1', 'baz'))
        self.assertTrue(checklist.enabled('scope2', 'foo'))
        self.assertTrue(checklist.enabled('scope2', 'bar'))
        self.assertTrue(checklist.enabled('scope2', 'baz'))
        self.assertTrue(checklist.enabled('scope3', 'foo'))
        self.assertTrue(checklist.enabled('scope3', 'bar'))
        self.assertTrue(checklist.enabled('scope3', 'baz'))


class TestAkismet(RegistryMixin, unittest.TestCase):

    def _makeOne(self, key='hogehoge'):

M fanboi2/utils/__init__.py => fanboi2/utils/__init__.py +4 -0
@@ 1,10 1,14 @@
from .akismet import Akismet
from .dnsbl import Dnsbl
from .geoip import GeoIP
from .proxy import ProxyDetector
from .rate_limiter import RateLimiter
from .checklist import Checklist
from .request import serialize_request


dnsbl = Dnsbl()
akismet = Akismet()
proxy_detector = ProxyDetector()
geoip = GeoIP()
checklist = Checklist()

A fanboi2/utils/checklist.py => fanboi2/utils/checklist.py +42 -0
@@ 0,0 1,42 @@
class Checklist(object):
    """Utility for evaluating checklist."""

    def __init__(self):
        self.data = {}

    def configure_checklist(self, config):
        """Configure the checklist with the given ``config``.

        :param config: List of strings containing checklist configuration.
        :type config: str[]
        """
        if config:
            for data in config:
                scope, items = data.split('/', 1)
                self.data[scope] = [r for r in items.split(',') if r]

    def fetch(self, scope):
        """Fetch the enabled rules according to the given scope. This method
        will return ``['*']`` if no rules was defined for the given scope.
        In which the application should treat such rule as enable all.

        :param scope: Name of the scope to use for lookup.
        :type scope: str
        :rtype: str[]
        """
        data = self.data.get(scope, None)
        if data is None:
            data = self.data.get('*', ['*'])
        return data

    def enabled(self, scope, target):
        """Check if the given ``target`` was enabled in the given ``scope``.

        :param scope: Name of the scope to use for lookup.
        :param target: Name of the target rule to check against.
        :type scope: str
        :type target: str
        :rtype: bool
        """
        items = self.fetch(scope)
        return target in items or '*' in items

M fanboi2/utils/dnsbl.py => fanboi2/utils/dnsbl.py +1 -1
@@ 2,7 2,7 @@ import socket
from ipaddress import ip_interface, ip_network


class Dnsbl(object) :
class Dnsbl(object):
    """Utility class for checking IP address against DNSBL providers."""

    def __init__(self):

A fanboi2/utils/geoip.py => fanboi2/utils/geoip.py +48 -0
@@ 0,0 1,48 @@
import logging
import geoip2.database
from maxminddb.errors import InvalidDatabaseError
from geoip2.errors import AddressNotFoundError


log = logging.getLogger(__name__)


class GeoIP(object):
    """Utility for looking up IP address against GeoIP database."""

    def __init__(self):
        self.geoip2 = None

    def configure_geoip2(self, path):
        """Configure and initialize GeoIP2 database with the given ``path``
        and warn if GeoIP2 database could not be loaded. If no GeoIP2 database
        could be initialized, all of its operations will return :type:`None`.

        :param path: Path to the GeoIP2 database file (mmdb).
        :type path: str
        """
        if path is not None:
            try:
                self.geoip2 = geoip2.database.Reader(path)
            except (FileNotFoundError, InvalidDatabaseError):
                pass
        if self.geoip2 is not None:
            return
        log.warn(
            'GeoIP2 database does not exists or invalid. ' +
            'Functionalities relying on GeoIP will not work.')

    def country_code(self, ip_address):
        """Resolve the given ``ip_address`` to 2-letter country code.

        :param ip_address: IP address to lookup.
        :type ip_address: str
        """
        response = None
        if self.geoip2 is not None:
            try:
                response = self.geoip2.country(ip_address)
            except AddressNotFoundError:
                pass
        if response is not None:
            return response.country_code

M setup.py => setup.py +1 -0
@@ 28,6 28,7 @@ requires = [
    'python3-memcached',
    'pytz',
    'requests',
    'geoip2',

    # Frontend
    'MarkupSafe',