~nicoco/sleamdge

710fa4de780ba9a51c6e15789d82ec0d74cb4e71 — nicoco 2 months ago 3d5f401
chore: update to slidge v0.2+
M pyproject.toml => pyproject.toml +1 -1
@@ 11,7 11,7 @@ readme = "README.md"
[tool.poetry.dependencies]
python = "^3.11"
steamio = "1.0.1"
slidge = "~0.1"
slidge = "^0.2.0alpha4"

[tool.poetry.group.dev.dependencies]
pytest-asyncio = "^0.21.0"

M sleamdge/__main__.py => sleamdge/__main__.py +2 -10
@@ 1,12 1,4 @@
import sys

from slidge.__main__ import main as slidge_main


def main() -> None:
    sys.argv.extend(["--legacy", "sleamdge"])
    slidge_main()

from slidge import entrypoint

if __name__ == "__main__":
    main()
    entrypoint("sleamdge")

M sleamdge/client.py => sleamdge/client.py +2 -33
@@ 2,11 2,9 @@ import asyncio
import logging
from asyncio import Future, get_running_loop
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Generic, Optional, TypeVar, Union

import steam
from slidge import global_config
from slixmpp.exceptions import XMPPError

if TYPE_CHECKING:


@@ 19,26 17,7 @@ ClanOrGroup = Union[steam.Clan, steam.Group]
Channel = Union[steam.GroupChannel, steam.ClanChannel]


class Base(steam.Client):
    """
    A client that implements persistent storage of the steam token
    """

    user_jid: str

    def _get_store(self) -> Path:
        return global_config.HOME_DIR / self.user_jid

    def save_token(self) -> None:
        if self.refresh_token is None:
            raise RuntimeError("Can not write refresh token because it is not set")
        self._get_store().write_text(self.refresh_token)

    def remove_token(self) -> None:
        self._get_store().unlink()


class CredentialsValidation(Base):
class CredentialsValidation(steam.Client):
    """
    A client for entering the OTP interactively and validating credentials
    """


@@ 46,32 25,22 @@ class CredentialsValidation(Base):
    def __init__(self, **k) -> None:
        super().__init__(**k)
        self.code_future: Future[str] = get_running_loop().create_future()
        self.token_saved = asyncio.Event()

    async def code(self) -> str:
        return await self.code_future

    async def on_login(self):
        self.save_token()
        await self.close()
        self.token_saved.set()


