~gg/toy-browser

6320dfec3fe9bb23900295680c009d1aca67109a — Gui Goncalves 3 years ago master
Initial commit
8 files changed, 587 insertions(+), 0 deletions(-)

A .gitignore
A TODO
A cache.py
A connection.py
A html.py
A test.html
A ui.py
A util.py
A  => .gitignore +1 -0
@@ 1,1 @@
/__pycache__

A  => TODO +27 -0
@@ 1,27 @@
# Chapter 1
- Yet another scheme is data, which allows inlining HTML content into the URL
	itself. Try navigating to data:text/html,Hello world! in a real browser to see
	what happens. Add support for this scheme to your browser. The data scheme is
	especially convenient for making testcases without having to put them in
	separate files.

-  In addition to HTTP and HTTPS, there are other schemes, such as view-source;
	 navigating in a real browser to view-source:browser.engineering/http.html
	 shows the HTML source of this chapter rather than its rendered output. Add
	 support for the view-source scheme. Your browser should print the entire HTML
	 file as if it was text.  Hint: To do so, you can utilize the entities from
	 the previous exercise, and add an extra transform() method that adjusts the
	 input to show() when in view-source mode, like this: show(transform(body)).

- RFC 1945: Hypertext Transfer Protocol -- HTTP/1.0
	https://tools.ietf.org/html/rfc1945

- RFC 2616: Hypertext Transfer Protocol -- HTTP/1.1
	https://tools.ietf.org/html/rfc2616

- RFC 3986: Uniform Resource Identifier (URI): Generic Syntax
	https://tools.ietf.org/html/rfc3986

- RFC 8446: The Transport Layer Security (TLS) Protocol Version 1.3
	https://tools.ietf.org/html/rfc8446


A  => cache.py +46 -0
@@ 1,46 @@
import json
import os
from time import time

CACHE_DIR = "/tmp/foo-browser/cache"

# TODO store statuses
# FIXME no need to read the entire file to know if it's expired
def has(key):
    """Return True when a cache entry for ``key`` exists and has not expired.

    The entry file holds a JSON pair ``[expiry, body]``; the entry counts as
    present only while ``expiry`` is later than the current time.
    """
    path = get_cache_filename(key)
    if os.path.exists(path):
        with open(path, "r") as handle:
            expiry, _body = json.load(handle)
            # TODO delete the cached file if it's expired
            return expiry > time()
    return False

def get(key):
    """Return the body stored in the cache entry for ``key``.

    No expiry check is made here; the file is opened unconditionally, so a
    missing entry surfaces as the error raised by ``open``.
    """
    with open(get_cache_filename(key), "r") as handle:
        _expiry, payload = json.load(handle)
        return payload

def set(key, body, max_age):
    """Store ``body`` under ``key``, expiring ``max_age`` seconds from now.

    The entry is written as the pair ``(expiry_timestamp, body)``.
    """
    expires_at = time() + max_age
    target = get_cache_filename(key)
    maybe_make_cache_dir()
    entry = (expires_at, body)
    write_to_cache_file(target, entry)

def get_cache_filename(resource):
    """Return the path inside ``CACHE_DIR`` used to cache ``resource``."""
    file_name = normalise_resource(resource)
    return CACHE_DIR + "/" + file_name

def normalise_resource(resource):
    """Flatten ``resource`` into a single path component.

    Every ``/`` is replaced by ``_`` so the result can be used as a file name.
    """
    pieces = resource.split("/")
    return "_".join(pieces)

def maybe_make_cache_dir():
    """Create ``CACHE_DIR`` (and any missing parents) if it does not exist.

    ``exist_ok=True`` makes the call idempotent and avoids the race in a
    separate exists-then-create check, where another process could create the
    directory in between and make ``os.makedirs`` raise ``FileExistsError``.
    """
    os.makedirs(CACHE_DIR, exist_ok=True)

def write_to_cache_file(name, body):
    """Serialise ``body`` as JSON into the file ``name``, replacing its contents."""
    serialised = json.dumps(body)
    with open(name, "w+") as handle:
        handle.write(serialised)


A  => connection.py +166 -0
@@ 1,166 @@
import gzip
import socket
import ssl

import util
import cache

