~sircmpwn/core.sr.ht

core.sr.ht/srht/database.py -rw-r--r-- 5.2 KiB
fb5df72aDrew DeVault srht.graphql: add missing import 22 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import hashlib
import sys
from alembic import command, context
from alembic.config import Config, CommandLine
from argparse import ArgumentParser
from datetime import datetime
from logging.config import dictConfig
from prometheus_client import Histogram
from sqlalchemy import create_engine, event, engine_from_config, pool
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
from srht.config import cfg
from timeit import default_timer
from werkzeug.local import LocalProxy

# Shared declarative base for every sr.ht ORM model.  DbSession.__init__
# attaches a `query` property to it so models can do Model.query.filter(...).
Base = declarative_base()

# Module-global session holder: DbSession(assign_global=True) stores itself
# in _db, and `db` proxies to it lazily so callers may import `db` before a
# session has been constructed.
_db = None
db = LocalProxy(lambda: _db)

# Anonymous namespace class exposing each Prometheus collector as a class
# attribute keyed by its metric name, e.g. _metrics.sql_query_duration.
# describe()[0].name yields the metric's registered name.
_metrics = type("metrics", tuple(), {
    m.describe()[0].name: m
    for m in [
        Histogram("sql_query_duration", "Duration of SQL queries"),
    ]
})

class DbSession():
    """Owns the SQLAlchemy engine and scoped session for one service.

    Constructing this with assign_global=True (the default) publishes the
    instance through the module-level `db` proxy.  Call init() once models
    are imported to install the event hooks, and create() to issue DDL for
    every model registered on Base.
    """

    def __init__(self, connection_string, assign_global=True):
        global Base, _db
        self.engine = create_engine(connection_string)
        factory = sessionmaker(
                autocommit=False,
                autoflush=False,
                bind=self.engine)
        self.session = scoped_session(factory)
        # Give every model a `.query` shortcut bound to this session.
        Base.query = self.session.query_property()
        if assign_global:
            _db = self

    def init(self):
        """Install timestamp auto-fill and SQL timing event hooks."""
        @event.listens_for(Base, 'before_insert', propagate=True)
        def _stamp_insert(mapper, connection, target):
            # Models may opt out of automatic timestamps entirely.
            if hasattr(target, '_no_autoupdate'):
                return
            if hasattr(target, 'created'):
                target.created = datetime.utcnow()
            if hasattr(target, 'updated'):
                target.updated = datetime.utcnow()

        @event.listens_for(Base, 'before_update', propagate=True)
        def _stamp_update(mapper, connection, target):
            if hasattr(target, '_no_autoupdate'):
                return
            if hasattr(target, 'updated'):
                target.updated = datetime.utcnow()

        @event.listens_for(self.engine, 'before_cursor_execute')
        def _query_begin(conn, cursor, statement,
                    parameters, context, executemany):
            # Stashed on the session object; read back in _query_end.
            self._execute_start_time = default_timer()

        @event.listens_for(self.engine, 'after_cursor_execute')
        def _query_end(conn, cursor, statement,
                    parameters, context, executemany):
            elapsed = default_timer() - self._execute_start_time
            # Clamp at zero so a stale start time can't record negatives.
            _metrics.sql_query_duration.observe(max(elapsed, 0))

    def create(self):
        """Create all tables known to Base.metadata on this engine."""
        Base.metadata.create_all(bind=self.engine)

def alembic(site, alembic_module, argv=None):
    """
    Automatically rigs up the Alembic config and shells out to it.

    :param site: config.ini section name of the service, e.g. "git.sr.ht"
    :param alembic_module: the service's alembic package; its filesystem
        path becomes Alembic's script_location
    :param argv: alembic command line arguments; defaults to sys.argv[1:]

    Exits the process (status 0) when -a/--auto is given but
    [site]migrate-on-upgrade is not set to "yes" in config.ini.
    """
    cmdline = CommandLine()
    cmdline.parser.add_argument("-a", "--auto",
        action="store_true",
        help="Specify -a to check config for automatic migrations and abort if "
            "unset (generally only package post-upgrade scripts will specify "
            "this)")
    # Fixed: compare against None with `is`, not `==` (PEP 8).
    if argv is None:
        argv = sys.argv[1:]
    options = cmdline.parser.parse_args(argv)
    if options.auto:
        if cfg(site, "migrate-on-upgrade", default="no") != "yes":
            print("Skipping automatic database migrations")
            print(f"Set [{site}]migrate-on-upgrade=yes in config.ini to enable")
            sys.exit(0)

    config = Config()
    script_location = list(alembic_module.__path__)[0]
    config.set_main_option("script_location", script_location)
    # core.sr.ht itself has no service database; every other service points
    # alembic at its own connection string from config.ini.
    if site != "core.sr.ht":
        config.set_main_option("sqlalchemy.url", cfg(site, "connection-string"))

    # Route alembic's INFO-level output to stderr; keep sqlalchemy quiet.
    dictConfig({
        "root": { "level": "WARN", "handlers": ["console"] },
        "loggers": {
            "sqlalchemy": { "level": "WARN", "handlers": [] },
            "alembic": { "level": "INFO", "handlers": [] },
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "level": "NOTSET",
                "stream": "ext://sys.stderr",
                "formatter": "generic",
            }
        },
        "formatters": {
            "generic": {
                "format": "%(levelname)-5.5s [%(name)s] %(message)s",
                "datefmt": "%H:%M:%S",
            }
        },
        "version": 1,
    })

    cmdline.run_cmd(config, options)

def alembic_env(version_table="alembic_version"):
    """Shared env.py body for sr.ht alembic migration environments.

    Configures the alembic runtime context against Base.metadata and runs
    pending migrations, in either offline (emit SQL) or online (live
    connection) mode.  `version_table` lets each service keep its migration
    history in its own table.
    """
    metadata = Base.metadata
    config = context.config

    def _run_offline():
        # Offline mode renders SQL against the configured URL without ever
        # connecting to the database.
        context.configure(
                url=config.get_main_option("sqlalchemy.url"),
                target_metadata=metadata,
                include_schemas=True,
                version_table=version_table)
        with context.begin_transaction():
            context.run_migrations()

    def _run_online():
        engine = engine_from_config(
                config.get_section(config.config_ini_section),
                prefix='sqlalchemy.', poolclass=pool.NullPool)
        connection = engine.connect()
        context.configure(
                connection=connection,
                target_metadata=metadata,
                include_schemas=True,
                version_table=version_table)
        try:
            with context.begin_transaction():
                context.run_migrations()
        finally:
            connection.close()

    if context.is_offline_mode():
        _run_offline()
    else:
        _run_online()