~liberachat/beryllia

ref: bb37c8118ef45799fe6abe5955094585c829429c beryllia/beryllia/parse/nickserv.py -rw-r--r-- 3.7 KiB
bb37c811 — Jess Porter fix a number of kcheck-related SELECTs that were not ordering output (#57) 3 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from re import compile as re_compile, search as re_search
from typing import Any, Awaitable, Callable, Dict, List, Optional

from irctokens import Line

from .common import IRCParser
from ..database import Database
from ..util import recursive_mx_resolve

RE_COMMAND = re_compile(
    r"^(?P<nickname>\S+)"
    # this is optional in command output
    r"( (?P<account>[(]\S*[)]))?"
    r" (?P<command>\S+):"
    r"( (?P<args>.*))?$"
)

_TYPE_HANDLER = Callable[[Any, str, Optional[str], str], Awaitable[None]]
_HANDLERS: Dict[str, _TYPE_HANDLER] = {}


def _handler(command: str) -> Callable[[_TYPE_HANDLER], _TYPE_HANDLER]:
    def _inner(func: _TYPE_HANDLER) -> _TYPE_HANDLER:
        _HANDLERS[command] = func
        return func

    return _inner


class NickServParser(IRCParser):
    def __init__(self, database: Database):
        super().__init__()
        self._database = database
        self._registration_ids: Dict[str, int] = {}

    async def handle(self, line: Line) -> None:
        message = line.params[1]
        match = RE_COMMAND.search(message)
        if match is None:
            return

        command = match.group("command")
        if not command in _HANDLERS:
            return

        nickname = match.group("nickname")
        account = match.group("account")
        args = match.group("args")

        func = _HANDLERS[command]
        await func(self, nickname, account, args)

    async def _resolve_email(self, registration_id: int, email: str) -> None:
        email_parts = email.split("@", 1)
        if not len(email_parts) == 2:
            # log a warning?
            return

        _, email_domain = email_parts
        resolved = await recursive_mx_resolve(email_domain)

        resolved_ids: List[int] = []
        for record_parent, record_type, record in resolved:
            if record_parent is not None:
                record_parent = resolved_ids[record_parent]

            record_id = await self._database.email_resolve.add(
                registration_id, record_parent, record_type, record
            )
            resolved_ids.append(record_id)

    @_handler("REGISTER")
    async def _handle_REGISTER(
        self, nickname: str, _account: Optional[str], args: str
    ) -> None:

        match = re_search("^(?P<account>\S+) to (?P<email>\S+)$", args)
        if match is None:
            return

        account = match.group("account")
        email = match.group("email")

        registration_id = await self._database.registration.add(
            nickname, account, email
        )
        self._registration_ids[account] = registration_id

        await self._resolve_email(registration_id, email)

    @_handler("DROP")
    async def _handle_DROP(
        self, nickname: str, account: Optional[str], args: str
    ) -> None:

        if not args in self._registration_ids:
            return

        del self._registration_ids[args]

    @_handler("SET:ACCOUNTNAME")
    async def _handle_SET_ACCOUNTNAME(
        self, nickname: str, account: Optional[str], args: str
    ) -> None:

        if not account in self._registration_ids:
            return

        registration_id = self._registration_ids.pop(account)
        self._registration_ids[args] = registration_id

    @_handler("VERIFY:REGISTER")
    async def _handle_VERIFY_REGISTER(
        self, nickname: str, _account: Optional[str], args: str
    ) -> None:

        match = re_search(r"^(?P<account>\S+) ", args)
        if match is None:
            return

        account = match.group("account")
        if not account in self._registration_ids:
            return

        registration_id = self._registration_ids[account]
        await self._database.registration.verify(registration_id)