def request(url, redirects=5, **kwargs):
    """Fetch ``url`` with a GET request and return ``(headers, body)``.

    A fresh cache entry short-circuits the network and is returned as
    ``({}, cached_body)``. Responses with a 3xx status are followed through
    their ``Location`` header, at most ``redirects`` times.

    Args:
        url: An absolute ``http://`` / ``https://`` URL, or a path starting
            with ``/``. A path is resolved against the ``scheme``, ``host``
            and ``port`` keyword arguments (this is how relative redirect
            targets are followed).
        redirects: Number of redirects that may still be followed.
        **kwargs: ``scheme``, ``host`` and ``port`` of the origin; required
            only when ``url`` is a path.

    Returns:
        A ``(headers, body)`` tuple: a dict of lower-cased header names and
        the decoded response body.

    Raises:
        AssertionError: if a redirect is received with no redirects left.
        KeyError: if ``url`` is a path and an origin keyword is missing, or a
            3xx response carries no ``Location`` header.
    """
    is_path = url.startswith("/")
    if is_path:
        scheme = kwargs["scheme"]
        host = kwargs["host"]
        port = kwargs["port"]
        path = url
        # A bare path is ambiguous across hosts, so the cache key for a
        # relative target includes its origin; otherwise "/index.html" from
        # one site could be served from the cache for another site.
        cache_key = f"{scheme}://{host}:{port}{path}"
    else:
        cache_key = url

    if cache.has(cache_key):
        util.debug(f"Cache hit for {url}")
        return {}, cache.get(cache_key)
    util.debug(f"Cache miss. Connecting to {url}")

    if not is_path:
        scheme, host, port, path = parse_url(url)

    s = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM,
        proto=socket.IPPROTO_TCP
    )
    try:
        s.connect((host, port))
        if scheme == "https":
            ctx = ssl.create_default_context()
            s = ctx.wrap_socket(s, server_hostname=host)
        # sendall() loops until the whole request is written; send() may
        # transmit only a prefix of it.
        s.sendall(get_request(host, path))

        # The reader keeps the underlying descriptor alive until it is
        # closed, so it is closed explicitly together with the socket.
        with s.makefile("rb", newline="\r\n") as response:
            _version, status, _explanation = extract_statusline(response)
            headers = extract_headers(response)
            is_redirect = status >= 300 and status <= 399
            body = None if is_redirect else extract_body(response, headers)
    finally:
        # Release the connection on success and on every error path.
        s.close()

    if is_redirect:
        assert redirects > 0, "Redirect loop"
        location = headers["location"]
        util.debug(f"Redirecting to {location} ({redirects} left)")
        return request(
            location,
            redirects=redirects - 1,
            scheme=scheme,
            port=port,
            host=host
        )

    maybe_store_in_cache(cache_key, headers, status, body, "GET")
    return headers, body

# TODO support view-source:
# TODO support data:
def parse_url(url):
    """Split an absolute URL into ``(scheme, host, port, path)``.

    Only ``http`` and ``https`` are accepted. The port defaults to 80 for
    ``http`` and 443 otherwise, unless the host part carries an explicit
    ``:port``. A URL without a path component gets the path ``/``.

    Raises:
        ValueError: if ``url`` has no ``://`` separator or the port is not
            an integer.
        AssertionError: if the scheme is not ``http`` or ``https``.
    """
    scheme, remainder = url.split("://", 1)
    assert scheme in ["http", "https"], f"Unknown scheme: {scheme}"

    # Everything up to the first "/" is the authority; the rest is the path.
    host, _slash, tail = remainder.partition("/")
    path = "/" + tail

    port = 443 if scheme != "http" else 80
    if ":" in host:
        host, explicit_port = host.split(":", 1)
        port = int(explicit_port)
    return scheme, host, port, path

# TODO Make it easier to pass additional headers
def get_request(host, path):
    """Return the encoded HTTP/1.1 GET request for ``path`` on ``host``.

    The request advertises gzip support and asks the server to close the
    connection after the response.
    """
    header_lines = [
        f"GET {path} HTTP/1.1",
        f"Host: {host}",
        "User-Agent: foo-browser",
        "Accept-Encoding: gzip",
        "Connection: close",
    ]
    # Each line ends with CRLF and a blank line terminates the header block.
    message = "\r\n".join(header_lines) + "\r\n\r\n"
    return message.encode()

