@@ 3,13 3,15 @@ import logging
from asyncio import Future, get_running_loop
from datetime import datetime
from pathlib import Path
-from typing import TYPE_CHECKING, Generic, TypeVar, Union
+from typing import TYPE_CHECKING, Generic, Optional, TypeVar, Union
import steam
from slidge import global_config
from slixmpp.exceptions import XMPPError
if TYPE_CHECKING:
+ from .contact import Contact
+ from .group import Participant
from .session import Session
@@ 76,6 78,18 @@ class SteamClient(Base):
"you received via email or steam guard"
)
+ async def get_contact_or_participant(
+ self, message: steam.Message
+ ) -> Optional[Union["Contact", "Participant"]]:
+ if isinstance(message, steam.UserMessage):
+ return await self.session.contacts.by_steam_user(
+ message.channel.participant
+ )
+ elif isinstance(message, (steam.GroupMessage, steam.ClanMessage)):
+ muc = await self.session.bookmarks.by_steam_channel(message.channel)
+ return await muc.get_participant_by_legacy_id(message.author.id)
+ return None
+
async def on_typing(self, user: steam.User, when: datetime):
if user == self.user:
return
@@ 86,12 100,8 @@ class SteamClient(Base):
if self.__outbox.set_if_pending(message.id):
return
- if isinstance(message, steam.UserMessage):
- c = await self.session.contacts.by_steam_user(message.channel.participant)
- elif isinstance(message, (steam.GroupMessage, steam.ClanMessage)):
- muc = await self.session.bookmarks.by_steam_channel(message.channel)
- c = await muc.get_participant_by_legacy_id(message.author.id)
- else:
+ c = await self.get_contact_or_participant(message)
+ if c is None:
return
c.send_text(
@@ 165,15 175,11 @@ class SteamClient(Base):
async def update_reactions(self, reaction: steam.MessageReaction) -> None:
message = reaction.message
- if isinstance(message, steam.UserMessage):
- c = await self.session.contacts.by_steam_user(message.channel.participant)
- elif isinstance(message, (steam.GroupMessage, steam.ClanMessage)):
- muc = await self.session.bookmarks.by_steam_channel(message.channel)
- c = await muc.get_participant_by_legacy_id(reaction.user.id)
- else:
- return
if not reaction.emoticon:
return
+ c = await self.get_contact_or_participant(message)
+ if c is None:
+ return
self.session.log.debug("Reaction: %s", reaction)
await c.update_reaction(reaction)