~sirn/fanboi2

ref: 98796226802a26b82c888365ad5b9cd331006792 fanboi2/fanboi2/__init__.py -rw-r--r-- 7.5 KiB
98796226 Kridsada Thanabulpong Bump copyright year. 3 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import copy
import hashlib
import os
from functools import lru_cache
from ipaddress import ip_address
from pyramid.config import Configurator
from pyramid.path import AssetResolver
from pyramid.settings import aslist
from sqlalchemy.engine import engine_from_config
from fanboi2.cache import cache_region
from fanboi2.models import DBSession, Base, redis_conn, identity
from fanboi2.tasks import celery, configure_celery
from fanboi2.utils import akismet, dnsbl, geoip, proxy_detector, checklist


def remote_addr(request):
    """Similar to Pyramid's :attr:`request.remote_addr` but will fallback
    to ``HTTP_X_FORWARDED_FOR`` when ``REMOTE_ADDR`` is either a private
    address or a loopback address. If multiple forwarded IPs are given
    in ``HTTP_X_FORWARDED_FOR``, only the first one will be returned.

    When ``REMOTE_ADDR`` is missing from the environ it is treated as
    ``255.255.255.255``. If the forwarded header is absent, empty, or its
    first entry is blank, the ``REMOTE_ADDR`` value is returned instead of
    an empty string.

    :param request: A :class:`pyramid.request.Request` object.

    :type request: pyramid.request.Request
    :rtype: str
    :raises ValueError: If ``REMOTE_ADDR`` is not a valid IP address.
    """
    ipaddr = ip_address(request.environ.get('REMOTE_ADDR', '255.255.255.255'))
    if not ipaddr.is_private:
        return str(ipaddr)

    # Only the first hop of the forwarded chain is considered.
    forwarded = request.environ.get('HTTP_X_FORWARDED_FOR', '')
    first_hop = forwarded.split(',')[0].strip()

    # An empty header (or blank first entry) must not yield an empty
    # address; fall back to the direct peer address in that case.
    return first_hop or str(ipaddr)


def route_name(request):
    """Look up the name of the route that matched the current request.

    Returns :attr:`name` of :attr:`request.matched_route`, or ``None``
    when the request has no matched route.

    :param request: A :class:`pyramid.request.Request` object.

    :type request: pyramid.request.Request
    :rtype: str
    """
    matched = request.matched_route
    return matched.name if matched else None


@lru_cache(maxsize=10)
def _get_asset_hash(path):
    """Returns an MD5 hash of the given assets path.

    The asset is resolved to a file on disk and hashed in chunks. Results
    are memoized by :func:`functools.lru_cache` for up to 10 distinct
    paths, so a file is not re-read while its entry remains cached.

    :param path: An asset specification to the asset file, either
                 ``package:path`` or a bare path.

    :type path: str
    :rtype: str
    """
    if ':' in path:
        # Split on the first colon only so that a path portion which
        # itself contains a colon does not break the unpacking.
        package, path = path.split(':', 1)
        resolver = AssetResolver(package)
    else:
        resolver = AssetResolver()
    fullpath = resolver.resolve(path).abspath()
    md5 = hashlib.md5()
    with open(fullpath, 'rb') as f:
        # Read in multiples of the digest block size until EOF (b'').
        for chunk in iter(lambda: f.read(128 * md5.block_size), b''):
            md5.update(chunk)
    return md5.hexdigest()


def tagged_static_path(request, path, **kwargs):
    """Build a static path for an asset with a content-hash query string.

    Works like Pyramid's :meth:`request.static_path`, except that the first
    8 characters of the asset file hash are passed as the query parameter
    ``h``, so the generated URL changes when the hash changes. Any
    ``_query`` given by the caller is replaced.

    :param request: A :class:`pyramid.request.Request` object.
    :param path: An asset specification to the asset file.
    :param kwargs: Arguments to pass to :meth:`request.static_path`.

    :type request: pyramid.request.Request
    :type path: str
    :type kwargs: dict
    :rtype: str
    """
    short_hash = _get_asset_hash(path)[:8]
    options = dict(kwargs, _query={'h': short_hash})
    return request.static_path(path, **options)