def extract_statusline(response):
    """Read the status line from ``response`` and split it into its parts.

    Returns ``(version, code, explanation)`` where ``code`` is an int and
    ``explanation`` is the rest of the line, line terminator included.
    """
    raw_line = response.readline()
    version, code, explanation = raw_line.decode("ascii").split(" ", 2)
    return version, int(code), explanation

def extract_headers(response):
    """Read header lines from ``response`` up to the blank separator line.

    Returns a dict mapping lower-cased header names to their values with
    surrounding whitespace removed. A line without ``:`` raises ValueError.
    """
    def next_line():
        return response.readline().decode("ascii")

    parsed = {}
    # iter() with a sentinel stops as soon as the bare CRLF line is read.
    for line in iter(next_line, "\r\n"):
        name, value = line.split(":", 1)
        parsed[name.lower()] = value.strip()
    return parsed

def extract_body(response, headers):
    """Read the response body and return it as decoded text.

    Chunked bodies are delegated to ``handle_chunked_body``; otherwise the
    rest of the stream is read, transparently gunzipped when the
    ``content-encoding`` header is ``gzip``. The parsed cache-control
    directives are written to the debug log along the way.
    """
    content_encoding = headers.get("content-encoding")
    is_chunked = headers.get("transfer-encoding") == "chunked"
    util.debug(get_cache_control(headers))

    if is_chunked:
        return handle_chunked_body(response, content_encoding)

    stream = gzip.open(response) if content_encoding == "gzip" else response
    return stream.read().decode()

def handle_chunked_body(response, encoding=None):
    """Reassemble a chunked body and return it as decoded text.

    When ``encoding`` is ``"gzip"`` the reassembled bytes are decompressed
    before decoding.
    """
    raw = read_chunked_body(response)
    payload = gzip.decompress(raw) if encoding == "gzip" else raw
    return payload.decode()

def read_chunked_body(response):
    """Read a ``Transfer-Encoding: chunked`` body and return the joined bytes.

    Each chunk is a hexadecimal size line (optionally followed by
    ``;extension`` parameters), that many bytes of data, and a terminating
    CRLF. A size of zero, or the end of the stream, ends the body.

    Args:
        response: Binary file-like object positioned at the first size line.

    Returns:
        The concatenated chunk data as ``bytes``.

    Raises:
        ValueError: if a size line is not valid hexadecimal.
    """
    chunks = []
    while True:
        line = response.readline().strip(b"\r\n")
        if line == b"":
            # End of stream (or a stray blank line): nothing more to read.
            break
        # Chunk extensions follow the size after ";" and are ignored.
        size_field = line.split(b";", 1)[0].strip()
        chunk_size = int(size_field.decode("ascii"), base=16)
        if chunk_size == 0:
            break
        chunks.append(response.read(chunk_size))
        # Consume the CRLF that terminates the chunk data; without this the
        # next size line would be read as empty and the body cut short after
        # the first chunk.
        response.readline()
    return b"".join(chunks)

def maybe_store_in_cache(resource, headers, status, body, verb):
    """Cache ``body`` for ``resource`` when the response is cacheable.

    Cacheability is decided by ``should_keep_cache``; the lifetime comes from
    the response's max-age directive. A skipped store is logged.
    """
    if not should_keep_cache(headers, status, verb):
        util.debug("Skipping cache")
        return
    store_in_cache(resource, body, get_max_age(headers))

def get_max_age(headers):
    """Return the first ``max-age`` value from the cache-control header.

    Returns 0 when no ``max-age`` directive was parsed.
    """
    ages = (
        entry[1]
        for entry in get_cache_control(headers)
        if entry and entry[0] == "max-age"
    )
    return next(ages, 0)

def should_keep_cache(headers, status, verb):
    """Tell whether a response may be written to the cache.

    It must carry a positive max-age, have status 200, 301 or 404, and be
    the answer to a GET request.
    """
    if not get_max_age(headers) > 0:
        return False
    if status not in [200, 301, 404]:
        return False
    return verb == "GET"

