From 63a4d23d2eed6ae118626e3029dd92e48e171315 Mon Sep 17 00:00:00 2001 From: mohab Date: Wed, 6 Mar 2024 16:45:04 +0200 Subject: [PATCH] chore(BREAKING CHANGE): refactor replyAndDismiss() inside reply_and_dismiss refactor getReplyFromDB() inside database.py add environment_variables to consolidate all .env lookups rewrite util.py into toot_info.py rewrite tests and test_file names accordingly --- connect_to_mastodon.py | 17 ++--- custom_stream_listener.py | 15 +++- db.py | 143 -------------------------------------- fightlike.py | 16 ++--- reply_and_dismiss.py | 48 ++++--------- test_db.py | 84 ---------------------- test_util.py | 33 --------- util.py | 43 ------------ 8 files changed, 38 insertions(+), 361 deletions(-) delete mode 100644 db.py delete mode 100644 test_db.py delete mode 100644 test_util.py delete mode 100644 util.py diff --git a/connect_to_mastodon.py b/connect_to_mastodon.py index 87fcbc7..f455d41 100644 --- a/connect_to_mastodon.py +++ b/connect_to_mastodon.py @@ -1,23 +1,14 @@ -import os -from dotenv import load_dotenv +from environment_variables import getEnvironmentVariables from mastodon import Mastodon -def getAPIBaseURL(): - # Load environment variables. - try: - load_dotenv() - api_base_url = os.getenv("API_BASE_URL") - except Exception as e: - print(e) - - return api_base_url +def getMastodonConnection(): + api_base_url: str = getEnvironmentVariables().api_base_url -def getMastodonConnection(): mastodon = Mastodon( access_token="mastodon_access_token.secret", - api_base_url=getAPIBaseURL() + api_base_url=api_base_url ) return mastodon diff --git a/custom_stream_listener.py b/custom_stream_listener.py index 7bf4d6a..8f2ae8d 100644 --- a/custom_stream_listener.py +++ b/custom_stream_listener.py @@ -1,8 +1,19 @@ from mastodon import StreamListener -from reply_and_dismiss import replyAndDismissNotification +from reply_and_dismiss import replyToToot, dismissNotification class CustomListener(StreamListener): + def on_notification(self, notification): + if (notification["type"] == "mention"): - replyAndDismissNotification(notification) + + print("Fightlike mentioned! Replying to toot...") + + replyToToot(notification) + + print("Done!") + + dismissNotification(notification) + + print("Notification dismissed!") diff --git a/db.py b/db.py deleted file mode 100644 index 9bd01c5..0000000 --- a/db.py +++ /dev/null @@ -1,143 +0,0 @@ -import pytest -import os -from dotenv import load_dotenv -from pymongo.mongo_client import MongoClient -from pymongo.server_api import ServerApi -from bson.son import SON - - -def get_DB_URI(): - # Load environment variables. - try: - load_dotenv() - db_uri = os.getenv("DB_URI") - except Exception as e: - print(e) - - return db_uri - - -def getMongoClinet(): - # Create a new MongoDB client and connect to the server. - client = MongoClient(get_DB_URI(), server_api=ServerApi("1")) - - # Send a ping to confirm a successful connection. - try: - client.admin.command("ping") - print( - "Deployment pinged. Connection successfully established." - ) - except Exception as e: - print(e) - - return client - - -def getCollection(): - # Get database. - db = getMongoClinet().fightlike - - # Get collection. - collection = db["character"] - - return collection - - -def getKeywords(charName: str, flags: [str]) -> [str]: - # Formulate keywords query. - keywordsQuery = { - "name": charName - } - - # Check for and add game flag, if present. - title = "" - - for flag in flags: - if (flag.startswith("!")): - title = flag[1:] - keywordsQuery["game.title"] = flag[1:] - - # Get character document. - charObj = getCollection().find_one(keywordsQuery) - - # Assume charName is slug if name is not in database. - if charObj is None: - del keywordsQuery["name"] - keywordsQuery["slug"] = charName.casefold() - charObj = getCollection().find_one(keywordsQuery) - - # Assume game.title is game's slug. - if charObj is None and title: - del keywordsQuery["game.title"] - keywordsQuery["game.slug"] = title - charObj = getCollection().find_one(keywordsQuery) - - # Assume charName is name again. - if charObj is None: - del keywordsQuery["slug"] - keywordsQuery["name"] = charName - charObj = getCollection().find_one(keywordsQuery) - - try: - return charObj["keywords"] - except Exception: - return None - - -def getReplyFromDB(charName: str, flags: [str]) -> str: - - keywords: [str] | None = getKeywords(charName, flags) - - # If keywords is empty, assume character is not in database. - if keywords is None: - return "Unfortunately, your main is not in Fightlike's database yet. You can fill this form to add them: https://fightlike.mohab.xyz/submit" - - # Compose query. - query = { - "keywords": {"$in": keywords} - } - - # Add flags to query. - for flag in flags: - if (flag.startswith("!")): - continue - elif (flag == "rollback" or flag == "delay"): - query["game.netcode"] = flag - else: - query["game.franchise"] = flag - - # Build pipeline. - pipeline = [ - {"$match": query}, - {"$unwind": "$keywords"}, - {"$match": query}, - {"$group": {"_id": { - "name": "$name", - "game": "$game.title", - "netcode": "$game.netcode" - }, - "count": {"$sum": 1}} - }, - {"$sort": SON([("count", -1)])}, - {"$limit": 10} - ] - - # Get similar characters and compose reply. - reply = f'Fightlike recommends:\n\n' - - # Write utility function to check if submitted slug = firstname in results. - - for entry in getCollection().aggregate(pipeline): - name = entry["_id"]["name"] - netcode = entry["_id"]["netcode"] - game = entry["_id"]["game"] - - if (charName.capitalize() == name - or charName.capitalize() == name.split(" ")[0]): - continue - elif (netcode != "rollback"): - reply += f'- {name} in {game} [Delay Netcode].\n' - else: - reply += f'- {name} in {game}.\n' - - return reply diff --git a/fightlike.py b/fightlike.py index a8a7be8..0e3638a 100644 --- a/fightlike.py +++ b/fightlike.py @@ -1,20 +1,20 @@ from connect_to_mastodon import getMastodonConnection from custom_stream_listener import CustomListener -from reply_and_dismiss import replyAndDismissMissedNotifications +from missed_notifications import handleMissedNotifications -def runFightlike(): +def runFightlike() -> None: - # Local Mastodon connection. + # Reply to, and dismiss, missed notifications, if any. + handleMissedNotifications() + + # Connect to Mastodon (https://botsin.space/). mastodon = getMastodonConnection() - # Instantiate StreamListener (SL). + # Instantiate StreamListener (SL); listen for mentions. notificationListener = CustomListener() - # Reply and dismiss missed notifications. - replyAndDismissMissedNotifications() - - # Pass SL instance to mastodon.stream_user. + # Begin listening to notifications. mastodon.stream_user(listener=notificationListener) diff --git a/reply_and_dismiss.py b/reply_and_dismiss.py index 335b385..b899878 100644 --- a/reply_and_dismiss.py +++ b/reply_and_dismiss.py @@ -1,58 +1,36 @@ -from util import getTootText, getCharName, getFlags -from db import getReplyFromDB +from toot_info import getTootInfo +from database import composeReply from connect_to_mastodon import getMastodonConnection -def replyAndDismissNotification(notification: dict): +def replyToToot(notification: dict) -> None: - # Define local mastodon instance. - mastodon = getMastodonConnection() - - # Pull toot ID and content. - tootID: int = notification["status"]["id"] - tootContent: str = notification["status"]["content"] - - # Pull toot text. - tootText: str = getTootText(tootContent) - - # Pull character name. - charName: str = getCharName(tootText) + tootInfo = getTootInfo(notification) - # Get flags. - flags: [str] = getFlags(tootText) + mastodon = getMastodonConnection() # Get reply from MongoDB. - reply: str = getReplyFromDB(charName, flags) + reply: str = composeReply(tootInfo.charName, tootInfo.flags) # Reply to toot. try: mastodon.status_reply( - to_status=mastodon.status(tootID), + to_status=mastodon.status(tootInfo.tootID), status=reply, - in_reply_to_id=tootID + in_reply_to_id=tootInfo.tootID ) except Exception as e: print(e) + +def dismissNotification(notification: dict) -> None: + + mastodon = getMastodonConnection() + # Pull notification ID. notificationID: int = notification["id"] - # Dismiss notification. try: mastodon.notifications_dismiss(notificationID) except Exception as e: print(e) - - -def replyAndDismissMissedNotifications(): - - # Define local mastodon instance. - mastodon = getMastodonConnection() - - # Pull notifications. - notifications = mastodon.notifications(mentions_only=True) - - # Reply to any missed notification. - if (len(notifications) > 0): - for notification in notifications: - replyAndDismissNotification(notification) diff --git a/test_db.py b/test_db.py deleted file mode 100644 index 609c1e5..0000000 --- a/test_db.py +++ /dev/null @@ -1,84 +0,0 @@ -import db - - -def test_getKeywords(): - - reply = db.getKeywords("bridget", ["!ggst", "rollback", "blazblue"]) - - assert reply == ["all-rounder", "fast", "mixup", "mobility", "poking"] - - -def test_getKeywordsIfCharDoesNotExist(): - - reply = db.getKeywords("mohab", []) - - assert reply is None - - -# Test for 'Nine the Phantom' substring in MongoDB response. -def test_getReplyFromDBWithGameFlag(): - - reply = db.getReplyFromDB("Testament", [ - "!Guilty Gear -STRIVE-", "rollback", "blazblue" - ]) - - assert reply.find( - "Nine the Phantom" - ) >= 0 - - -# Test for 'Carl Clover' substring in MongoDB response. -def test_getReplyFromDBWithoutGameFlag(): - - reply = db.getReplyFromDB("Eddie", [ - "rollback", "blazblue" - ]) - - assert reply.find( - "Carl Clover" - ) >= 0 - - -# Test for 'Jam Kuradoberi' substring in MongoDB response. -def test_getReplyFromDBWithFullName(): - - reply = db.getReplyFromDB("Leo Whitefang", [ - "rollback", "guilty gear" - ]) - - assert reply.find( - "Kum Haehyun" - ) >= 0 - - -# Test for 'Jam Kuradoberi' substring in MongoDB response. -def test_getReplyFromDBWithFirstName(): - - reply = db.getReplyFromDB("leo", [ - "rollback", "guilty gear" - ]) - - assert reply.find( - "Kum Haehyun" - ) >= 0 - - -def test_getReplyFromDBIfCharacterDoesNotExist(): - - reply = db.getReplyFromDB("mohab", []) - - assert reply.find( - "https://fightlike.mohab.xyz/submit" - ) >= 0 - - -# Test for 'Nine the Phantom' substring in MongoDB response. -def test_getReplyFromDBWithGameSlugAndCharSlug(): - - reply = db.getReplyFromDB("bridget", [ - "!ggst", "rollback", "blazblue" - ]) - - assert reply.find( - "Amane Nishiki" - ) >= 0 diff --git a/test_util.py b/test_util.py deleted file mode 100644 index 5523b0c..0000000 --- a/test_util.py +++ /dev/null @@ -1,33 +0,0 @@ -import util - - -# Test parser. -def test_getTootText(): - - tootText = util.getTootText( - '