class SteamClient(Base):
class SteamClient(steam.Client):
    """
    The main steam client of registered users
    """

    def __init__(self, session: "Session", **k) -> None:
        self.session = session
        self.user_jid = session.user.bare_jid
        self.__outbox = Pending[int]()
        self.__outbox_reaction = Pending[tuple[int, str, bool]]()
        super().__init__(**k)

    async def login_from_token(self) -> None:
        await self.login(refresh_token=self._get_store().read_text())

    async def code(self) -> str:
        return await self.session.input(
            "You have been disconnected, please enter the code "

M sleamdge/contact.py => sleamdge/contact.py +3 -3
@@ 1,6 1,6 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, AsyncIterator, Optional

import steam
from slidge import LegacyContact, LegacyRoster


@@ 26,11 26,11 @@ class Roster(LegacyRoster[ID32, "Contact"]):
            raise XMPPError("bad-request", f"This is not a valid integer: {local}")
        return ID32(local_int)

    async def fill(self) -> None:
    async def fill(self) -> AsyncIterator["Contact"]:
        for user in await self.session.steam.user.friends():
            c = await self.by_steam_user(user)
            c.is_friend = True
            await c.add_to_roster()
            yield c


class Contact(LegacyContact[ID32]):

M sleamdge/gateway.py => sleamdge/gateway.py +11 -9
@@ 7,7 7,7 @@ from slidge.command.register import FormField, RegistrationType
from slixmpp import JID

from .client import CredentialsValidation
from .session import Session
from .types import ChannelId


class Gateway(BaseGateway):


@@ 35,6 35,10 @@ class Gateway(BaseGateway):

    GROUPS = True

    LEGACY_MSG_ID_TYPE = int
    LEGACY_CONTACT_ID_TYPE = int
    LEGACY_ROOM_ID_TYPE = ChannelId.from_str

    def __init__(self):
        super().__init__()
        self.__pending = dict[JID, tuple[asyncio.Task, CredentialsValidation]]()


@@ 49,20 53,18 @@ class Gateway(BaseGateway):
        assert isinstance(password, str)

        client = CredentialsValidation()
        client.user_jid = user_jid.bare

        task = self.xmpp.loop.create_task(client.login(username, password))
        self.__pending[JID(user_jid.bare)] = (task, client)
        return {}

    async def validate_two_factor_code(self, user: GatewayUser, code: str):
    async def validate_two_factor_code(self, user: GatewayUser, code: str) -> dict:
        task, client = self.__pending.pop(user.jid)
        client.code_future.set_result(code)
        await asyncio.wait_for(client.token_saved.wait(), timeout=120)

    async def unregister(self, user: GatewayUser):
        session: "Session" = self.get_session_from_user(user)  # type: ignore
        await session.steam.close()
        session.steam.remove_token()
        await client.wait_until_ready()
        token = client.refresh_token
        await client.close()
        return {"refresh_token": token}


log = logging.getLogger(__name__)

M sleamdge/group.py => sleamdge/group.py +14 -29
@@ 1,8 1,7 @@
from datetime import datetime
from enum import IntEnum
from typing import TYPE_CHECKING, NamedTuple, Optional, Union, cast
from typing import TYPE_CHECKING, AsyncIterator, Optional, Union

from slidge import LegacyBookmarks, LegacyMUC, LegacyParticipant, MucType
from slidge.util.types import HoleBound
from slixmpp.exceptions import XMPPError
from steam import (
    Clan,


@@ 15,27 14,13 @@ from steam import (
)

from .contact import Contact
from .types import ChannelId, Parent, ParentType
from .util import EMOJIS_SET, emojize

if TYPE_CHECKING:
    from .session import Session


class ParentType(IntEnum):
    GROUP = 1
    CLAN = 2


class Parent(NamedTuple):
    type: ParentType
    id: int


class ChannelId(NamedTuple):
    parent: Parent
    channel_id: int


class Bookmarks(LegacyBookmarks[ChannelId, "LegacyMUC"]):
    session: "Session"



@@ 51,14 36,11 @@ class Bookmarks(LegacyBookmarks[ChannelId, "LegacyMUC"]):
        return await self.by_legacy_id(ChannelId(parent, channel.id))

    async def legacy_id_to_jid_local_part(self, legacy_id: ChannelId) -> str:
        return f"{legacy_id.parent.type}S{legacy_id.parent.id}S{legacy_id.channel_id}"
        return str(legacy_id)

    async def jid_local_part_to_legacy_id(self, local_part: str) -> ChannelId:
        try:
            parent_type, parent_id, channel_id = (int(x) for x in local_part.split("S"))
            return ChannelId(
                Parent(cast(ParentType, parent_type), parent_id), channel_id
            )
            return ChannelId.from_str(local_part)
        except ValueError as e:
            raise XMPPError("bad-request", f"This is not a valid sleamdge MUC id: {e}")



@@ 153,25 135,28 @@ class MUC(LegacyMUC[ChannelId, int, Participant, int]):
        else:
            self.type = MucType.GROUP

    async def fill_participants(self):
    async def fill_participants(self) -> AsyncIterator[Participant]:
        g = (await self.get_steam_channel()).group
        if not g:
            return
        try:
            for m in await g.chunk():
                await self.get_participant_by_legacy_id(m.id)
                yield await self.get_participant_by_legacy_id(m.id)
        except AttributeError:
            # workaround for https://github.com/Gobot1234/steam.py/issues/565
            for m in g.members:
                await self.get_participant_by_legacy_id(m.id)
                yield await self.get_participant_by_legacy_id(m.id)

    async def backfill(
        self,
        oldest_message_id: Optional[int] = None,
        oldest_message_date: Optional[datetime] = None,
        after: Optional[HoleBound] = None,
        before: Optional[HoleBound] = None,
    ):
        c = await self.get_steam_channel()
        async for msg in c.history(before=oldest_message_date):
        async for msg in c.history(
            before=None if before is None else before.timestamp,
            after=None if after is None else after.timestamp,
        ):
            part = await self.get_participant_by_legacy_id(msg.author.id)
            part.send_text(
                msg.clean_content,

M sleamdge/session.py => sleamdge/session.py +13 -2
@@ 5,7 5,7 @@ from typing import Optional, Union

import steam
from PIL import Image
from slidge import BaseSession, FormField, SearchResult
from slidge import BaseSession, FormField, SearchResult, global_config
from slixmpp.exceptions import XMPPError

from .client import SteamClient


@@ 30,8 30,19 @@ class Session(BaseSession[int, Recipient]):
    def xmpp_to_legacy_msg_id(i: str) -> int:
        return int(i)

    async def __login_task(self):
        refresh_token = self.user.legacy_module_data.get("refresh_token")
        if refresh_token is None:
            refresh_token = (
                global_config.HOME_DIR / str(self.user_jid.bare)
            ).read_text()
            self.legacy_module_data_set({"refresh_token": refresh_token})
        assert isinstance(refresh_token, str)
        await self.steam.login(refresh_token=refresh_token)
        self.logged = False

    async def login(self) -> None:
        self.login_task = asyncio.create_task(self.steam.login_from_token())
        self.login_task = asyncio.create_task(self.__login_task())
        await self.steam.wait_until_ready()
        self.contacts.user_legacy_id = self.steam.user.id
        self.emoticons = {e.name: e for e in self.steam.emoticons}

A sleamdge/types.py => sleamdge/types.py +34 -0
@@ 0,0 1,34 @@
from enum import IntEnum


class ParentType(IntEnum):
    GROUP = 1
    CLAN = 2


class Parent:
    __slots__ = "type", "id"

    def __init__(self, type_: ParentType, parent_id: int):
        self.type = type_
        self.id = parent_id


class ChannelId:
    __slots__ = "parent", "channel_id"

    def __init__(self, parent: Parent, channel_id: int):
        self.parent = parent
        self.channel_id = channel_id

    def __repr__(self):
        return f"{self.parent.type.value}-{self.parent.id}-{self.channel_id}"

    def __hash__(self):
        return hash(str(self))

    @classmethod
    def from_str(cls, string):
        parent_type_int, parent_id, channel_id = (int(x) for x in string.split("-"))
        parent = Parent(ParentType(parent_type_int), parent_id)
        return cls(parent, channel_id)