~nhanb/mcross

ref: b26c4f1a14ae45daad956c64f42fb6dd47ff8886 mcross/src/mcross/gui/controller.py -rw-r--r-- 6.8 KiB
b26c4f1aBùi Thành Nhân bring back Windows support 1 year, 1 month ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import logging
import threading
import time
import traceback
from ssl import SSLCertVerificationError
from tkinter import READABLE, Tk, messagebox

import curio

from ..transport import (MAX_REQUEST_SECONDS, GeminiUrl,
                         NonAbsoluteUrlWithoutContextError,
                         UnsupportedProtocolError, get)
from .model import Model
from .view import WAITING_CURSOR, View

statusbar_logger = logging.getLogger("statusbar")


class Controller:
    def __init__(self):
        self.root = Tk()
        self.root.alt_shortcuts = set()
        self.model = Model()
        self.view = View(self.root, self.model)
        self.root.title("McRoss Browser")
        self.root.geometry("800x600")

        self.gui_ops = curio.UniversalQueue(withfd=True)
        self.coro_ops = curio.UniversalQueue()

        # When in the middle of an action, this flag is set to False to prevent user
        # from clicking other random stuff:
        self.allow_user_interaction = True

        def put_coro_op(func):
            def inner(*args):
                if self.allow_user_interaction:
                    self.coro_ops.put(self.show_waiting_cursor_during_task(func, *args))

            return inner

        # Make sure queued GUI operations are executed as soon as they become available
        if hasattr(self.root, "createfilehandler"):
            self.root.createfilehandler(self.gui_ops, READABLE, self.process_gui_ops)
        else:
            print("Running poll-based workaround for Windows.")
            # This is way more inefficient (5% CPU usage on a Surface Go at idle) but
            # hey it's better than not working at all!

            def after_cb():
                self.process_gui_ops()
                self.root.after(10, after_cb)

            self.root.after(10, after_cb)

        self.view.go_callback = put_coro_op(self.go_callback)
        self.view.link_click_callback = put_coro_op(self.link_click_callback)
        self.view.back_callback = put_coro_op(self.back_callback)
        self.view.forward_callback = put_coro_op(self.forward_callback)

    async def main(self):
        while True:
            coro = await self.coro_ops.get()
            await coro

    def run(self):
        threading.Thread(target=curio.run, args=(self.main,), daemon=True).start()
        self.root.mainloop()

    async def put_gui_op(self, func, *args, **kwargs):
        await self.gui_ops.put((func, args, kwargs))

    def process_gui_ops(self, *args):
        while not self.gui_ops.empty():
            func, args, kwargs = self.gui_ops.get()
            func(*args, **kwargs)

    async def show_waiting_cursor_during_task(self, func, *args):
        async def show():
            self.view.text.config(cursor=WAITING_CURSOR)
            self.root.config(cursor=WAITING_CURSOR)
            self.view.allow_changing_cursor = False

        async def hide():
            self.view.text.config(cursor="xterm")
            self.root.config(cursor="arrow")
            self.view.allow_changing_cursor = True

        await show()
        self.allow_user_interaction = False

        try:
            await func(*args)
        except Exception:
            # a catch-all here so that our show_waiting...() coroutine can be yielded
            traceback.print_exc()

        self.allow_user_interaction = True
        await hide()

    async def go_callback(self, url: str):
        url = GeminiUrl.parse_absolute_url(url)
        await self.visit_link(url)

    async def link_click_callback(self, raw_url):
        try:
            url = GeminiUrl.parse(raw_url, self.model.history.get_current_url())
            await self.visit_link(url)
        except NonAbsoluteUrlWithoutContextError:
            await self.put_gui_op(
                messagebox.showwarning,
                "Ambiguous link",
                "Cannot visit relative urls without a current_url context",
            )
        except UnsupportedProtocolError as e:
            await self.put_gui_op(
                messagebox.showinfo,
                "Unsupported protocol",
                f"{e} links are unsupported (yet?)",
            )
        except SSLCertVerificationError:
            await self.put_gui_op(
                messagebox.showerror,
                "Invalid server certificate",
                "Server is NOT using a valid CA-approved TLS certificate.",
            )

    async def visit_link(self, url: GeminiUrl):
        try:
            resp = await self.load_page(url)
            self.model.history.visit(resp.url)
            await self.put_gui_op(self.view.render_page)

        except curio.errors.TaskTimeout:
            await self.put_gui_op(
                statusbar_logger.info,
                f"Request timed out: {MAX_REQUEST_SECONDS}s",
            )
            await self.put_gui_op(
                messagebox.showwarning,
                "Request timed out",
                f"Request to {url.without_protocol()} took longer than {MAX_REQUEST_SECONDS}s",
            )

        except (ConnectionError, OSError) as e:
            await self.put_gui_op(statusbar_logger.info, str(e))
            raise

    async def back_callback(self):
        self.model.history.go_back()
        await self.load_page(self.model.history.get_current_url())
        await self.put_gui_op(self.view.render_page)

    async def forward_callback(self):
        self.model.history.go_forward()
        await self.load_page(self.model.history.get_current_url())
        self.view.render_page()

    async def load_page(self, url: GeminiUrl):
        await self.put_gui_op(statusbar_logger.info, f"Requesting {url}...")
        start = time.time()
        resp = await get(url)
        request_time = time.time() - start
        await self.put_gui_op(
            statusbar_logger.info,
            f"{resp.status} {resp.meta} (took {request_time:.2f}s)",
        )

        # Support whatever encoding that python supports
        body_string = ""
        if resp.body and resp.charset:
            try:
                body_string = resp.body.decode(resp.charset)
            except LookupError:
                await self.put_gui_op(
                    self.model.update_content,
                    "\n".join(
                        [
                            "Error:",
                            f"{resp.status} {resp.meta}",
                            f"Unsupported charset: {resp.charset}",
                        ]
                    ),
                    "text/plain",
                )
                return resp

        # Sucessfully decoded body string!
        if resp.status.startswith("2"):
            await self.put_gui_op(
                self.model.update_content, body_string, resp.mime_type
            )
        else:
            await self.put_gui_op(
                self.model.update_content,
                f"Error:\n{resp.status} {resp.meta}\n{body_string}",
                "text/plain",
            )
        return resp