Find characters like Eddie: @fightlike eddie

' - ) - - assert tootText == "@fightlike eddie" - - -# Test if character name correctly pulled. -def test_getCharName(): - - charName = util.getCharName("@fightlike eddie") - - assert charName == "Eddie" - - -def test_getFullCharName(): - - charName = util.getCharName("@fightlike leo whitefang") - - assert charName == "Leo Whitefang" - - -def test_getFlags(): - - flags = util.getFlags("@fightlike eddie !!ggxxacpr !blazblue !rollback") - - assert flags[0] == "!ggxxacpr" and flags[1] == "blazblue" and flags[2] == "rollback" diff --git a/util.py b/util.py deleted file mode 100644 index e5d3fc4..0000000 --- a/util.py +++ /dev/null @@ -1,43 +0,0 @@ -import pytest -from bs4 import BeautifulSoup - - -# Parse toot content and return text. -def getTootText(tootContent: str): - parsedTootContent: str = BeautifulSoup( - tootContent, "html.parser" - ) - - tootText: str = parsedTootContent.get_text().strip(' \t\n\r') - - beginIndex: int = tootText.find("@fightlike") - - return tootText[beginIndex:] - - -# Pull character name from text. -def getCharName(tootText: str): - def getFullCharName(rawName: str): - - firstName = rawName.split(" ").pop(0).capitalize() - - lastName = rawName.split(" ").pop(-1).capitalize() - - if (firstName == lastName): - return firstName - else: - return firstName + " " + lastName - - tootFillet: str = tootText.split(" !").pop(0) - - if (tootFillet.find("@fightlike") == 0): - charName = getFullCharName(tootFillet[11:]) - else: - charName = getFullCharName(tootFillet[25:]) - - return charName - - -# Pull flags from text. -def getFlags(tootText: str): - return tootText.split(" !")[1:] -- 2.45.2