def store_in_cache(resource, body, max_age):
    """Log the store and write ``body`` to the cache for ``max_age`` seconds."""
    message = f"Storing cache for {resource} (Max-Age: {max_age})"
    util.debug(message)
    cache.set(resource, body, max_age)

def get_cache_control(headers):
    """Parse the ``cache-control`` header into a list of directives.

    The header is split on commas and one list entry is produced per piece:

    * ``"no-cache"`` for a ``no-cache`` directive,
    * ``("max-age", seconds)`` for a well-formed ``max-age=N`` directive,
    * ``None`` for anything else, including a malformed ``max-age``.

    Matching is case-insensitive and ignores whitespace around each
    directive, so the usual ``public, max-age=3600`` form is recognised.
    A missing header yields ``[None]``.

    Args:
        headers: Dict of lower-cased header names to values.
    """
    def parse_directive(directive):
        # Directives are conventionally written ", "-separated; strip the
        # padding or everything after the first comma fails to match.
        directive = directive.strip().lower()
        if directive == "no-cache":
            return "no-cache"
        name, separator, value = directive.partition("=")
        if separator and name.strip() == "max-age":
            try:
                return "max-age", int(value.strip())
            except ValueError:
                # A non-numeric max-age is treated as unknown rather than
                # aborting the whole request.
                return None
        return None

    header = headers.get("cache-control", "")
    return [parse_directive(directive) for directive in header.split(sep=",")]

if __name__ == "__main__":
    import sys
    import html

    # Fetch the URL given on the command line, or a default test page, and
    # print the root of its parsed tree. Other targets tried during
    # development:
    #   "https://example.org"
    #   "view-source:browser.engineering/http.html"
    default_url = "http://browser.engineering/redirect"
    url = sys.argv[1] if len(sys.argv) > 1 else default_url
    headers, body = request(url)
    print(html.HTMLParser(body).parse())


A  => html.py +139 -0
@@ 1,139 @@
class Text:
    """Leaf node of the document tree holding a run of text."""

    def __init__(self, text, parent):
        self.parent = parent
        # Text nodes never get children; the empty list lets tree walkers
        # treat every node uniformly.
        self.children = []
        self.text = text

    def __repr__(self):
        return f"{self.text!r}"

class Element:
    """Tag node of the document tree.

    Args:
        tag: The tag name.
        parent: The enclosing node, or ``None`` for the root.
        attributes: Optional dict of attribute names to values. When omitted
            each element gets its own fresh dict; a ``{}`` default in the
            signature would be one dict shared by every element created
            without attributes.
    """

    def __init__(self, tag, parent, attributes=None):
        self.tag = tag
        self.children = []
        self.parent = parent
        self.attrs = {} if attributes is None else attributes

    def __repr__(self):
        attributes = "".join(
            f"{key}={value} " for key, value in self.attrs.items()
        )
        return f"<{self.tag} {attributes}>"

