~sircmpwn/paste.sr.ht

ref: 728842d6e88d44cda6e585914e23108f6015d09f paste.sr.ht/pastesrht/blueprints/public.py -rw-r--r-- 5.4 KiB
728842d6 — Francis Dinh Normalize linefeeds when submitting via web form 1 year, 8 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import humanize
import pygments
import stat
from flask import Blueprint, render_template, request, redirect, Response
from flask import url_for, abort
from flask_login import current_user
from hashlib import sha1
from io import StringIO
from jinja2 import Markup
from pastesrht.types import User, Paste, PasteFile, Blob
from pastesrht.webhooks import PasteWebhook
from pygments import highlight
from pygments.formatters import HtmlFormatter
from pygments.lexers import guess_lexer, guess_lexer_for_filename, TextLexer
from srht.database import db
from srht.flask import loginrequired, paginate_query
from srht.validation import Validation

# Blueprint on which this module's view functions are registered via
# `@public.route(...)`.
public = Blueprint("public", __name__)

@public.route("/")
def index():
    """Render the landing page.

    The new-paste form is shown when `current_user` is truthy; otherwise
    the generic index page is rendered.
    """
    template = "new-paste.html" if current_user else "index.html"
    return render_template(template)

def _publish_paste(paste):
    """Finalize *paste*: compute its sha, deliver the webhook and redirect.

    The paste sha is a SHA-1 over, for every file in the paste, the file
    name (a single NUL character stands in for an unnamed file) and the
    blob sha, followed by the current user's id. After the sha is stored
    the `paste_create` webhook is delivered, the session is committed and a
    redirect to the paste's page is returned.
    """
    digest = sha1()
    for f in paste.files:
        digest.update((f.filename if f.filename else "\0").encode())
        digest.update(f.blob.sha.encode())
    digest.update(str(current_user.id).encode())
    paste.sha = digest.hexdigest()
    PasteWebhook.deliver(PasteWebhook.Events.paste_create,
            paste.to_dict(),
            PasteWebhook.Subscription.user_id == paste.user_id)
    db.session.commit()
    return redirect(url_for(".paste_GET",
        user=current_user.username, sha=paste.sha))

@public.route("/new-paste", methods=["POST"])
@loginrequired
def new_paste_POST():
    """Add a file to a new or in-progress paste, optionally publishing it.

    Form fields read:
      contents  -- required file body; CRLF and lone CR are normalized to LF
      filename  -- optional; surrounding whitespace is stripped
      commit    -- required; "force" publishes the referenced paste as-is
                   without adding the submitted file, "no" stores the file
                   and re-renders the form, any other value stores the file
                   and publishes the paste
      paste-id  -- optional id of an existing paste to extend

    Only pastes owned by the current user are looked up. A "paste-id" that
    does not resolve to one of the user's pastes falls back to creating a
    new paste, except for commit == "force", which aborts with 404 because
    there is nothing to publish. A non-integer "paste-id" aborts with 400.
    """
    valid = Validation(request)
    contents = valid.require("contents", friendly_name="File contents")
    # NOTE(review): `require` presumably returns None (after recording an
    # error) when the field is absent; guard so that case reaches the
    # `valid.ok` check below instead of raising AttributeError here.
    if contents:
        contents = contents.replace("\r\n", "\n").replace("\r", "\n")
    filename = valid.optional("filename")
    commit = valid.require("commit")
    filename = filename.strip() if filename else filename

    paste = None
    paste_id = valid.optional("paste-id")
    if paste_id is not None:
        try:
            paste_id = int(paste_id)
        except ValueError:
            # Malformed id: answer with a client error rather than a 500.
            abort(400)
        # Restrict the lookup to the current user's pastes so one user
        # cannot append to or publish somebody else's paste.
        paste = (Paste.query
                .filter(Paste.id == paste_id)
                .filter(Paste.user_id == current_user.id)).one_or_none()
        if commit == "force":
            if not paste:
                abort(404)
            return _publish_paste(paste)
    if not paste:
        paste = Paste()
        paste.user_id = current_user.id
        db.session.add(paste)

    if any(f.filename == filename for f in paste.files):
        # TODO: Edit this file?
        valid.error("A file with this name already exists in this paste.",
                field="filename")

    if not valid.ok:
        return render_template("new-paste.html", paste=paste,
            highlight_file=highlight_file, stat=stat, humanize=humanize,
            **valid.kwargs)

    # Blobs are looked up by the SHA-1 of their contents and reused when one
    # with the same sha already exists.
    blob_sha = sha1(contents.encode()).hexdigest()
    blob = Blob.query.filter(Blob.sha == blob_sha).one_or_none()
    if not blob:
        blob = Blob()
        blob.contents = contents
        blob.sha = blob_sha
        db.session.add(blob)

    # Flush before reading paste.id / blob.id for the new PasteFile.
    db.session.flush()

    paste_file = PasteFile()
    paste_file.paste_id = paste.id
    paste_file.blob_id = blob.id
    paste_file.filename = filename
    db.session.add(paste_file)
    db.session.commit()

    if commit == "no":
        # Keep the paste unpublished and show the form again so more files
        # can be added.
        return render_template("new-paste.html", paste=paste,
            highlight_file=highlight_file, stat=stat, humanize=humanize)

    return _publish_paste(paste)

