~tsileo/microblog.pub

59af633c6c795e6e94ed4c4bc67d4a6f65a6eced — Thomas Sileo 9 months ago c711096
Prefetch some notes when following an actor
4 files changed, 63 insertions(+), 3 deletions(-)

M app/activitypub.py
M app/actor.py
M app/boxes.py
M tests/utils.py
M app/activitypub.py => app/activitypub.py +8 -3
@@ 166,6 166,7 @@ async def parse_collection(  # noqa: C901
    url: str | None = None,
    payload: RawObject | None = None,
    level: int = 0,
    limit: int = 0,
) -> list[RawObject]:
    """Resolve/fetch a `Collection`/`OrderedCollection`."""
    if level > 3:


@@ 193,7 194,9 @@ async def parse_collection(  # noqa: C901
        if "first" in payload:
            if isinstance(payload["first"], str):
                out.extend(
                    await parse_collection(url=payload["first"], level=level + 1)
                    await parse_collection(
                        url=payload["first"], level=level + 1, limit=limit
                    )
                )
            else:
                if "orderedItems" in payload["first"]:


@@ 202,7 205,9 @@ async def parse_collection(  # noqa: C901
                    out.extend(payload["first"]["items"])
                n = payload["first"].get("next")
                if n:
                    out.extend(await parse_collection(url=n, level=level + 1))
                    out.extend(
                        await parse_collection(url=n, level=level + 1, limit=limit)
                    )
        return out

    while payload:


@@ 212,7 217,7 @@ async def parse_collection(  # noqa: C901
            if "items" in payload:
                out.extend(payload["items"])
            n = payload.get("next")
            if n is None:
            if n is None or (limit > 0 and len(out) >= limit):
                break
            payload = await fetch(n)
        else:

M app/actor.py => app/actor.py +4 -0
@@ 69,6 69,10 @@ class Actor:
        return self.ap_actor["inbox"]

    @property
    def outbox_url(self) -> str:
        return self.ap_actor["outbox"]

    @property
    def shared_inbox_url(self) -> str:
        return self.ap_actor.get("endpoints", {}).get("sharedInbox") or self.inbox_url


M app/boxes.py => app/boxes.py +38 -0
@@ 132,6 132,8 @@ async def send_like(db_session: AsyncSession, ap_object_id: str) -> None:
        raw_object = await ap.fetch(ap.get_id(ap_object_id))
        await save_object_to_inbox(db_session, raw_object)
        await db_session.commit()
        # XXX: we need to reload it as lazy-loading the actor will fail
        # (asyncio SQLAlchemy issue)
        inbox_object = await get_inbox_object_by_ap_id(db_session, ap_object_id)
        if not inbox_object:
            raise ValueError("Should never happen")


@@ 165,6 167,8 @@ async def send_announce(db_session: AsyncSession, ap_object_id: str) -> None:
        raw_object = await ap.fetch(ap.get_id(ap_object_id))
        await save_object_to_inbox(db_session, raw_object)
        await db_session.commit()
        # XXX: we need to reload it as lazy-loading the actor will fail
        # (asyncio SQLAlchemy issue)
        inbox_object = await get_inbox_object_by_ap_id(db_session, ap_object_id)
        if not inbox_object:
            raise ValueError("Should never happen")


@@ 1615,6 1619,9 @@ async def save_to_inbox(
                    inbox_object_id=inbox_object.id,
                )
                db_session.add(notif)
                if activity_ro.ap_type == "Accept":
                    # Pre-fetch the latest activities
                    await _prefetch_actor_outbox(db_session, actor)
            else:
                logger.info(
                    "Received an Accept for an unsupported activity: "


@@ 1750,10 1757,41 @@ async def save_to_inbox(
    await db_session.commit()


async def _prefetch_actor_outbox(
    db_session: AsyncSession,
    actor: models.Actor,
) -> None:
    """Try to fetch some notes to fill the stream"""
    saved = 0
    outbox = await ap.parse_collection(actor.outbox_url, limit=20)
    for activity in outbox:
        activity_id = ap.get_id(activity)
        raw_activity = await ap.fetch(activity_id)
        if ap.as_list(raw_activity["type"])[0] == "Create":
            obj = await ap.get_object(raw_activity)
            saved_inbox_object = await get_inbox_object_by_ap_id(
                db_session, ap.get_id(obj)
            )
            if not saved_inbox_object:
                saved_inbox_object = await save_object_to_inbox(db_session, obj)

            if not saved_inbox_object.in_reply_to:
                saved_inbox_object.is_hidden_from_stream = False
                saved += 1

        if saved >= 5:
            break

    # commit is performed by the called


async def save_object_to_inbox(
    db_session: AsyncSession,
    raw_object: ap.RawObject,
) -> models.InboxObject:
    """Used to save unknown object before intetacting with them, i.e. to like
    an object that was looked up, or prefill the inbox when an actor accepted
    a follow request."""
    obj_actor = await fetch_actor(db_session, ap.get_actor_id(raw_object))

    ro = RemoteObject(raw_object, actor=obj_actor)

M tests/utils.py => tests/utils.py +13 -0
@@ 7,6 7,7 @@ import fastapi
import httpx
import respx

from app import activitypub as ap
from app import actor
from app import httpsig
from app import models


@@ 45,6 46,18 @@ def setup_remote_actor(respx_mock: respx.MockRouter) -> actor.RemoteActor:
        username="toto",
        public_key="pk",
    )
    respx_mock.get(ra.ap_id + "/outbox").mock(
        return_value=httpx.Response(
            200,
            json={
                "@context": ap.AS_EXTENDED_CTX,
                "id": f"{ra.ap_id}/outbox",
                "type": "OrderedCollection",
                "totalItems": 0,
                "orderedItems": [],
            },
        )
    )
    respx_mock.get(ra.ap_id).mock(return_value=httpx.Response(200, json=ra.ap_actor))
    return ra