class HTMLParser:
    """Build a tree of ``Element`` and ``Text`` nodes from an HTML string.

    The parser scans the input character by character, switching between
    text and tag mode on ``<`` and ``>``. Open elements are kept on the
    ``unfinished`` stack; missing ``html``, ``head`` and ``body`` tags are
    inserted implicitly.
    """

    # Tags that belong inside <head>; any other content implies <body>.
    HEAD_TAGS = [
        "base", "basefont", "bgsound", "noscript",
        "link", "meta", "title", "style", "script",
    ]
    # Tags that never have children and therefore are not pushed on the stack.
    SELF_CLOSING_TAGS = [
        "area", "base", "br", "col", "embed", "hr", "img", "input",
        "link", "meta", "param", "source", "track", "wbr",
    ]

    def __init__(self, body):
        self.body = body
        self.unfinished = []

    def parse(self):
        """Parse ``self.body`` and return the root node of the tree."""
        in_tag = False
        buf = []  # characters collected since the last "<" or ">"
        for c in self.body:
            if c == "<":
                in_tag = True
                if buf: self.add_text("".join(buf))
                buf = []
            elif c == ">" and in_tag:
                in_tag = False
                tag, attrs = self._split_tag("".join(buf))
                self.add_tag(tag, self.parse_attrs(attrs))
                buf = []
            else:
                buf.append(c)
        if not in_tag and buf:
            self.add_text("".join(buf))
        return self.finish()

    @staticmethod
    def _split_tag(text):
        """Split the text between ``<`` and ``>`` into ``(tag, attrs)``.

        ``tag`` is lower-cased; ``attrs`` is ``[]`` or a one-element list
        holding the raw attribute text with its original case, so attribute
        values such as URLs are not altered. A trailing ``/`` (``<br/>``,
        ``<br />``) is dropped so the tag name is still recognised.
        """
        if text.endswith("/") and not text.startswith("/"):
            text = text[:-1]
        parts = text.split(None, 1)
        if not parts:
            return "", []
        return parts[0].lower(), parts[1:]

    def parse_attrs(self, attrs):
        """Turn the raw attribute text into a dict.

        Args:
            attrs: ``[]`` or a one-element list with the attribute text.

        Returns:
            Dict of lower-cased attribute names to values. Matching quotes
            around a value are removed; an attribute without ``=`` maps to
            ``True``.
        """
        if not attrs: return {}
        result = {}
        # FIXME this doesn't handle whitespace inside attrs
        for pair in attrs[0].split():
            key, separator, value = pair.partition("=")
            key = key.lower()
            if not separator:
                result[key] = True
                continue
            # ">= 2" so that an empty quoted value (a="") is unquoted too.
            if len(value) >= 2 and value[0] in ("'", '"') and value[-1] == value[0]:
                value = value[1:-1]
            result[key] = value
        return result

    def add_text(self, text):
        """Append a text node to the innermost open element.

        Whitespace-only text is ignored.
        """
        if text.isspace(): return
        self.implicit_tags(None)
        parent = self.unfinished[-1]
        node = Text(text.strip(), parent)
        parent.children.append(node)

    def add_tag(self, text, attrs=None):
        """Handle one tag: open, close, or self-closing.

        Args:
            text: The tag name; close tags start with ``/``. Tags starting
                with ``!`` (doctype, comments) are ignored.
            attrs: Attribute dict for the new element. Optional because
                implicitly inserted tags carry no attributes.
        """
        if text.startswith("!"): return
        attrs = {} if attrs is None else attrs
        self.implicit_tags(text)
        if self.is_close_tag(text):
            # Keep the root on the stack; finish() returns it.
            if len(self.unfinished) == 1: return
            node = self.unfinished.pop()
            parent = self.unfinished[-1]
            parent.children.append(node)
        elif text in HTMLParser.SELF_CLOSING_TAGS:
            parent = self.unfinished[-1]
            node = Element(text, parent, attrs)
            parent.children.append(node)
        else:
            parent = self.unfinished[-1] if self.unfinished else None
            node = Element(text, parent, attrs)
            self.unfinished.append(node)

    def is_close_tag(self, text):
        """Return True when ``text`` names a closing tag."""
        return text.startswith("/")

    def finish(self):
        """Close every open element and return the root node.

        For input that produced no nodes at all, the implicit ``html`` and
        ``body`` elements are created first so a tree is always returned.
        """
        if not self.unfinished:
            self.implicit_tags(None)
        while len(self.unfinished) > 1:
            node = self.unfinished.pop()
            self.unfinished[-1].children.append(node)
        return self.unfinished.pop()

    def implicit_tags(self, tag):
        """Insert ``html``, ``head``/``body`` and ``/head`` where omitted.

        Args:
            tag: The tag about to be added, or ``None`` for text.
        """
        while True:
            open_tags = [node.tag for node in self.unfinished]
            if open_tags == [] and tag != "html":
                self.add_tag("html")
            elif open_tags == ["html"] \
                and tag not in ["/html", "head", "body"]:
                if tag in HTMLParser.HEAD_TAGS:
                    self.add_tag("head")
                else:
                    self.add_tag("body")
            elif open_tags == ["html", "head"] and tag != "/head" \
                and tag not in HTMLParser.HEAD_TAGS:
                self.add_tag("/head")
            else:
                break

