~liberachat/beryllia

ref: 85e6639bd0c98cafc5dfac576e93a4ebe9091d19 beryllia/beryllia/parse/nickserv.py -rw-r--r-- 4.4 KiB
85e6639b — Jess Porter record account freezes, with freeze reason %tags (#56) 3 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from re import compile as re_compile, search as re_search
from typing import Any, Awaitable, Callable, Dict, List, Optional

from irctokens import Line

from .common import IRCParser, RE_EMBEDDEDTAG
from ..database import Database
from ..util import recursive_mx_resolve

# Matches one command message of the shape
#     "<nickname> [(<account>)] <command>: [<args>]"
# Named groups:
#   nickname - first run of non-whitespace characters
#   account  - optional; the captured text INCLUDES the surrounding "(" and ")"
#   command  - non-whitespace token immediately before the ":" separator
#   args     - optional; everything after ": ", or None when nothing follows
RE_COMMAND = re_compile(
    r"^(?P<nickname>\S+)"
    # this is optional in command output
    r"( (?P<account>[(]\S*[)]))?"
    r" (?P<command>\S+):"
    r"( (?P<args>.*))?$"
)

# Call signature shared by every command handler:
# (parser instance, nickname, optional account, argument string) -> awaitable.
_TYPE_HANDLER = Callable[[Any, str, Optional[str], str], Awaitable[None]]
# Registry of command name -> handler, filled in by the `_handler` decorator.
_HANDLERS: Dict[str, _TYPE_HANDLER] = {}


def _handler(command: str) -> Callable[[_TYPE_HANDLER], _TYPE_HANDLER]:
    """Build a decorator that files the decorated callable under *command*.

    The decorated callable is stored in ``_HANDLERS`` (replacing any earlier
    entry for the same command) and handed back unchanged, so it remains
    usable under its own name.
    """

    def _register(callback: _TYPE_HANDLER) -> _TYPE_HANDLER:
        _HANDLERS.update({command: callback})
        return callback

    return _register


class NickServParser(IRCParser):
    """Dispatch command messages to per-command handlers and record results.

    A message is matched against ``RE_COMMAND``; when its command has a
    handler registered in ``_HANDLERS`` that handler is awaited with the
    parsed nickname, account and argument string.
    """

    # Argument patterns for individual commands, compiled once.
    _RE_REGISTER_ARGS = re_compile(r"^(?P<account>\S+) to (?P<email>\S+)$")
    _RE_VERIFY_ARGS = re_compile(r"^(?P<account>\S+) ")
    _RE_FREEZE_ARGS = re_compile(
        r"(?P<account>\S+) \(reason: (?P<reason>.*)\)$"
    )

    def __init__(self, database: Database):
        super().__init__()
        self._database = database
        # account name -> id returned by `database.registration.add`, kept
        # so later DROP / SET:ACCOUNTNAME / VERIFY:REGISTER messages can be
        # tied back to the registration they refer to.
        self._registration_ids: Dict[str, int] = {}

    async def handle(self, line: Line) -> None:
        """Parse the second parameter of *line* and run the matching handler.

        Lines with fewer than two parameters, messages that do not match
        ``RE_COMMAND`` and commands without a registered handler are
        ignored.
        """
        if len(line.params) < 2:
            return

        match = RE_COMMAND.search(line.params[1])
        if match is None:
            return

        func = _HANDLERS.get(match.group("command"))
        if func is None:
            return

        # "args" is an optional group, so it is None when nothing follows
        # the command's colon. Handlers are typed to receive a str (and
        # several pass it straight to a regex), so normalise to "".
        args = match.group("args") or ""

        await func(self, match.group("nickname"), match.group("account"), args)

    async def _resolve_email(self, registration_id: int, email: str) -> None:
        """Resolve the domain of *email* and store each resulting record.

        The domain is everything after the first "@"; an address without
        "@" is skipped. Each entry from ``recursive_mx_resolve`` is a
        ``(parent, type, record)`` triple where a non-None ``parent`` is an
        index into the entries already stored; it is translated to the
        database id of that earlier row before being saved.
        """
        _, separator, email_domain = email.partition("@")
        if not separator:
            # log a warning?
            return

        resolved = await recursive_mx_resolve(email_domain)

        resolved_ids: List[int] = []
        for parent_index, record_type, record in resolved:
            parent_id = None
            if parent_index is not None:
                parent_id = resolved_ids[parent_index]

            record_id = await self._database.email_resolve.add(
                registration_id, parent_id, record_type, record
            )
            resolved_ids.append(record_id)

    @_handler("REGISTER")
    async def _handle_REGISTER(
        self, nickname: str, _account: Optional[str], args: str
    ) -> None:
        """Record a registration; *args* must be ``"<account> to <email>"``.

        Stores the registration, remembers its id under the account name
        and then resolves the email's domain.
        """
        match = self._RE_REGISTER_ARGS.search(args)
        if match is None:
            return

        account = match.group("account")
        email = match.group("email")

        registration_id = await self._database.registration.add(
            nickname, account, email
        )
        self._registration_ids[account] = registration_id

        await self._resolve_email(registration_id, email)

    @_handler("DROP")
    async def _handle_DROP(
        self, nickname: str, account: Optional[str], args: str
    ) -> None:
        """Forget the tracked registration id for the account named *args*."""
        self._registration_ids.pop(args, None)

    @_handler("SET:ACCOUNTNAME")
    async def _handle_SET_ACCOUNTNAME(
        self, nickname: str, account: Optional[str], args: str
    ) -> None:
        """Re-key a tracked registration from *account* to the name in *args*.

        Does nothing when the message carried no account, when *args* is
        empty, or when the old account is not being tracked.
        """
        if account is None or not args:
            return

        # RE_COMMAND captures the account together with its surrounding
        # parentheses, whereas _registration_ids is keyed by the bare name
        # taken from the REGISTER arguments; strip them so the lookup can
        # actually match.
        old_name = account
        if old_name.startswith("(") and old_name.endswith(")"):
            old_name = old_name[1:-1]

        try:
            registration_id = self._registration_ids.pop(old_name)
        except KeyError:
            return

        self._registration_ids[args] = registration_id

    @_handler("VERIFY:REGISTER")
    async def _handle_VERIFY_REGISTER(
        self, nickname: str, _account: Optional[str], args: str
    ) -> None:
        """Mark a tracked registration as verified.

        The account is the first space-terminated token of *args*; accounts
        that are not being tracked are ignored.
        """
        match = self._RE_VERIFY_ARGS.search(args)
        if match is None:
            return

        registration_id = self._registration_ids.get(match.group("account"))
        if registration_id is None:
            return

        await self._database.registration.verify(registration_id)

    @_handler("FREEZE:ON")
    async def _handle_FREEZE_ON(
        self, soper: str, soper_account: Optional[str], args: str
    ) -> None:
        """Record an account freeze and the tags embedded in its reason.

        *args* must end with ``"<account> (reason: <reason>)"``. The freeze
        is attributed to *soper_account* when present, otherwise to
        *soper*. Every ``RE_EMBEDDEDTAG`` match in the reason is stored as
        a tag of the freeze, skipping tags already present for it.
        """
        match = self._RE_FREEZE_ARGS.search(args)
        if match is None:
            return

        # NOTE(review): soper_account is stored exactly as RE_COMMAND
        # captured it, i.e. including the surrounding parentheses - confirm
        # whether the bare account name is what should be recorded.
        actor = soper_account or soper

        account = match.group("account")
        reason = match.group("reason")

        freeze_id = await self._database.account_freeze.add(account, actor, reason)

        for tag_match in RE_EMBEDDEDTAG.finditer(reason):
            tag = tag_match.group(1)
            if await self._database.freeze_tag.exists(freeze_id, tag):
                continue

            await self._database.freeze_tag.add(freeze_id, tag, actor)