~tsileo/gemapi

a0e2c3ba666c0f985df120a9c86cff36e5916c6e — Thomas Sileo a month ago efe475b
Improve response handling
6 files changed, 105 insertions(+), 29 deletions(-)

M README.md
M gemapi/applications.py
M gemapi/cli.py
M gemapi/responses.py
M tests/app.py
M tests/test_app.py
M README.md => README.md +11 -0
@@ 26,6 26,7 @@ import asyncio
from gemapi.applications import Application
from gemapi.applications import Input
from gemapi.applications import Request
from gemapi.responses import NotFoundError
from gemapi.responses import Response

app = Application()


@@ 44,6 45,16 @@ async def index(req: Request) -> Response:

@app.route("/hello/{name:str}")
async def hello(req: Request, name: str) -> Response:
    if name == "not-found":
        raise NotFoundError("nope")

    return Response(
        status_code=StatusCode.SUCCESS,
        meta="text/gemini",
        body=f"Hello {name}",
    )


    return Response(
        status_code=20,
        meta="text/gemini",

M gemapi/applications.py => gemapi/applications.py +61 -28
@@ 7,10 7,13 @@ from loguru import logger
from gemapi.request import Input
from gemapi.request import Request
from gemapi.request import SensitiveInput
from gemapi.responses import BadRequestResponse
from gemapi.responses import InputResponse
from gemapi.responses import NotFoundResponse
from gemapi.responses import Response
from gemapi.responses import SensitiveInputResponse
from gemapi.responses import StatusError
from gemapi.responses import TemporaryFailureResponse
from gemapi.router import Router




@@ 36,33 39,65 @@ class Application:
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ) -> None:
        data = await reader.read(1024)
        logger.info(data)

        client_host, client_port, *_ = writer.get_extra_info("peername")
        resp: Response

        try:
            data = await reader.read(1026)

            # Ensure it's a valid request
            # 'gemini://localhost/\r\n'
            # 1. ending with a <CR><LF>
            if not data.endswith(b"\r\n"):
                raise ValueError("Not ending with a CRLF")

            message = data.decode()

            parsed_url = urlparse(message[:-2])

        # Ensure it's a valid request
        # 'gemini://localhost/\r\n'
        # 1. ending with a <CR><LF>
        if not data.endswith(b"\r\n"):
            raise ValueError("Not ending with a CRLF")
            if parsed_url.scheme != "gemini":
                raise ValueError("Not a gemini URL")

        message = data.decode()
        parsed_url = urlparse(message[:-2])
        logger.info(f"{parsed_url}")
            if parsed_url.path == "":
                parsed_url = parsed_url._replace(path="/")

        if parsed_url.scheme != "gemini":
            raise ValueError("Not a gemini URL")
            if "." in parsed_url.path:
                raise ValueError("dots in path are not allowed")

        if parsed_url.path == "":
            parsed_url = parsed_url._replace(path="/")
        except Exception:
            logger.exception(f"{client_host}:{client_port} - 51")
            resp = BadRequestResponse("Bad request")

        req = Request(
            parsed_url=parsed_url,
            client_host=client_host,
            client_port=client_port,
        )
        else:
            req = Request(
                parsed_url=parsed_url,
                client_host=client_host,
                client_port=client_port,
            )

            try:
                resp = await self._process_request(req)
            except StatusError as status_error:
                resp = status_error.as_response()
            except Exception:
                logger.exception(f"{client_host}:{client_port} - 40")
                resp = TemporaryFailureResponse("Failed to process request")

            logger.info(
                f"{client_host}:{client_port} - "
                f"{req.parsed_url.geturl()} {resp.status_code}"
            )

        writer.write(resp.as_bytes())
        await writer.drain()
        writer.close()

        return

    async def _process_request(
        self,
        req: Request,
    ) -> Response:
        # Check if there's router registered for the hostname
        if req.parsed_url.netloc in self._hostnames:
            router = self._hostnames[req.parsed_url.netloc]


@@ 70,8 105,11 @@ class Application:
            # Or use the default router
            router = self._default_router

        # Select the router
        resp: Response
        matched_route, matched_params = router.match(req.parsed_url.path)

        # Build the response
        if not matched_route:
            resp = NotFoundResponse()
        else:


@@ 80,7 118,7 @@ class Application:

            handler_params: dict[str, Any] = {}
            handler_params.update(matched_params)
            if matched_route.input_parameter and not parsed_url.query:
            if matched_route.input_parameter and not req.parsed_url.query:
                if matched_route.input_parameter.annotation is Input:
                    resp = InputResponse(
                        matched_route.input_parameter.name,


@@ 97,7 135,7 @@ class Application:
            else:
                if matched_route.input_parameter:
                    handler_params[matched_route.input_parameter.name] = Input(
                        parsed_url.query
                        req.parsed_url.query
                    )
                # TODO: pass the path params wit the right type as kwargs
                if matched_route.handler_is_coroutine:


@@ 105,9 143,4 @@ class Application:
                else:
                    resp = matched_route.handler(req, **handler_params)

        writer.write(resp.as_bytes())
        await writer.drain()

        writer.close()

        return
        return resp

M gemapi/cli.py => gemapi/cli.py +1 -1
@@ 20,7 20,7 @@ def run(app: str) -> None:
    if not isinstance(application, Application):
        raise ValueError(f"{app} is not a valid app")

    asyncio.run(Server(application).run())
    asyncio.run(Server(application).run(), debug=True)


main.add_command(run)

M gemapi/responses.py => gemapi/responses.py +21 -0
@@ 1,4 1,5 @@
from enum import IntEnum
from typing import ClassVar


class StatusCode(IntEnum):


@@ 51,6 52,21 @@ class Response:
        return data.encode("utf-8")


class StatusError(Exception):

    STATUS_CODE: ClassVar[StatusCode]

    def __init__(self, meta: str) -> None:
        self.meta = meta

    def as_response(self) -> Response:
        return Response(self.STATUS_CODE, self.meta)


class NotFoundError(StatusError):
    STATUS_CODE = StatusCode.NOT_FOUND


class NotFoundResponse(Response):
    def __init__(self, meta: str = "Not found") -> None:
        super().__init__(StatusCode.NOT_FOUND, meta)


@@ 69,3 85,8 @@ class SensitiveInputResponse(Response):
class BadRequestResponse(Response):
    def __init__(self, meta: str) -> None:
        super().__init__(StatusCode.BAD_REQUEST, meta)


class TemporaryFailureResponse(Response):
    def __init__(self, meta: str) -> None:
        super().__init__(StatusCode.TEMPORARY_FAILURE, meta)

M tests/app.py => tests/app.py +4 -0
@@ 1,6 1,7 @@
from gemapi.applications import Application
from gemapi.applications import Input
from gemapi.applications import Request
from gemapi.responses import NotFoundError
from gemapi.responses import Response
from gemapi.responses import StatusCode



@@ 20,6 21,9 @@ async def index(req: Request) -> Response:

@app.route("/hello/{name:str}")
async def hello(req: Request, name: str) -> Response:
    if name == "not-found":
        raise NotFoundError("nope")

    return Response(
        status_code=StatusCode.SUCCESS,
        meta="text/gemini",

M tests/test_app.py => tests/test_app.py +7 -0
@@ 54,6 54,13 @@ def test_app__hello(test_application):
    assert response.data() == "Hello thomas"


def test_app__hello__not_found_error(test_application):
    response = ignition.request("//localhost/hello/not-found")

    assert response.status == "51"
    assert response.data() == "51 nope"


def test_app__input(test_application):
    response = ignition.request("//localhost/search")