def print_tree(node, indent=0):
    """Print ``node`` and its descendants, one per line, depth-first.

    Each level is indented two more columns than its parent, starting at
    ``indent``.
    """
    pending = [(node, indent)]
    while pending:
        current, depth = pending.pop()
        print(" " * depth, current)
        # Push children in reverse so they pop off in document order.
        for child in reversed(list(current.children)):
            pending.append((child, depth + 2))

if __name__ == "__main__":
    import connection

    # Fetch a sample page and dump its parsed tree. To parse the bundled
    # fixture instead of going over the network:
    #     with open("test.html", "r") as fp:
    #         body = fp.read()
    _, body = connection.request("http://browser.engineering/html.html")
    print_tree(HTMLParser(body).parse())


A  => test.html +21 -0
@@ 1,21 @@
<!doctype html>
<html>
  <head>
    <title>This is my title</title>
  </head>
  <body>
    <h1>Hello, world!</h1>
    <p>This is a paragraph. It may span multiple lines.</p>
    <p>It contains <b>bold</b> text. It might also contain <i>italics</i>.</p>
    <small><p>Text could be tiny...</small>
    <big><p>Text could also be huge!</p></big>
    <p><small>a</small><big>A</big></p>
    <p>
    Some<br>
    Paragraphs<br />
    Support<br/>
    Line breaks!
    </p>
  </body>
</html>


A  => ui.py +182 -0
@@ 1,182 @@
import time
import tkinter
import tkinter.font

import util
import html

SCROLL_STEP = 100
WIDTH, HEIGHT = 800, 600
HSTEP, VSTEP = 13, 18

class Layout:
    """Compute where each word of a parsed document is drawn.

    The result is ``display_list``: a list of ``(x, y, word, font)`` tuples.
    Words are collected into ``line`` and positioned vertically by
    ``flush`` once the line is complete, so that words set in different
    fonts share a baseline.
    """

    def __init__(self, tokens):
        self.display_list = []
        self.line = []  # (x, word, font) entries of the line being built
        self.x = HSTEP
        self.y = VSTEP
        self.weight = "normal"
        self.style = "roman"
        self.size = 14
        self.tokens = tokens  # root node of the parsed tree (may be None)

    def calculate(self):
        """Lay out the ``body`` element of the tree into ``display_list``.

        Does nothing when there is no tree or the tree has no ``body``.
        """
        if self.tokens is None:
            return
        # TODO Create decorator for this kind of profiling
        now = time.time()
        body = self.find_body(self.tokens)
        elapsed = time.time() - now
        util.debug(f"find_body took {elapsed} s")
        if body is None:
            return
        self.recurse(body)
        self.flush()

    def find_body(self, tree):
        """Return the first ``body`` element at or below ``tree``, or None."""
        if isinstance(tree, html.Element) and tree.tag == "body":
            return tree
        for child in tree.children:
            body = self.find_body(child)
            if body: return body
        return None

    def recurse(self, tree):
        """Walk ``tree`` depth-first, laying out text and applying tags."""
        if isinstance(tree, html.Text):
            self.handle_text(tree.text)
        else:
            self.open(tree.tag)
            for child in tree.children:
                self.recurse(child)
            self.close(tree.tag)

    def handle_text(self, text):
        """Add the words of ``text`` to the current line, wrapping as needed."""
        font = tkinter.font.Font(
            family="Noto Serif",
            size=self.size,
            weight=self.weight,
            slant=self.style
        )
        space = font.measure(" ")
        for word in text.split():
            w = font.measure(word)
            # Wrap BEFORE placing the word: it then starts the new line at
            # the left margin instead of overflowing the old line and
            # leaving the next line indented by its width.
            if self.x + w >= WIDTH - HSTEP:
                self.flush()
            self.line.append((self.x, word, font))
            self.x += w + space

    def flush(self):
        """Emit the current line to ``display_list`` and start a new one."""
        if not self.line: return
        metrics = [font.metrics() for x, word, font in self.line]
        max_ascent = max(metric["ascent"] for metric in metrics)
        # The tallest ascent on the line sets the shared baseline.
        baseline = self.y + 1.2 * max_ascent
        for x, word, font in self.line:
            y = baseline - font.metrics("ascent")
            self.display_list.append((x, y, word, font))
        self.x = HSTEP
        self.line = []
        max_descent = max(metric["descent"] for metric in metrics)
        self.y = baseline + 1.2 * max_descent

    def open(self, tag):
        """Apply the style change for an opening ``tag``."""
        if tag == "i":
            self.style = "italic"
        elif tag == "b":
            self.weight = "bold"
        elif tag == "small":
            self.size -= 2
        elif tag == "big":
            self.size += 2
        elif tag == "br" or tag == "br/":
            self.flush()

    def close(self, tag):
        """Undo the style change of ``tag``; ``p`` ends the paragraph."""
        if tag == "i":
            self.style = "roman"
        elif tag == "b":
            self.weight = "normal"
        elif tag == "big":
            self.size -= 2
        elif tag == "small":
            self.size += 2
        elif tag == "p":
            self.flush()
            self.y += VSTEP