def normalize_settings(settings, _environ=os.environ):
    """Return a copy of ``settings`` with known keys normalized and
    overridden by their environment variable equivalents.

    For every known key, the value from ``_environ`` wins when the
    environment variable is present; otherwise the value from ``settings``
    is used, defaulting to an empty string when the key is missing. Keys
    holding lists are passed through :func:`pyramid.settings.aslist`
    unless their value is ``None``. The given ``settings`` is not modified;
    a deep copy is updated and returned.

    :param settings: A settings :type:`dict`.

    :type settings: dict
    :rtype: dict
    """
    # (environment variable, settings key) pairs, listed in the order in
    # which they are written into the resulting settings.
    overridable = (
        ('SQLALCHEMY_URL', 'sqlalchemy.url'),
        ('REDIS_URL', 'redis.url'),
        ('CELERY_BROKER_URL', 'celery.broker'),
        ('DOGPILE_URL', 'dogpile.arguments.url'),
        ('SESSION_URL', 'session.url'),
        ('SESSION_SECRET', 'session.secret'),
        ('APP_TIMEZONE', 'app.timezone'),
        ('APP_SECRET', 'app.secret'),
        ('APP_AKISMET_KEY', 'app.akismet_key'),
        ('APP_DNSBL_PROVIDERS', 'app.dnsbl_providers'),
        ('APP_PROXY_DETECT_PROVIDERS', 'app.proxy_detect.providers'),
        ('APP_PROXY_DETECT_BLACKBOX_URL', 'app.proxy_detect.blackbox.url'),
        ('APP_PROXY_DETECT_GETIPINTEL_URL',
         'app.proxy_detect.getipintel.url'),
        ('APP_PROXY_DETECT_GETIPINTEL_EMAIL',
         'app.proxy_detect.getipintel.email'),
        ('APP_PROXY_DETECT_GETIPINTEL_FLAGS',
         'app.proxy_detect.getipintel.flags'),
        ('APP_GEOIP2_DATABASE', 'app.geoip2_database'),
        ('APP_CHECKLIST', 'app.checklist'),
    )

    # Settings keys whose values are converted to lists.
    list_keys = frozenset((
        'app.dnsbl_providers',
        'app.proxy_detect.providers',
        'app.checklist',
    ))

    resolved = {}
    for env_key, settings_key in overridable:
        fallback = settings.get(settings_key, '')
        value = _environ.get(env_key, fallback)
        if settings_key in list_keys and value is not None:
            value = aslist(value)
        resolved[settings_key] = value

    normalized = copy.deepcopy(settings)
    normalized.update(resolved)
    return normalized


def main(global_config, **settings):  # pragma: no cover
    """This function returns a Pyramid WSGI application.

    The INI settings are normalized (and merged with environment
    overrides) via :func:`normalize_settings`, then used to configure the
    database engine, cache region, Redis connection, Celery and the
    utility services before routes and views are registered.

    :param global_config: A :type:`dict` containing global config.
    :param settings: A :type:`dict` containing values from INI.

    :type global_config: dict
    :type settings: dict
    :rtype: pyramid.router.Router
    """
    config = Configurator(settings=normalize_settings(settings))
    config.include('pyramid_mako')
    config.include('pyramid_beaker')

    # Normalized settings as stored on the registry; every component below
    # is configured from this mapping rather than the raw INI values.
    registry_settings = config.registry.settings

    engine = engine_from_config(registry_settings, 'sqlalchemy.')
    DBSession.configure(bind=engine)
    Base.metadata.bind = engine

    cache_region.configure_from_config(registry_settings, 'dogpile.')
    redis_conn.from_url(registry_settings['redis.url'])
    celery.config_from_object(configure_celery(registry_settings))
    identity.configure_tz(registry_settings['app.timezone'])
    akismet.configure_key(registry_settings['app.akismet_key'])
    dnsbl.configure_providers(registry_settings['app.dnsbl_providers'])
    geoip.configure_geoip2(registry_settings['app.geoip2_database'])
    checklist.configure_checklist(registry_settings['app.checklist'])
    proxy_detector.configure_from_config(
        registry_settings,
        'app.proxy_detect.')

    # ``set_request_property`` is deprecated in Pyramid; the equivalent
    # ``add_request_method(..., property=True)`` exposes the callables as
    # non-reified request properties named after the functions.
    config.add_request_method(remote_addr, property=True)
    config.add_request_method(route_name, property=True)
    config.add_request_method(tagged_static_path)
    config.add_route('robots', '/robots.txt')

    config.include('fanboi2.serializers')
    config.include('fanboi2.views.pages', route_prefix='/pages')
    config.include('fanboi2.views.api', route_prefix='/api')
    config.include('fanboi2.views.boards', route_prefix='/')
    config.add_static_view('static', 'static', cache_max_age=3600)
    config.scan()

    return config.make_wsgi_app()