~fkfd/git.gmi

a4f8c8a25d1deadea90c44c0445fa902c26d6306 — Frederick Yin 1 year, 2 months ago b5b7457
Caching 101

Changes:
- Caching operations
- Mangled internal method names
3 files changed, 118 insertions(+), 33 deletions(-)

M README.md
M git-gmi/config.py
M git-gmi/git.py
M README.md => README.md +1 -0
@@ 19,6 19,7 @@ Dependencies:
- relatively new version of Python (3.8.3 personally)
- pygit2 (`pip install pygit2`)
- hurry.filesize (`pip install hurry.filesize`)
- dateutil (`pip install python-dateutil`)
- a gemini server capable of serving CGI

You need to edit the shebang of `git-gmi/cgi`:

M git-gmi/config.py => git-gmi/config.py +4 -0
@@ 2,6 2,10 @@
GIT_CATALOG = "/home/fakefred/p/gemini/repos/"
# which path leads to your cgi app after the URL's host part
CGI_PATH = "/git/cgi/"
# cache dir
CACHE_DIR = "/home/fakefred/Archive/_cache/"
# how long before cache expires, in seconds: int
CACHE_TTL = 120
# your site's display name
GIT_GMI_SITE_TITLE = "git.gmi demo instance"
# the "main" branch that git.gmi defaults to

M git-gmi/git.py => git-gmi/git.py +113 -33
@@ 1,6 1,10 @@
from pygit2 import *
from hurry.filesize import size, alternative
from datetime import datetime
from datetime import datetime, timedelta
import dateutil.parser
from pathlib import Path
import os
import shutil
import mimetypes
from const import *
from config import *


@@ 18,12 22,60 @@ class GitGmiRepo:
    def __init__(self, name: str, path: str):
        """Open the git repository at `path` and prepare its cache directory.

        name: repo name; used in the error message and as the name of the
              cache subdirectory under CACHE_DIR.
        path: filesystem path handed to Repository().
        Raises FileNotFoundError if Repository() fails with GitError.
        """
        self.name = name
        self.path = path
        self.cache_dir = Path(CACHE_DIR) / name
        # open the repo first, so that a request for a nonexistent repo
        # does not leave an empty cache directory behind
        try:
            self.repo = Repository(path)
        except GitError as err:
            raise FileNotFoundError(f"Error: no such repo: {name}") from err
        self._init_cache()

    def generate_header(self):
    def _init_cache(self):
        """Create this repo's cache directory if it does not exist yet."""
        # makedirs also creates missing parent directories (e.g. CACHE_DIR
        # itself on a fresh install), which a bare os.mkdir would fail on;
        # exist_ok=True keeps an already-existing directory a no-op
        os.makedirs(self.cache_dir, exist_ok=True)

    def _read_cache(self, req: list) -> str:
        """Return the cached response for `req`, or None on a cache miss.

        req is what the user requests after the repo name,
        like ["tree", "master", "src"],
        which points to a file called tree_master_src.gmi.

        File content:
            20 text/gemini
            [body - page content]
            [newline]
            cached at:
            [iso timestamp]

        A miss (None) is reported when the file does not exist, is empty,
        ends in something that is not an ISO timestamp, or is older than
        CACHE_TTL seconds. On a hit the returned text still includes the
        "cached at" trailer.
        """
        # NOTE(review): req elements are joined verbatim into a file name;
        # confirm callers never pass elements containing path separators
        fn = "_".join(req) + ".gmi"
        try:
            # the with-block closes the file; no explicit close() needed
            with open(self.cache_dir / fn) as f:
                response = f.read()
        except FileNotFoundError:
            return None

        lines = response.splitlines()
        if not lines:
            # empty file (e.g. truncated but not yet written): not usable
            return None

        try:
            created_at = dateutil.parser.isoparse(lines[-1])
        except ValueError:
            # last line is not a timestamp: treat a corrupt entry as a miss
            # instead of crashing the request
            return None

        if datetime.now() - created_at < timedelta(seconds=CACHE_TTL):
            # cache valid
            # response will include the timestamp
            return response

        return None

    def _write_cache(self, req: list, resp: str):
        """Write `resp` into this repo's cache, appended with a timestamp.

        req:  request path elements; joined with "_" and suffixed with
              ".gmi" to form the cache file name.
        resp: full response text to store.

        The stored file is `resp`, a newline, the line "cached at:" and the
        current local time in ISO format on the last line.
        """
        fn = "_".join(req) + ".gmi"
        # the directory may have been removed by _flush_cache() since it
        # was created, so make sure it exists before writing
        os.makedirs(self.cache_dir, exist_ok=True)
        # "w" creates the file or truncates an existing one, so no separate
        # "x" attempt is needed; the with-block flushes and closes the handle
        with open(self.cache_dir / fn, "w") as f:
            f.write(resp + "\ncached at:\n" + datetime.now().isoformat())

    def _flush_cache(self):
        """Delete this repo's entire cache directory, if there is one."""
        cache_root = self.cache_dir
        try:
            shutil.rmtree(cache_root)
        except FileNotFoundError:
            # nothing has been cached, so there is nothing to flush
            return

    def _generate_header(self):
        # global "header" to display above all views (except raw files)
        header = (
            f"# {self.name}\n"


@@ 36,9 88,13 @@ class GitGmiRepo:
        return header

    def view_summary(self) -> str:
        response = f"{STATUS_SUCCESS} {META_GEMINI}\n" + self.generate_header()
        cached = self._read_cache(["summary"])
        if cached is not None:
            return cached

        response = f"{STATUS_SUCCESS} {META_GEMINI}\n" + self._generate_header()
        # show 3 recent commits
        recent_commits = self.get_commit_log()[:3]
        recent_commits = self._get_commit_log()[:3]
        for cmt in recent_commits:
            time = str(datetime.utcfromtimestamp(cmt["time"])) + " UTC"
            response += (


@@ 46,8 102,8 @@ class GitGmiRepo:
                f"{cmt['msg'].splitlines()[0]}\n\n"
            )  # TODO: link to commit view
        # find and display readme(.*)
        tree = self.get_tree(MAIN_BRANCH)
        trls = self.list_tree(tree)
        tree = self._get_tree(MAIN_BRANCH)
        trls = self._list_tree(tree)
        found_readme = False
        for item in trls:
            if (


@@ 62,9 118,12 @@ class GitGmiRepo:
                )
        if not found_readme:
            response += "## No readme found."

        self._write_cache(["summary"], response)

        return response

    def get_commit_log(self) -> list:
    def _get_commit_log(self) -> list:
        # returns useful info from commit log.
        repo = self.repo
        commits = list(repo.walk(repo[repo.head.target].id, GIT_SORT_TIME))


@@ 82,8 141,11 @@ class GitGmiRepo:
        return log  # reverse chronological order

    def view_log(self) -> str:
        response = f"{STATUS_SUCCESS} {META_GEMINI}\n" + self.generate_header()
        log = self.get_commit_log()
        cached = self._read_cache(["log"])
        if cached is not None:
            return cached
        response = f"{STATUS_SUCCESS} {META_GEMINI}\n" + self._generate_header()
        log = self._get_commit_log()
        for cmt in log:
            # looks like "2020-06-06 04:51:21 UTC"
            time = str(datetime.utcfromtimestamp(cmt["time"])) + " UTC"


@@ 93,9 155,10 @@ class GitGmiRepo:
                f"=> tree/{cmt['id']}/ view tree\n"
                f"{cmt['msg']}\n\n"
            )
        self._write_cache(["log"], response)
        return response

    def get_commit(self, commit_str) -> dict:
    def _get_commit(self, commit_str) -> dict:
        try:
            commit = self.repo.revparse_single(commit_str)
            diff = self.repo.diff(commit.parents[0], commit)