class Browser:
    """Tk window that lays out a parsed document and draws it on a canvas.

    Assign the parsed tree to ``body``; it is laid out whenever the window
    is resized. Up/Down scroll the page and ``q`` closes the window.
    """

    def __init__(self):
        self.body = None
        self.scroll = 0
        # Start with an empty display list so render() is safe before the
        # first layout, e.g. when a scroll key is pressed right away.
        self.display_list = []
        self.window = tkinter.Tk()
        self.canvas = tkinter.Canvas(
            self.window,
            width=WIDTH,
            height=HEIGHT
        )
        self.canvas.pack(fill="both", expand=1)
        self._bind_events()

    def _bind_events(self):
        """Register the window's resize, scroll and quit handlers."""
        self.window.bind("<Configure>", self.on_configure)
        self.window.bind("<Up>", self.scroll_up)
        self.window.bind("<Down>", self.scroll_down)
        self.window.bind("<MouseWheel>", self.on_mouse_wheel)
        self.window.bind("q", self.browser_quit)

    # FIXME no need to reinstantiate Layout, I think
    def layout(self, tokens):
        """Lay out ``tokens`` into a new display list, then redraw."""
        now = time.time()
        layout = Layout(tokens)
        layout.calculate()
        self.display_list = layout.display_list
        elapsed = time.time() - now
        util.debug(f"LAYOUT\t|\t{elapsed}")
        self.render()

    def render(self):
        """Redraw the words of the display list that are in view."""
        now = time.time()
        self.canvas.delete("all")
        for x, y, w, font in self.display_list:
            # Skip words entirely below or above the visible area.
            if y > self.scroll + HEIGHT: continue
            if y + VSTEP < self.scroll: continue
            top = y - self.scroll
            self.canvas.create_text(x, top, text=w, font=font, anchor="nw")
        elapsed = time.time() - now
        util.debug(f"RENDER\t|\t{elapsed}")

    def on_configure(self, e):
        """Track the new window size and re-layout the document."""
        global WIDTH, HEIGHT
        if WIDTH != e.width or HEIGHT != e.height:
            WIDTH = e.width
            HEIGHT = e.height
            # A resize can arrive before a document is assigned.
            if self.body is not None:
                self.layout(self.body)

    def on_mouse_wheel(self, e):
        """Log the wheel delta (wheel scrolling is not implemented)."""
        util.debug(f"Scroll by {e.delta}")

    def scroll_up(self, e):
        """Scroll up one step, stopping at the top of the page."""
        self.scroll -= SCROLL_STEP
        if self.scroll < 0:
            self.scroll = 0
        self.render()

    def scroll_down(self, e):
        """Scroll down one step."""
        self.scroll += SCROLL_STEP
        self.render()

    def browser_quit(self, e):
        """Close the window."""
        self.window.destroy()


if __name__ == "__main__":
    import connection

    browser = Browser()

    # The page is read from the local fixture. To load it over the network
    # instead, fetch one of these with connection.request(url):
    #   "http://example.org"
    #   "https://www.zggdwx.com/xiyou/1.html"
    #   "http://browser.engineering/text.html"
    with open("test.html", "r") as fp:
        body = fp.read()
    browser.body = html.HTMLParser(body).parse()

    tkinter.mainloop()


A  => util.py +5 -0
@@ 1,5 @@
import sys

def debug(msg):
    """Write ``msg`` followed by a newline to standard error."""
    sys.stderr.write(str(msg) + "\n")