~liberachat/beryllia

ref: 9eb98b89a15f8a920fba58bf6a956e27451f76ef beryllia/beryllia/database/kline.py -rw-r--r-- 4.4 KiB
9eb98b89 — jesopo fix klinedel regex 3 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Collection, Dict, Optional, Sequence, Tuple

from .common import Table
from ..normalise import SearchType
from ..util import lex_glob_pattern, glob_to_sql


@dataclass
class DBKLine:
    """One K-line row as read back from the ``kline`` table.

    The field order matches the column order selected by ``KLineTable.get``,
    which builds instances positionally -- do not reorder the fields.
    """

    # The ban mask exactly as it was passed to KLineTable.add.
    mask: str
    # Stored verbatim from the ``source`` argument of KLineTable.add.
    source: str
    # Stored verbatim from the ``oper`` argument of KLineTable.add
    # (presumably the operator who set the ban -- confirm against callers).
    oper: str
    # Ban length in seconds (KLineTable.add feeds it to timedelta(seconds=...)).
    duration: int
    # Free-text reason stored with the ban.
    reason: str
    # Time the row was inserted.
    ts: datetime
    # Time the ban lapses; KLineTable.add computes it as ts + duration.
    expire: datetime


class KLineTable(Table):
    """Query helpers for the ``kline`` table and its ``kline_remove`` companion.

    Each method borrows a connection from ``self.pool`` for the duration of a
    single statement. Placeholders use the positional ``$n`` form.
    """

    async def get(self, id: int) -> DBKLine:
        """Load the K-line whose primary key is *id*.

        No existence check is made here: if the row lookup yields ``None``
        the positional unpacking below raises ``TypeError``. Use
        :meth:`exists` first when *id* is not known to be valid.
        """
        query = """
            SELECT mask, source, oper, duration, reason, ts, expire
            FROM kline
            WHERE id = $1
        """
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(query, id)

        # Column order in the SELECT mirrors the DBKLine field order.
        return DBKLine(*row)

    async def exists(self, id: int) -> bool:
        """Return ``True`` if a K-line with primary key *id* is stored."""
        query = """
            SELECT 1
                FROM kline
            WHERE id = $1
        """
        async with self.pool.acquire() as conn:
            return bool(await conn.fetchval(query, id))

    async def list_active(self) -> Dict[str, int]:
        """Map mask -> id for every K-line that is neither removed nor expired.

        "Removed" means a ``kline_remove`` row exists for the K-line; the
        LEFT JOIN leaves ``kline_remove.ts`` NULL only when there is none.
        If several active rows share a mask, the mapping keeps whichever one
        the database returns last.
        """
        query = """
            SELECT kline.mask, kline.id FROM kline

            LEFT JOIN kline_remove
            ON kline.id = kline_remove.kline_id

            WHERE kline_remove.ts IS NULL
            AND kline.expire > NOW()::TIMESTAMP
        """
        async with self.pool.acquire() as conn:
            return dict(await conn.fetch(query))

    async def find_active(self, mask: str) -> Optional[int]:
        """Return the id of the newest active K-line on exactly *mask*.

        "Active" has the same meaning as in :meth:`list_active`: unexpired
        and without a ``kline_remove`` row. Returns ``None`` when no such
        K-line exists.
        """
        # The "not removed" test must live in WHERE, not in the join's ON
        # clause: an outer join keeps the left-hand row even when ON fails,
        # so filtering there would still return removed K-lines.
        query = """
            SELECT kline.id FROM kline

            LEFT JOIN kline_remove
            ON kline.id = kline_remove.kline_id

            WHERE kline.mask = $1
            AND kline_remove.ts IS NULL
            AND kline.expire > NOW()

            ORDER BY kline.ts DESC
            LIMIT 1
        """
        async with self.pool.acquire() as conn:
            return await conn.fetchval(query, mask)

    async def add(
        self, source: str, oper: str, mask: str, duration: int, reason: str
    ) -> int:
        """Insert a new K-line and return its generated id.

        *duration* is in seconds; the expiry is stored as insertion time plus
        *duration*. A searchable form of *mask*, produced by
        ``self.to_search(mask, SearchType.MASK)``, is stored alongside it in
        ``search_mask``.
        """
        # NOTE(review): ts/expire are naive UTC taken from the application
        # clock, while the read queries compare ``expire`` against the
        # database's NOW(). This presumably relies on the DB session time
        # zone being UTC -- confirm.
        utcnow = datetime.utcnow()
        query = """
            INSERT INTO kline
            (mask, search_mask, source, oper, duration, reason, ts, expire)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            RETURNING id
        """
        args = [
            mask,
            str(self.to_search(mask, SearchType.MASK)),
            source,
            oper,
            duration,
            reason,
            utcnow,
            utcnow + timedelta(seconds=duration),
        ]
        async with self.pool.acquire() as conn:
            return await conn.fetchval(query, *args)

    async def reject_hit(self, kline_id: int) -> None:
        """Stamp ``last_reject`` on K-line *kline_id* with the current DB time."""
        query = """
            UPDATE kline
            SET last_reject = NOW()::TIMESTAMP
            WHERE id = $1
        """

        async with self.pool.acquire() as conn:
            await conn.execute(query, kline_id)

    async def find_by_ts(
        self, ts: datetime, count: int, fudge: int = 1
    ) -> Collection[Tuple[int, datetime]]:
        """Return up to *count* ``(id, ts)`` rows set at roughly time *ts*.

        A row matches when its ``ts``, truncated to the minute, lies within
        *fudge* minutes of the given *ts*. The query has no ORDER BY, so when
        more than *count* rows match, which ones come back is up to the
        database.
        """
        query = """
            SELECT id, ts FROM kline
            WHERE ABS(
                EXTRACT(
                    EPOCH FROM (
                        DATE_TRUNC('minute', ts) - $1
                    )
                )
            ) / 60 <= $2
            LIMIT $3
        """
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, ts, fudge, count)

    async def find_last_by_oper(self, oper: str, count: int) -> Sequence[int]:
        """Return ids of the *count* most recent K-lines by *oper*, newest first."""
        query = """
            SELECT id FROM kline
            WHERE oper = $1
            ORDER BY ts DESC
            LIMIT $2
        """
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, oper, count)
        return [r[0] for r in rows]

    async def _find_klines(
        self, where: str, args: Sequence[Any], count: int
    ) -> Collection[Tuple[int, datetime]]:
        """Run ``SELECT id, ts FROM kline`` with a caller-supplied WHERE clause.

        *where* is spliced into the statement verbatim, so it must be trusted
        SQL whose placeholders are numbered ``$1`` .. ``$len(args)``; the
        values themselves travel in *args*. At most *count* rows are returned.
        """
        # Bind the limit instead of formatting it into the SQL text, like
        # every other query in this class. Placeholders are positional, so
        # the limit takes the slot right after the caller's arguments.
        limit_slot = len(args) + 1
        query = f"""
            SELECT id, ts
            FROM kline
            {where}
            LIMIT ${limit_slot}
        """

        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args, count)

    async def find_by_mask_glob(
        self, mask: str, count: int
    ) -> Collection[Tuple[int, datetime]]:
        """Return up to *count* ``(id, ts)`` rows whose mask matches glob *mask*.

        The glob is lexed and translated to a SQL pattern, passed through
        ``self.to_search(..., SearchType.MASK)`` (the same transformation
        :meth:`add` applies before storing ``search_mask``), and then matched
        against ``search_mask`` with ``LIKE``.
        """
        pattern = glob_to_sql(lex_glob_pattern(mask))
        param = str(self.to_search(pattern, SearchType.MASK))
        return await self._find_klines("WHERE search_mask LIKE $1", [param], count)