@@ 110,10 173,13 @@ class GitGmiRepo:
            raise FileNotFoundError(f"Error: no such commit: {commit_str}")

    def view_commit(self, commit_str) -> str:
        commit = self.get_commit(commit_str)
        cached = self._read_cache(["commit", commit_str])
        if cached is not None:
            return cached
        commit = self._get_commit(commit_str)
        response = (
            f"{STATUS_SUCCESS} {META_GEMINI}\n"
            + self.generate_header()
            + self._generate_header()
            + f"{commit['id']} - {commit['author']} - {commit['time']}\n"
            + commit["msg"]
            + "\n"


@@ 123,14 189,15 @@ class GitGmiRepo:
            + commit["patch"]
            + "\n```"
        )
        self._write_cache(["commit", commit_str], response)
        return response

    def view_raw_commit(self, commit_str) -> str:
        commit = self.get_commit(commit_str)
        commit = self._get_commit(commit_str)
        response = f"{STATUS_SUCCESS} {META_PLAINTEXT}\n" + commit["patch"]
        return response

    def get_refs(self) -> list:
    def _get_refs(self) -> list:
        refs = self.repo.listall_reference_objects()
        return [
            {


@@ 143,44 210,48 @@ class GitGmiRepo:
        ]

    def view_refs(self) -> str:
        response = f"{STATUS_SUCCESS} {META_GEMINI}\n" + self.generate_header()
        refs = self.get_refs()
        cached = self._read_cache(["refs"])
        if cached is not None:
            return cached
        response = f"{STATUS_SUCCESS} {META_GEMINI}\n" + self._generate_header()
        refs = self._get_refs()
        for ref in refs:
            # HACK: filter out refs with slashes as remote branches
            if ref["shorthand"].find("/") == -1:
                response += (
                    f"## {ref['shorthand']}\n=> tree/{ref['shorthand']}/ view tree\n\n"
                )
        self._write_cache(["refs"], response)
        return response

    @classmethod
    def parse_recursive_tree(cls, tree: Tree) -> list:
    def _parse_recursive_tree(cls, tree: Tree) -> list:
        # recursively replace all Trees with a list of Blobs inside it,
        # bundled with the Tree's name as a tuple,
        # e.g. [('src', [blob0, blob1]), otherblob].
        tree_list = list(tree)
        for idx, item in enumerate(tree_list):
            if isinstance(item, Tree):
                tree_list[idx] = (item.name, cls.parse_recursive_tree(tree_list[idx]))
                tree_list[idx] = (item.name, cls._parse_recursive_tree(tree_list[idx]))

        return tree_list

    def get_tree(self, revision_str: str) -> list:
    def _get_tree(self, revision_str: str) -> list:
        # returns a recursive list of Blob objects
        try:
            revision = self.repo.revparse_single(revision_str)
            if isinstance(revision, Commit):
                # top level tree; may contain sub-trees
                return self.parse_recursive_tree(revision.tree)
                return self._parse_recursive_tree(revision.tree)
            elif isinstance(revision, Tag):
                return self.parse_recursive_tree(revision.get_object().tree)
                return self._parse_recursive_tree(revision.get_object().tree)
        except ValueError:
            raise FileNotFoundError(f"Error: no such tree: {revision_str}")
            return None

    @staticmethod
    def list_tree(tree_list: list, location=[]) -> list:
        # tree_list is the output of parse_recursive_tree(<tree>);
    def _list_tree(tree_list: list, location=[]) -> list:
        # tree_list is the output of _parse_recursive_tree(<tree>);
        # location is which dir you are viewing, represented path-like
        # in a list, e.g. ['src', 'static', 'css'] => 'src/static/css',
        # which this method will cd into and display to the visitor.


@@ 225,12 296,16 @@ class GitGmiRepo:
    def view_tree(self, branch: str, location=[]) -> str:
        # actual Gemini response
        # consists of a header and a body
        tree = self.get_tree(branch)
        contents = self.list_tree(tree, location)
        cached = self._read_cache(["tree", branch] + location)
        if cached is not None:
            return cached

        tree = self._get_tree(branch)
        contents = self._list_tree(tree, location)
        items = len(contents)
        response = (
            f"{STATUS_SUCCESS} {META_GEMINI}\n"
            + self.generate_header()
            + self._generate_header()
            + f"## {self.name}{'/' if location else ''}{'/'.join(location)}/"
            f" | {items} {'items' if items > 1 else 'item'}\n\n"
        )


@@ 241,15 316,16 @@ class GitGmiRepo:
                )
            elif item["type"] == "file":
                response += f"=> {item['name']} {item['name']} | {convert_filesize(item['size'])}\n"
        self._write_cache(["tree", branch] + location, response)
        return response

    def get_blob(self, commit_str: str, location=[]) -> Blob:
    def _get_blob(self, commit_str: str, location=[]) -> Blob:
        # returns a specific Blob object
        # location: just like that of list_tree, but the last element
        # location: just like that of _list_tree, but the last element
        # is the filename
        try:
            tree = self.get_tree(commit_str)
            trls = self.list_tree(tree, location[:-1])
            tree = self._get_tree(commit_str)
            trls = self._list_tree(tree, location[:-1])
            for item in trls:
                if item["type"] == "file" and item["name"] == location[-1]:
                    return item["blob"]


@@ 258,19 334,23 @@ class GitGmiRepo:
            raise FileNotFoundError(f"Error: No such tree: {'/'.join(location[:-1])}")

    def view_blob(self, branch: str, location=[]) -> str:
        blob = self.get_blob(branch, location)
        cached = self._read_cache(["tree", branch] + location)
        if cached is not None:
            return cached
        blob = self._get_blob(branch, location)
        response = (
            f"{STATUS_SUCCESS} {META_GEMINI}\n"
            + self.generate_header()
            + self._generate_header()
            + f"## {self.name}/{'/'.join(location)} | {convert_filesize(blob.size)}\n\n"
            f"=> {blob.name}?raw view raw\n\n"
            f"```\n"
        )
        response += blob.data.decode("utf-8") + "\n```"
        self._write_cache(["tree", branch] + location, response)
        return response

    def view_raw_blob(self, branch: str, location=[]) -> str:
        blob = self.get_blob(branch, location)
        blob = self._get_blob(branch, location)
        # if mimetypes can't make out the type, set it to plaintext
        guessed_mimetype = mimetypes.guess_type(blob.name)[0] or META_PLAINTEXT
        response = f"{STATUS_SUCCESS} {guessed_mimetype}\n"