~liberachat/beryllia

ref: 40693269e2bb6942f7df8ae7bd6560a4445cd2c3 beryllia/beryllia/database/kline_kill.py -rw-r--r-- 4.1 KiB
40693269 — jesopo fix a number of kcheck-related SELECTs that were not ordering output 3 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from dataclasses import dataclass
from datetime import datetime
from ipaddress import IPv4Address, IPv6Address
from ipaddress import IPv4Network, IPv6Network
from typing import Any, Collection, Optional, Sequence, Tuple, Union

from .common import NickUserHost, Table
from ..normalise import SearchType
from ..util import lex_glob_pattern, glob_to_sql


@dataclass
class DBKLineKill(NickUserHost):
    id: int
    nickname: str
    username: str
    hostname: str
    ip: Optional[Union[IPv4Address, IPv6Address]]
    ts: datetime

    # nick user host
    def nuh(self) -> str:
        return f"{self.nickname}!{self.username}@{self.hostname}"


class KLineKillTable(Table):
    async def add(
        self,
        kline_id: int,
        nickname: str,
        username: str,
        hostname: str,
        ip: Optional[Union[IPv4Address, IPv6Address]],
    ):

        query = """
            INSERT INTO kline_kill (
                nickname,
                search_nick,
                username,
                search_user,
                hostname,
                search_host,
                ip,
                kline_id,
                ts
            )
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW()::timestamp)
        """
        args = [
            nickname,
            str(self.to_search(nickname, SearchType.NICK)),
            username,
            str(self.to_search(username, SearchType.USER)),
            hostname,
            str(self.to_search(hostname, SearchType.HOST)),
            ip,
            kline_id,
        ]
        async with self.pool.acquire() as conn:
            await conn.execute(query, *args)

    async def _find_klines(
        self, where: str, args: Sequence[Any], count: int
    ) -> Collection[Tuple[int, datetime]]:

        query = f"""
            SELECT DISTINCT(kline.id), kline.ts
            FROM kline_kill
            INNER JOIN kline
            ON kline_kill.kline_id = kline.id
            {where}
            ORDER BY kline.ts DESC
            LIMIT {count}
        """

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *args)

        return rows

    async def find_by_nick(
        self, nickname: str, count: int
    ) -> Collection[Tuple[int, datetime]]:

        pattern = glob_to_sql(lex_glob_pattern(nickname))
        param = str(self.to_search(pattern, SearchType.NICK))
        return await self._find_klines("WHERE search_nick LIKE $1", [param], count)

    async def find_by_host(
        self, hostname: str, count: int
    ) -> Collection[Tuple[int, datetime]]:

        pattern = glob_to_sql(lex_glob_pattern(hostname))
        param = str(self.to_search(pattern, SearchType.HOST))
        return await self._find_klines("WHERE search_host LIKE $1", [param], count)

    async def find_by_ip(
        self, ip: Union[IPv4Address, IPv6Address], count: int
    ) -> Collection[Tuple[int, datetime]]:

        return await self._find_klines("WHERE ip = $1", [ip], count)

    async def find_by_cidr(
        self, cidr: Union[IPv4Network, IPv6Network], count: int
    ) -> Collection[Tuple[int, datetime]]:

        return await self._find_klines("WHERE ip << $1", [cidr], count)

    async def find_by_ip_glob(
        self, glob: str, count: int
    ) -> Collection[Tuple[int, datetime]]:

        pattern = glob_to_sql(lex_glob_pattern(glob))
        param = str(self.to_search(pattern, SearchType.HOST))
        return await self._find_klines("WHERE TEXT(ip) LIKE $1", [param], count)

    async def find_by_kline(self, kline_id: int) -> Collection[DBKLineKill]:
        query = """
            SELECT id, nickname, username, hostname, ip, ts
            FROM kline_kill
            WHERE kline_id = $1
        """

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, kline_id)

        return [DBKLineKill(*row) for row in rows]

    async def set_kline(self, kill_id: int, kline_id: int):

        query = """
            UPDATE kline_kill
            SET kline_id = $1
            WHERE id = $2
        """
        async with self.pool.acquire() as conn:
            await conn.execute(query, kline_id, kill_id)