def _get_shebang(data):
    if not data.startswith('#!'):
        return None
    endline = data.find('\n')
    if endline == -1:
        shebang = data
    else:
        shebang = data[:endline]
    return shebang

def _get_lexer(name, data):
    """Pick a Pygments lexer for a file, falling back to TextLexer.

    `guess_lexer_for_filename` is tried first with the name and data. If it
    raises ClassNotFound, the shebang line of *data* (if any) is passed to
    `guess_lexer`. A TextLexer is returned when there is no shebang or when
    that second guess raises ClassNotFound as well.
    """
    try:
        return guess_lexer_for_filename(name, data)
    except pygments.util.ClassNotFound:
        pass

    interpreter_line = _get_shebang(data)
    if not interpreter_line:
        return TextLexer()

    try:
        return guess_lexer(interpreter_line)
    except pygments.util.ClassNotFound:
        return TextLexer()

def highlight_file(name, content):
    """Return *content* highlighted as HTML, wrapped in Markup.

    The lexer is chosen by `_get_lexer(name, content)`. The formatter's
    style definitions for the '.highlight' selector are emitted in a
    <style> element placed directly before the highlighted markup.
    """
    html_formatter = HtmlFormatter()
    css = html_formatter.get_style_defs('.highlight')
    body = highlight(content, _get_lexer(name, content), html_formatter)
    return Markup(f"<style>{css}</style>" + body)

@public.route("/~<user>/<sha>")
def paste_GET(user, sha):
    """Render the paste with the given sha belonging to the named user.

    Aborts with 404 when no user has that username, or when that user has
    no paste with that sha.
    """
    owner = User.query.filter(User.username == user).one_or_none()
    if not owner:
        abort(404)

    lookup = Paste.query.filter(Paste.user_id == owner.id, Paste.sha == sha)
    paste = lookup.one_or_none()
    if not paste:
        abort(404)

    # Helpers handed to the template alongside the paste itself.
    context = dict(paste=paste, highlight_file=highlight_file,
            stat=stat, humanize=humanize)
    return render_template("paste.html", **context)

@public.route("/blob/<sha>")
def blob_GET(sha):
    """Serve the contents of the blob with the given sha as text/plain.

    Aborts with 404 when no blob has that sha.
    """
    found = Blob.query.filter(Blob.sha == sha).one_or_none()
    if found:
        return Response(found.contents, mimetype="text/plain")
    abort(404)

@public.route("/pastes")
@loginrequired
def pastes():
    """List the current user's pastes that have a sha, newest update first.

    The query is passed through `paginate_query`; its second return value
    is expanded into the template's keyword arguments.
    """
    query = Paste.query.filter(Paste.user_id == current_user.id)
    # `!= None` is deliberate: the comparison is evaluated on the column to
    # build a query criterion, so `is not None` would not be equivalent.
    query = query.filter(Paste.sha != None)
    # TODO: Search
    query = query.order_by(Paste.updated.desc())
    page, pagination = paginate_query(query)
    return render_template("pastes.html", pastes=page,
            stat=stat, humanize=humanize, **pagination)