0d1854fbe7e6f801c0a938be9d4aab6aa6f14062 — Kridsada Thanabulpong 2 months ago 7957648
Evaluate scope with banwords.
M fanboi2/services/banword.py => fanboi2/services/banword.py +7 -4
@@ 58,15 58,18 @@ .order_by(desc(Banword.id))
          )
  
-     def is_banned(self, text):
+     def is_banned(self, text, scopes=None):
          """Verify whether the given text includes any of the banwords.
  
          :param text: A text to check.
          """
+         if not scopes:
+             scopes = {}
          for banword in self.list_active():
-             banword_re = re.compile(banword.expr)
-             if banword_re.search(text):
-                 return True
+             if not banword.scope or self.scope_svc.evaluate(banword.scope, scopes):
+                 banword_re = re.compile(banword.expr)
+                 if banword_re.search(text):
+                     return True
          return False
  
      def banword_from_id(self, id_):

M fanboi2/tests/test_integrations_api.py => fanboi2/tests/test_integrations_api.py +41 -0
@@ 509,6 509,47 @@ from . import mock_service
  
          board = self._make(Board(title="Foobar", slug="foo"))
+         self._make(Banword(expr="https?:\\/\\/bit\\.ly", scope="board:foo"))
+         self.dbsession.commit()
+         request = mock_service(
+             self.request,
+             {
+                 IBanQueryService: BanQueryService(self.dbsession, ScopeService()),
+                 IBanwordQueryService: BanwordQueryService(
+                     self.dbsession, ScopeService()
+                 ),
+                 IBoardQueryService: BoardQueryService(self.dbsession),
+             },
+         )
+         request.method = "POST"
+         request.matchdict["board"] = board.slug
+         request.content_type = "application/json"
+         request.client_addr = "127.0.0.1"
+         request.json_body = {}
+         request.json_body["title"] = "title"
+         request.json_body["body"] = "foo\nhttps://bit.ly/spam\nbar"
+         with self.assertRaises(BanwordRejectedError):
+             board_topics_post(request)
+         self.assertEqual(self.dbsession.query(Topic).count(), 0)
+ 
+     def test_board_topics_post_banword_banned_unscoped(self):
+         from ..errors import BanwordRejectedError
+         from ..interfaces import (
+             IBanQueryService,
+             IBanwordQueryService,
+             IBoardQueryService,
+         )
+         from ..models import Banword, Board, Topic
+         from ..services import (
+             BoardQueryService,
+             BanQueryService,
+             BanwordQueryService,
+             ScopeService,
+         )
+         from ..views.api import board_topics_post
+         from . import mock_service
+ 
+         board = self._make(Board(title="Foobar", slug="foo"))
          self._make(Banword(expr="https?:\\/\\/bit\\.ly"))
          self.dbsession.commit()
          request = mock_service(

M fanboi2/tests/test_integrations_board.py => fanboi2/tests/test_integrations_board.py +43 -0
@@ 813,6 813,49 @@ from . import mock_service
  
          board = self._make(Board(title="Foobar", slug="foo"))
+         self._make(Banword(expr="https?:\\/\\/bit\\.ly", scope="board:foo"))
+         self.dbsession.commit()
+         request = mock_service(
+             self.request,
+             {
+                 IBoardQueryService: BoardQueryService(self.dbsession),
+                 IBanQueryService: BanQueryService(self.dbsession, ScopeService()),
+                 IBanwordQueryService: BanwordQueryService(
+                     self.dbsession, ScopeService()
+                 ),
+             },
+         )
+         request.method = "POST"
+         request.matchdict["board"] = board.slug
+         request.content_type = "application/x-www-form-urlencoded"
+         request.session = testing.DummySession()
+         request.client_addr = "127.0.0.1"
+         request.POST = MultiDict({})
+         request.POST["title"] = "title"
+         request.POST["body"] = "foo\nhttps://bit.ly/spam\nbar"
+         request.POST["csrf_token"] = request.session.get_csrf_token()
+         renderer = self.config.testing_add_renderer("boards/new_error.mako")
+         board_new_post(request)
+         renderer.assert_(request=request, board=board, name="banword_rejected")
+         self.assertEqual(self.dbsession.query(Topic).count(), 0)
+ 
+     def test_board_new_post_banword_banned_unscoped(self):
+         from ..interfaces import (
+             IBanQueryService,
+             IBanwordQueryService,
+             IBoardQueryService,
+         )
+         from ..models import Board, Topic, Banword
+         from ..services import (
+             BoardQueryService,
+             BanQueryService,
+             BanwordQueryService,
+             ScopeService,
+         )
+         from ..views.boards import board_new_post
+         from . import mock_service
+ 
+         board = self._make(Board(title="Foobar", slug="foo"))
          self._make(Banword(expr="https?:\\/\\/bit\\.ly"))
          self.dbsession.commit()
          request = mock_service(

M fanboi2/tests/test_services_ban.py => fanboi2/tests/test_services_ban.py +9 -0
@@ 143,6 143,15 @@ self.assertFalse(ban_query_svc.is_banned("10.0.4.255"))
          self.assertFalse(ban_query_svc.is_banned("10.0.5.1"))
  
+     def test_is_banned_unset(self):
+         from ..models import Ban
+ 
+         self._make(Ban(ip_address="10.0.4.0/24"))
+         self.dbsession.commit()
+         ban_query_svc = self._make_one(True)
+         self.assertTrue(ban_query_svc.is_banned("10.0.4.1", {"foo": "bar"}))
+         self.assertTrue(ban_query_svc.is_banned("10.0.4.255", {"foo": "bar"}))
+ 
      def test_is_banned_inactive(self):
          from ..models import Ban
  

M fanboi2/tests/test_services_banword.py => fanboi2/tests/test_services_banword.py +40 -7
@@ 82,18 82,51 @@ def test_is_banned(self):
          from ..models import Banword
  
-         self._make(Banword(expr=r"https?:\/\/bit\.ly"))
+         self._make(Banword(expr=r"https?:\/\/bit\.ly", scope="foo:bar"))
          self._make(Banword(expr=r"https?:\/\/goo\.gl"))
-         self._make(Banword(expr=r"https?:\/\/youtu\.be", active=False))
-         self._make(Banword(expr=r"https?:\/\/example\.com", active=False))
          self.dbsession.commit()
-         banword_query_svc = self._make_one()
-         self.assertTrue(banword_query_svc.is_banned("a\nb\nhttps://bit.ly/Spam\nd"))
+         banword_query_svc = self._make_one(True)
+         self.assertTrue(
+             banword_query_svc.is_banned("a\nb\nhttps://bit.ly/Spam\nd", {"foo": "bar"})
+         )
          self.assertTrue(banword_query_svc.is_banned("a\nb\nhttps://goo.gl/Spam\nd"))
-         self.assertFalse(banword_query_svc.is_banned("a\nb\nhttps://youtu.be/Spam\nd"))
+         self.assertTrue(banword_query_svc.is_banned("a\nb\nhttps://bit.ly/Spam\nd"))
+ 
+     def test_is_banned_non_scope(self):
+         from ..models import Banword
+ 
+         self._make(Banword(expr=r"https?:\/\/bit\.ly", scope="foo:bar"))
+         self.dbsession.commit()
+         banword_query_svc = self._make_one(False)
          self.assertFalse(
-             banword_query_svc.is_banned("a\nb\nhttps://example.com/Spam\nd")
+             banword_query_svc.is_banned("a\nb\nhttps://bit.ly/Spam\nd", {"foo": "bar"})
          )
+         self.assertFalse(banword_query_svc.is_banned("a\nb\nhttps://bit.ly/Spam\nd"))
+         self.assertFalse(banword_query_svc.is_banned("a\nb\nhttps://goo.gl/Spam\nd"))
+ 
+     def test_is_banned_unset(self):
+         from ..models import Banword
+ 
+         self._make(Banword(expr=r"https?:\/\/goo\.gl"))
+         self._make(Banword(expr=r"https?:\/\/bit\.ly"))
+         self.dbsession.commit()
+         banword_query_svc = self._make_one(True)
+         self.assertTrue(
+             banword_query_svc.is_banned("a\nb\nhttps://goo.gl/Spam\nd", {"foo": "bar"})
+         )
+         self.assertTrue(
+             banword_query_svc.is_banned("a\nb\nhttps://bit.ly/Spam\nd", {"foo": "bar"})
+         )
+ 
+     def test_is_banned_inactive(self):
+         from ..models import Banword
+ 
+         self._make(Banword(expr=r"https?:\/\/bit\.ly", active=False))
+         self._make(Banword(expr=r"https?:\/\/goo\.gl", active=False))
+         self.dbsession.commit()
+         banword_query_svc = self._make_one(True)
+         self.assertFalse(banword_query_svc.is_banned("a\nb\nhttps://bit.ly/Spam\nd"))
+         self.assertFalse(banword_query_svc.is_banned("a\nb\nhttps://goo.gl/Spam\nd"))
  
      def test_banword_from_id(self):
          from ..models import Banword

M fanboi2/views/api.py => fanboi2/views/api.py +6 -6
@@ 91,11 91,12 @@ raise ParamsInvalidError(form.errors)
  
      ban_query_svc = request.find_service(IBanQueryService)
-     if ban_query_svc.is_banned(request.client_addr, scopes={"board": board.slug}):
+     ban_scope = {"board": board.slug}
+     if ban_query_svc.is_banned(request.client_addr, scopes=ban_scope):
          raise BanRejectedError()
  
      banword_query_svc = request.find_service(IBanwordQueryService)
-     if banword_query_svc.is_banned(form.body.data):
+     if banword_query_svc.is_banned(form.body.data, scopes=ban_scope):
          raise BanwordRejectedError()
  
      rate_limiter_svc = request.find_service(IRateLimiterService)


@@ 182,13 183,12 @@ raise ParamsInvalidError(form.errors)
  
      ban_query_svc = request.find_service(IBanQueryService)
-     if ban_query_svc.is_banned(
-         request.client_addr, scopes={"board": board.slug, "topic": topic.title}
-     ):
+     ban_scope = {"board": board.slug, "topic": topic.title}
+     if ban_query_svc.is_banned(request.client_addr, scopes=ban_scope):
          raise BanRejectedError()
  
      banword_query_svc = request.find_service(IBanwordQueryService)
-     if banword_query_svc.is_banned(form.body.data):
+     if banword_query_svc.is_banned(form.body.data, scopes=ban_scope):
          raise BanwordRejectedError()
  
      rate_limiter_svc = request.find_service(IRateLimiterService)

M fanboi2/views/boards.py => fanboi2/views/boards.py +6 -6
@@ 120,7 120,8 @@ return {"board": board, "form": form}
  
      ban_query_svc = request.find_service(IBanQueryService)
-     if ban_query_svc.is_banned(request.client_addr, scopes={"board": board.slug}):
+     ban_scope = {"board": board.slug}
+     if ban_query_svc.is_banned(request.client_addr, scopes=ban_scope):
          response = render_to_response(
              "boards/new_error.mako",
              {"board": board, "name": "ban_rejected"},


@@ 130,7 131,7 @@ return response
  
      banword_query_svc = request.find_service(IBanwordQueryService)
-     if banword_query_svc.is_banned(form.body.data):
+     if banword_query_svc.is_banned(form.body.data, scopes=ban_scope):
          response = render_to_response(
              "boards/new_error.mako",
              {"board": board, "name": "banword_rejected"},


@@ 283,9 284,8 @@ return {"board": board, "topic": topic, "form": form}
  
      ban_query_svc = request.find_service(IBanQueryService)
-     if ban_query_svc.is_banned(
-         request.client_addr, scopes={"board": board.slug, "topic": topic.title}
-     ):
+     ban_scope = {"board": board.slug, "topic": topic.title}
+     if ban_query_svc.is_banned(request.client_addr, scopes=ban_scope):
          response = render_to_response(
              "topics/show_error.mako",
              {"board": board, "topic": topic, "name": "ban_rejected"},


@@ 295,7 295,7 @@ return response
  
      banword_query_svc = request.find_service(IBanwordQueryService)
-     if banword_query_svc.is_banned(form.body.data):
+     if banword_query_svc.is_banned(form.body.data, scopes=ban_scope):
          response = render_to_response(
              "topics/show_error.mako",
              {"board": board, "topic": topic, "name": "banword_rejected"},