~tsileo/microblog.pub

2bd6c98538d357a2e6d15679f448528b475f101f — Thomas Sileo 1 year, 8 months ago f13376d
Add OAuth 2.0 introspection endpoint
3 files changed, 41 insertions(+), 3 deletions(-)

M app/indieauth.py
M app/main.py
M app/micropub.py
M app/indieauth.py => app/indieauth.py +38 -0
@@ 41,6 41,7 @@ async def well_known_authorization_server(
        "revocation_endpoint": request.url_for("indieauth_revocation_endpoint"),
        "revocation_endpoint_auth_methods_supported": ["none"],
        "registration_endpoint": request.url_for("oauth_registration_endpoint"),
        "introspection_endpoint": request.url_for("oauth_introspection_endpoint"),
    }




@@ 378,6 379,8 @@ async def _check_access_token(
class AccessTokenInfo:
    scopes: list[str]
    client_id: str | None
    access_token: str
    exp: int


async def verify_access_token(


@@ 409,6 412,13 @@ async def verify_access_token(
            if access_token.indieauth_authorization_request
            else None
        ),
        access_token=access_token.access_token,
        exp=int(
            (
                access_token.created_at.replace(tzinfo=timezone.utc)
                + timedelta(seconds=access_token.expires_in)
            ).timestamp()
        ),
    )




@@ 434,6 444,13 @@ async def check_access_token(
            if access_token.indieauth_authorization_request
            else None
        ),
        access_token=access_token.access_token,
        exp=int(
            (
                access_token.created_at.replace(tzinfo=timezone.utc)
                + timedelta(seconds=access_token.expires_in)
            ).timestamp()
        ),
    )

    logger.info(


@@ 474,3 491,24 @@ async def indieauth_revocation_endpoint(
        content={},
        status_code=200,
    )


@router.post("/token_introspection")
async def oauth_introspection_endpoint(
    request: Request,
    access_token_info: AccessTokenInfo = Depends(enforce_access_token),
    token: str = Form(),
) -> JSONResponse:
    # Ensure the requested token is the same as bearer token
    if token != access_token_info.access_token:
        raise HTTPException(status_code=401, detail="access token required")

    return JSONResponse(
        content={
            "active": True,
            "client_id": access_token_info.client_id,
            "scope": " ".join(access_token_info.scopes),
            "exp": access_token_info.exp,
        },
        status_code=200,
    )

M app/main.py => app/main.py +1 -1
@@ 1696,7 1696,7 @@ async def _gen_rss_feed(
        fe.id(outbox_object.url)
        if outbox_object.name is not None:
            fe.title(outbox_object.name)
        elif not is_rss: # Atom feeds require a title
        elif not is_rss:  # Atom feeds require a title
            fe.title(outbox_object.url)

        fe.link(href=outbox_object.url)

M app/micropub.py => app/micropub.py +2 -2
@@ 132,7 132,7 @@ async def post_micropub_endpoint(
            h = form_data["h"]
        entry_type = f"h-{h}"

    logger.info(f"Creating {entry_type}")
    logger.info(f"Creating {entry_type=} with {access_token_info=}")

    if entry_type != "h-entry":
        return JSONResponse(


@@ 150,7 150,7 @@ async def post_micropub_endpoint(
    else:
        content = form_data["content"]

    public_id = await send_create(
    public_id, _ = await send_create(
        db_session,
        "Note",
        content,