~vpzom/lotide

8265980733eff941be5399f9211a1919973467aa — Colin Reeder 10 months ago 1b0b1e5 + 119790a
Merge remote-tracking branch 'origin/new-ingest'
8 files changed, 1197 insertions(+), 1040 deletions(-)

A src/apub_util/ingest.rs
R src/{apub_util.rs => apub_util/mod.rs}
M src/main.rs
M src/routes/api/mod.rs
M src/routes/apub/communities.rs
M src/routes/apub/mod.rs
M src/tasks.rs
M src/worker.rs
A src/apub_util/ingest.rs => src/apub_util/ingest.rs +1062 -0
@@ 0,0 1,1062 @@
use super::{KnownObject, Verified};
use crate::{CommentLocalID, CommunityLocalID, PostLocalID, ThingLocalRef, UserLocalID};
use activitystreams::prelude::*;
use std::borrow::Cow;
use std::future::Future;
use std::ops::Deref;
use std::sync::Arc;

pub enum FoundFrom {
    Announce {
        url: url::Url,
        community_local_id: CommunityLocalID,
        community_is_local: bool,
    },
    Other,
}

impl FoundFrom {
    pub fn as_announce(&self) -> Option<&url::Url> {
        match self {
            FoundFrom::Announce { url, .. } => Some(url),
            _ => None,
        }
    }
}

pub enum IngestResult {
    Actor(super::ActorLocalInfo),
    Other(crate::ThingLocalRef),
}

impl IngestResult {
    pub fn into_ref(self) -> crate::ThingLocalRef {
        match self {
            IngestResult::Actor(info) => info.as_ref(),
            IngestResult::Other(x) => x,
        }
    }
}

pub async fn ingest_object(
    object: Verified<KnownObject>,
    found_from: FoundFrom,
    ctx: Arc<crate::BaseContext>,
) -> Result<Option<IngestResult>, crate::Error> {
    let db = ctx.db_pool.get().await?;
    match object.into_inner() {
        KnownObject::Accept(activity) => {
            let activity_id = activity
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

            let actor_ap_id = activity
                .actor_unchecked()
                .as_single_id()
                .ok_or(crate::Error::InternalStrStatic("Missing actor for Accept"))?;

            crate::apub_util::require_containment(activity_id, actor_ap_id)?;

            let actor_ap_id = actor_ap_id.as_str();

            let community_local_id: Option<CommunityLocalID> = {
                db.query_opt("SELECT id FROM community WHERE ap_id=$1", &[&actor_ap_id])
                    .await?
                    .map(|row| CommunityLocalID(row.get(0)))
            };

            if let Some(community_local_id) = community_local_id {
                let object_id = activity
                    .object()
                    .as_single_id()
                    .ok_or(crate::Error::InternalStrStatic("Missing object for Accept"))?;

                if let Some(remaining) =
                    crate::apub_util::try_strip_host(&object_id, &ctx.host_url_apub)
                {
                    if remaining.starts_with("/communities/") {
                        let remaining = &remaining[13..];
                        let next_expected = format!("{}/followers/", community_local_id);
                        if remaining.starts_with(&next_expected) {
                            let remaining = &remaining[next_expected.len()..];
                            let follower_local_id: UserLocalID = remaining.parse()?;

                            db.execute(
                                "UPDATE community_follow SET accepted=TRUE WHERE community=$1 AND follower=$2",
                                &[&community_local_id, &follower_local_id],
                            ).await?;
                        }
                    }
                }
            }

            Ok(None)
        }
        KnownObject::Announce(activity) => {
            let activity_id = activity
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

            let community_ap_id = activity.actor_unchecked().as_single_id().ok_or(
                crate::Error::InternalStrStatic("Missing actor for Announce"),
            )?;

            let community_local_info = db
                .query_opt(
                    "SELECT id, local FROM community WHERE ap_id=$1",
                    &[&community_ap_id.as_str()],
                )
                .await?
                .map(|row| (CommunityLocalID(row.get(0)), row.get(1)));

            if let Some((community_local_id, community_is_local)) = community_local_info {
                crate::apub_util::require_containment(activity_id, community_ap_id)?;

                let object_id = activity.object().as_single_id();

                if let Some(object_id) = object_id {
                    if let Some(remaining) =
                        crate::apub_util::try_strip_host(&object_id, &ctx.host_url_apub)
                    {
                        if remaining.starts_with("/posts/") {
                            let remaining = &remaining[7..];
                            if let Ok(local_post_id) = remaining.parse::<PostLocalID>() {
                                db.execute(
                                    "UPDATE post SET approved=TRUE, approved_ap_id=$1 WHERE id=$2 AND community=$3",
                                    &[&activity_id.as_str(), &local_post_id, &community_local_id],
                                ).await?;
                            }
                        }
                    } else {
                        // don't need announces for local objects
                        let obj =
                            crate::apub_util::fetch_ap_object(object_id, &ctx.http_client).await?;

                        ingest_object_boxed(
                            obj,
                            FoundFrom::Announce {
                                url: activity_id.clone(),
                                community_local_id,
                                community_is_local,
                            },
                            ctx,
                        )
                        .await?;
                    }
                }
            }
            Ok(None)
        }
        KnownObject::Article(obj) => {
            ingest_postlike(Verified(KnownObject::Article(obj)), found_from, ctx).await
        }
        KnownObject::Create(activity) => {
            ingest_create(Verified(activity), ctx).await?;
            Ok(None)
        }
        KnownObject::Delete(activity) => {
            ingest_delete(Verified(activity), ctx).await?;
            Ok(None)
        }
        KnownObject::Follow(follow) => {
            let follow = Verified(follow);
            let follower_ap_id = follow.actor_unchecked().as_single_id();
            let target = follow.object().as_single_id();

            if let Some(follower_ap_id) = follower_ap_id {
                let activity_ap_id = follow
                    .id_unchecked()
                    .ok_or(crate::Error::InternalStrStatic("Missing activitity ID"))?;

                crate::apub_util::require_containment(activity_ap_id, follower_ap_id)?;
                let follow = crate::apub_util::Contained(Cow::Borrowed(&follow));

                let follower_local_id =
                    crate::apub_util::get_or_fetch_user_local_id(follower_ap_id, &db, &ctx).await?;

                if let Some(target) = target {
                    if let Some(community_id) =
                        super::maybe_get_local_community_id_from_uri(target, &ctx.host_url_apub)
                    {
                        let row = db
                            .query_opt("SELECT local FROM community WHERE id=$1", &[&community_id])
                            .await?;
                        if let Some(row) = row {
                            let local: bool = row.get(0);
                            if local {
                                db.execute("INSERT INTO community_follow (community, follower, local, ap_id, accepted) VALUES ($1, $2, FALSE, $3, TRUE) ON CONFLICT (community, follower) DO NOTHING", &[&community_id, &follower_local_id, &activity_ap_id.as_str()]).await?;

                                crate::apub_util::spawn_enqueue_send_community_follow_accept(
                                    community_id,
                                    follower_local_id,
                                    follow.with_owned(),
                                    ctx,
                                );
                            }
                        } else {
                            eprintln!("Warning: recieved follow for unknown community");
                        }
                    }
                }
            }

            Ok(None)
        }
        KnownObject::Group(group) => {
            let ap_id = group
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing ID in Group"))?;

            let name = group
                .preferred_username()
                .or_else(|| {
                    group
                        .name()
                        .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                })
                .unwrap_or("");
            let description = group
                .summary()
                .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                .unwrap_or("");
            let inbox = group.inbox_unchecked().as_str();
            let shared_inbox = group
                .endpoints_unchecked()
                .and_then(|endpoints| endpoints.shared_inbox)
                .map(|url| url.as_str());
            let public_key = group
                .ext_one
                .public_key
                .as_ref()
                .map(|key| key.public_key_pem.as_bytes());
            let public_key_sigalg = group
                .ext_one
                .public_key
                .as_ref()
                .and_then(|key| key.signature_algorithm.as_deref());

            let id = CommunityLocalID(db.query_one(
                "INSERT INTO community (name, local, ap_id, ap_inbox, ap_shared_inbox, public_key, public_key_sigalg, description) VALUES ($1, FALSE, $2, $3, $4, $5, $6, $7) ON CONFLICT (ap_id) DO UPDATE SET ap_inbox=$3, ap_shared_inbox=$4, public_key=$5, public_key_sigalg=$6, description=$7 RETURNING id",
                &[&name, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description],
            ).await?.get(0));

            Ok(Some(IngestResult::Actor(
                super::ActorLocalInfo::Community {
                    id,
                    public_key: public_key.map(|key| super::PubKeyInfo {
                        algorithm: super::get_message_digest(public_key_sigalg),
                        key: key.to_owned(),
                    }),
                },
            )))
        }
        KnownObject::Image(obj) => {
            ingest_postlike(Verified(KnownObject::Image(obj)), found_from, ctx).await
        }
        KnownObject::Like(activity) => {
            ingest_like(Verified(activity), ctx).await?;
            Ok(None)
        }
        KnownObject::Note(obj) => {
            ingest_postlike(Verified(KnownObject::Note(obj)), found_from, ctx).await
        }
        KnownObject::Page(obj) => {
            ingest_postlike(Verified(KnownObject::Page(obj)), found_from, ctx).await
        }
        KnownObject::Person(person) => {
            let ap_id = person
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing ID in Person"))?;

            let username = person
                .preferred_username()
                .or_else(|| {
                    person
                        .name()
                        .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                })
                .unwrap_or("");
            let inbox = person.inbox_unchecked().as_str();
            let shared_inbox = person
                .endpoints_unchecked()
                .and_then(|endpoints| endpoints.shared_inbox)
                .map(|url| url.as_str());
            let public_key = person
                .ext_one
                .public_key
                .as_ref()
                .map(|key| key.public_key_pem.as_bytes());
            let public_key_sigalg = person
                .ext_one
                .public_key
                .as_ref()
                .and_then(|key| key.signature_algorithm.as_deref());
            let description = person
                .summary()
                .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                .unwrap_or("");

            let avatar = person.icon().and_then(|icon| {
                icon.iter()
                    .filter_map(|icon| {
                        if icon.kind_str() == Some("Image") {
                            match activitystreams::object::Image::from_any_base(icon.clone()) {
                                Err(_) | Ok(None) => None,
                                Ok(Some(icon)) => Some(icon),
                            }
                        } else {
                            None
                        }
                    })
                    .next()
            });
            let avatar = avatar
                .as_ref()
                .and_then(|icon| icon.url().and_then(|url| url.as_single_id()))
                .map(|x| x.as_str());

            let id = UserLocalID(db.query_one(
                "INSERT INTO person (username, local, created_local, ap_id, ap_inbox, ap_shared_inbox, public_key, public_key_sigalg, description, avatar) VALUES ($1, FALSE, localtimestamp, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (ap_id) DO UPDATE SET ap_inbox=$3, ap_shared_inbox=$4, public_key=$5, public_key_sigalg=$6, description=$7, avatar=$8 RETURNING id",
                &[&username, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description, &avatar],
            ).await?.get(0));

            Ok(Some(IngestResult::Actor(super::ActorLocalInfo::User {
                id,
                public_key: public_key.map(|key| super::PubKeyInfo {
                    algorithm: super::get_message_digest(public_key_sigalg),
                    key: key.to_owned(),
                }),
            })))
        }
        KnownObject::Undo(activity) => {
            ingest_undo(Verified(activity), ctx).await?;
            Ok(None)
        }
        KnownObject::Update(activity) => {
            let activity_id = activity
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

            let object_id =
                activity
                    .object()
                    .as_single_id()
                    .ok_or(crate::Error::InternalStrStatic(
                        "Missing object ID for Update",
                    ))?;

            crate::apub_util::require_containment(activity_id, object_id)?;

            let object_id = object_id.clone();

            crate::spawn_task(async move {
                let row = db
                    .query_opt(
                        "SELECT 1 FROM community WHERE ap_id=$1 LIMIT 1",
                        &[&object_id.as_str()],
                    )
                    .await?;
                if row.is_some() {
                    ctx.enqueue_task(&crate::tasks::FetchActor {
                        actor_ap_id: Cow::Owned(object_id),
                    })
                    .await?;
                }

                Ok(())
            });

            Ok(None)
        }
    }
}

pub fn ingest_object_boxed(
    object: Verified<KnownObject>,
    found_from: FoundFrom,
    ctx: Arc<crate::BaseContext>,
) -> std::pin::Pin<Box<dyn Future<Output = Result<Option<IngestResult>, crate::Error>> + Send>> {
    Box::pin(ingest_object(object, found_from, ctx))
}

pub async fn ingest_like(
    activity: Verified<activitystreams::activity::Like>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;

    let activity_id = activity
        .id_unchecked()
        .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

    if let Some(actor_id) = activity.actor_unchecked().as_single_id() {
        super::require_containment(activity_id, actor_id)?;

        let actor_local_id = super::get_or_fetch_user_local_id(actor_id, &db, &ctx).await?;

        if let Some(object_id) = activity.object().as_single_id() {
            let thing_local_ref = if let Some(remaining) =
                super::try_strip_host(&object_id, &ctx.host_url_apub)
            {
                if remaining.starts_with("/posts/") {
                    if let Ok(local_post_id) = remaining[7..].parse() {
                        Some(ThingLocalRef::Post(local_post_id))
                    } else {
                        None
                    }
                } else if remaining.starts_with("/comments/") {
                    if let Ok(local_comment_id) = remaining[10..].parse() {
                        Some(ThingLocalRef::Comment(local_comment_id))
                    } else {
                        None
                    }
                } else {
                    None
                }
            } else {
                let row = db.query_opt(
                    "(SELECT TRUE, id FROM post WHERE ap_id=$1) UNION ALL (SELECT FALSE, id FROM reply WHERE ap_id=$1) LIMIT 1",
                    &[&object_id.as_str()],
                ).await?;

                if let Some(row) = row {
                    Some(if row.get(0) {
                        ThingLocalRef::Post(PostLocalID(row.get(1)))
                    } else {
                        ThingLocalRef::Comment(CommentLocalID(row.get(1)))
                    })
                } else {
                    None
                }
            };

            match thing_local_ref {
                Some(ThingLocalRef::Post(post_local_id)) => {
                    let row_count = db.execute(
                        "INSERT INTO post_like (post, person, local, ap_id) VALUES ($1, $2, FALSE, $3) ON CONFLICT (post, person) DO NOTHING",
                        &[&post_local_id, &actor_local_id, &activity_id.as_str()],
                    ).await?;

                    if row_count > 0 {
                        let row = db.query_opt("SELECT post.community, community.local FROM post, community WHERE post.community = community.id AND post.id=$1", &[&post_local_id]).await?;
                        if let Some(row) = row {
                            let community_local = row.get(1);
                            if community_local {
                                let community_id = CommunityLocalID(row.get(0));
                                let body = serde_json::to_string(&activity)?;
                                super::enqueue_forward_to_community_followers(
                                    community_id,
                                    body,
                                    ctx,
                                )
                                .await?;
                            }
                        }
                    }
                }
                Some(ThingLocalRef::Comment(comment_local_id)) => {
                    let row_count = db.execute(
                        "INSERT INTO reply_like (reply, person, local, ap_id) VALUES ($1, $2, FALSE, $3) ON CONFLICT (reply, person) DO NOTHING",
                        &[&comment_local_id, &actor_local_id, &activity_id.as_str()],
                    ).await?;

                    if row_count > 0 {
                        let row = db.query_opt("SELECT post.community, community.local FROM reply, post, community WHERE reply.post = post.id AND post.community = community.id AND post.id=$1", &[&comment_local_id]).await?;
                        if let Some(row) = row {
                            let community_local = row.get(1);
                            if community_local {
                                let community_id = CommunityLocalID(row.get(0));
                                let body = serde_json::to_string(&activity)?;
                                super::enqueue_forward_to_community_followers(
                                    community_id,
                                    body,
                                    ctx,
                                )
                                .await?;
                            }
                        }
                    }
                }
                _ => {}
            }
        }
    }

    Ok(())
}

pub async fn ingest_delete(
    activity: Verified<activitystreams::activity::Delete>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;

    let activity_id = activity
        .id_unchecked()
        .ok_or(crate::Error::InternalStrStatic("Missing ID for activity"))?;
    let actor_id = activity
        .actor_unchecked()
        .as_single_id()
        .ok_or(crate::Error::InternalStrStatic("Missing ID for actor"))?;

    if let Some(object_id) = activity.object().as_single_id() {
        super::require_containment(activity_id, actor_id)?;
        super::require_containment(object_id, actor_id)?;

        let row = db.query_opt(
            "WITH deleted_post AS (UPDATE post SET href=NULL, title='[deleted]', content_text='[deleted]', content_markdown=NULL, content_html=NULL, deleted=TRUE WHERE ap_id=$1 AND deleted=FALSE RETURNING (SELECT id FROM community WHERE community.id = post.community AND community.local)), deleted_reply AS (UPDATE reply SET content_text='[deleted]', content_markdown=NULL, content_html=NULL, deleted=TRUE WHERE ap_id=$1 AND deleted=FALSE RETURNING (SELECT id FROM community WHERE community.id=(SELECT community FROM post WHERE id=reply.post) AND community.local)) (SELECT * FROM deleted_post) UNION ALL (SELECT * FROM deleted_reply) LIMIT 1",
            &[&object_id.as_str()],
            ).await?;

        if let Some(row) = row {
            // Something was deleted
            let local_community = row.get::<_, Option<_>>(0).map(CommunityLocalID);
            if let Some(community_id) = local_community {
                // Community is local, need to forward delete to followers

                let body = serde_json::to_string(&activity)?;

                crate::spawn_task(crate::apub_util::enqueue_forward_to_community_followers(
                    community_id,
                    body,
                    ctx,
                ));
            }
        }
    }

    Ok(())
}

pub async fn ingest_undo(
    activity: Verified<activitystreams::activity::Undo>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let activity_id = activity
        .id_unchecked()
        .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

    let actor_id =
        activity
            .actor_unchecked()
            .as_single_id()
            .ok_or(crate::Error::InternalStrStatic(
                "Missing actor for activity",
            ))?;

    let object_id = activity
        .object()
        .as_single_id()
        .ok_or(crate::Error::InternalStrStatic("Missing object for Undo"))?;

    super::require_containment(activity_id, actor_id)?;
    super::require_containment(object_id, actor_id)?;

    let object_id = object_id.as_str();

    let db = ctx.db_pool.get().await?;

    db.execute("DELETE FROM post_like WHERE ap_id=$1", &[&object_id])
        .await?;
    db.execute("DELETE FROM reply_like WHERE ap_id=$1", &[&object_id])
        .await?;
    db.execute("DELETE FROM community_follow WHERE ap_id=$1", &[&object_id])
        .await?;
    db.execute(
        "UPDATE post SET approved=FALSE, approved_ap_id=NULL WHERE approved_ap_id=$1",
        &[&object_id],
    )
    .await?;

    Ok(())
}

pub async fn ingest_create(
    activity: Verified<activitystreams::activity::Create>,
    ctx: Arc<crate::BaseContext>,
) -> Result<(), crate::Error> {
    for req_obj in activity.object().iter() {
        let object_id = req_obj.id();

        if let Some(object_id) = object_id {
            let obj = crate::apub_util::fetch_ap_object(object_id, &ctx.http_client).await?;

            ingest_object_boxed(obj, FoundFrom::Other, ctx.clone()).await?;
        }
    }

    Ok(())
}

/// Ingestion flow for Page, Image, Article, and Note. Should not be called with any other objects.
async fn ingest_postlike(
    obj: Verified<KnownObject>,
    found_from: FoundFrom,
    ctx: Arc<crate::RouteContext>,
) -> Result<Option<IngestResult>, crate::Error> {
    let (to, in_reply_to, obj_id) = match obj.deref() {
        KnownObject::Page(obj) => (obj.to(), None, obj.id_unchecked()),
        KnownObject::Image(obj) => (obj.to(), None, obj.id_unchecked()),
        KnownObject::Article(obj) => (obj.to(), None, obj.id_unchecked()),
        KnownObject::Note(obj) => (obj.to(), obj.in_reply_to(), obj.id_unchecked()),
        _ => (None, None, None),
    };

    let community_found = match found_from {
        FoundFrom::Announce {
            community_local_id,
            community_is_local,
            ..
        } => Some((community_local_id, community_is_local)),
        _ => match to {
            None => None,
            Some(maybe) => maybe
                .iter()
                .filter_map(|any| {
                    any.as_xsd_any_uri()
                        .and_then(|uri| {
                            super::maybe_get_local_community_id_from_uri(uri, &ctx.host_url_apub)
                        })
                        .map(|id| (id, true))
                })
                .next(),
        },
    };

    if let Some((community_local_id, community_is_local)) = community_found {
        match obj.into_inner() {
            KnownObject::Page(obj) => Ok(handle_received_page_for_community(
                community_local_id,
                community_is_local,
                found_from.as_announce(),
                Verified(obj),
                ctx,
            )
            .await?
            .map(|id| IngestResult::Other(crate::ThingLocalRef::Post(id)))),
            KnownObject::Image(obj) => Ok(handle_received_page_for_community(
                community_local_id,
                community_is_local,
                found_from.as_announce(),
                Verified(obj),
                ctx,
            )
            .await?
            .map(|id| IngestResult::Other(crate::ThingLocalRef::Post(id)))),
            KnownObject::Article(obj) => Ok(handle_received_page_for_community(
                community_local_id,
                community_is_local,
                found_from.as_announce(),
                Verified(obj),
                ctx,
            )
            .await?
            .map(|id| IngestResult::Other(crate::ThingLocalRef::Post(id)))),
            KnownObject::Note(obj) => {
                let content = obj.content().and_then(|x| x.as_single_xsd_string());
                let media_type = obj.media_type();
                let created = obj.published();
                let author = obj.attributed_to().and_then(|x| x.as_single_id());

                // fetch first attachment
                let attachment_href = obj
                    .attachment()
                    .and_then(|x| x.iter().next())
                    .and_then(
                        |base: &activitystreams::base::AnyBase| match base.kind_str() {
                            Some("Document") => Some(
                                activitystreams::object::Document::from_any_base(base.clone())
                                    .map(|obj| obj.unwrap().take_url()),
                            ),
                            Some("Image") => Some(
                                activitystreams::object::Image::from_any_base(base.clone())
                                    .map(|obj| obj.unwrap().take_url()),
                            ),
                            _ => None,
                        },
                    )
                    .transpose()?
                    .flatten();
                let attachment_href = attachment_href
                    .as_ref()
                    .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                    .map(|href| href.as_str());

                if let Some(object_id) = obj.id_unchecked() {
                    if let Some(in_reply_to) = obj.in_reply_to() {
                        // it's a reply

                        Ok(handle_recieved_reply(
                            object_id,
                            content.unwrap_or(""),
                            media_type,
                            created.as_ref(),
                            author,
                            in_reply_to,
                            attachment_href,
                            ctx,
                        )
                        .await?
                        .map(|id| IngestResult::Other(crate::ThingLocalRef::Comment(id))))
                    } else {
                        // not a reply, must be a top-level post
                        let title = obj
                            .summary()
                            .and_then(|x| x.as_single_xsd_string())
                            .unwrap_or("");

                        // Interpret attachments (usually images) as links
                        let href = obj
                            .attachment()
                            .and_then(|x| x.iter().next())
                            .and_then(|base: &activitystreams::base::AnyBase| {
                                match base.kind_str() {
                                    Some("Document") => Some(
                                        activitystreams::object::Document::from_any_base(
                                            base.clone(),
                                        )
                                        .map(|obj| obj.unwrap().take_url()),
                                    ),
                                    Some("Image") => Some(
                                        activitystreams::object::Image::from_any_base(base.clone())
                                            .map(|obj| obj.unwrap().take_url()),
                                    ),
                                    _ => None,
                                }
                            })
                            .transpose()?
                            .flatten();
                        let href = href
                            .as_ref()
                            .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                            .map(|href| href.as_str());

                        Ok(Some(IngestResult::Other(crate::ThingLocalRef::Post(
                            handle_recieved_post(
                                object_id.clone(),
                                title,
                                href,
                                content,
                                media_type,
                                created.as_ref(),
                                author,
                                community_local_id,
                                community_is_local,
                                found_from.as_announce(),
                                ctx,
                            )
                            .await?,
                        ))))
                    }
                } else {
                    Ok(None)
                }
            }
            _ => Err(crate::Error::InternalStrStatic(
                "ingest_postlike called with an unknown object",
            )),
        }
    } else {
        // not to a community, but might still match as a reply
        if let Some(in_reply_to) = in_reply_to {
            if let Some(obj_id) = obj_id {
                if let KnownObject::Note(obj) = obj.deref() {
                    // TODO deduplicate this?

                    let content = obj.content().and_then(|x| x.as_single_xsd_string());
                    let media_type = obj.media_type();
                    let created = obj.published();
                    let author = obj.attributed_to().and_then(|x| x.as_single_id());

                    if let Some(author) = author {
                        super::require_containment(obj_id, author)?;
                    }

                    // fetch first attachment
                    let attachment_href = obj
                        .attachment()
                        .and_then(|x| x.iter().next())
                        .and_then(
                            |base: &activitystreams::base::AnyBase| match base.kind_str() {
                                Some("Document") => Some(
                                    activitystreams::object::Document::from_any_base(base.clone())
                                        .map(|obj| obj.unwrap().take_url()),
                                ),
                                Some("Image") => Some(
                                    activitystreams::object::Image::from_any_base(base.clone())
                                        .map(|obj| obj.unwrap().take_url()),
                                ),
                                _ => None,
                            },
                        )
                        .transpose()?
                        .flatten();
                    let attachment_href = attachment_href
                        .as_ref()
                        .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                        .map(|href| href.as_str());

                    let id = handle_recieved_reply(
                        obj_id,
                        content.unwrap_or(""),
                        media_type,
                        created.as_ref(),
                        author,
                        &in_reply_to,
                        attachment_href,
                        ctx,
                    )
                    .await?;

                    Ok(id.map(|id| IngestResult::Other(crate::ThingLocalRef::Comment(id))))
                } else {
                    Ok(None)
                }
            } else {
                Ok(None)
            }
        } else {
            Ok(None)
        }
    }
}

async fn handle_recieved_reply(
    object_id: &url::Url,
    content: &str,
    media_type: Option<&mime::Mime>,
    created: Option<&chrono::DateTime<chrono::FixedOffset>>,
    author: Option<&url::Url>,
    in_reply_to: &activitystreams::primitives::OneOrMany<activitystreams::base::AnyBase>,
    attachment_href: Option<&str>,
    ctx: Arc<crate::RouteContext>,
) -> Result<Option<CommentLocalID>, crate::Error> {
    let db = ctx.db_pool.get().await?;

    let author = match author {
        Some(author) => Some(super::get_or_fetch_user_local_id(&author, &db, &ctx).await?),
        None => None,
    };

    let last_reply_to = in_reply_to.iter().last(); // TODO maybe not this? Not sure how to interpret inReplyTo

    if let Some(last_reply_to) = last_reply_to {
        if let Some(term_ap_id) = last_reply_to.as_xsd_any_uri() {
            #[derive(Debug)]
            enum ReplyTarget {
                Post {
                    id: PostLocalID,
                },
                Comment {
                    id: CommentLocalID,
                    post: PostLocalID,
                },
            }

            let target = if let Some(remaining) =
                super::try_strip_host(&term_ap_id, &ctx.host_url_apub)
            {
                if remaining.starts_with("/posts/") {
                    if let Ok(local_post_id) = remaining[7..].parse() {
                        Some(ReplyTarget::Post { id: local_post_id })
                    } else {
                        None
                    }
                } else if remaining.starts_with("/comments/") {
                    if let Ok(local_comment_id) = remaining[10..].parse() {
                        let row = db
                            .query_opt("SELECT post FROM reply WHERE id=$1", &[&local_comment_id])
                            .await?;
                        if let Some(row) = row {
                            Some(ReplyTarget::Comment {
                                id: local_comment_id,
                                post: PostLocalID(row.get(0)),
                            })
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                } else {
                    None
                }
            } else {
                let row = db
                    .query_opt("(SELECT id, post FROM reply WHERE ap_id=$1) UNION (SELECT NULL, id FROM post WHERE ap_id=$1) LIMIT 1", &[&term_ap_id.as_str()])
                    .await?;
                row.map(|row| match row.get::<_, Option<_>>(0).map(CommentLocalID) {
                    Some(reply_id) => ReplyTarget::Comment {
                        id: reply_id,
                        post: PostLocalID(row.get(1)),
                    },
                    None => ReplyTarget::Post {
                        id: PostLocalID(row.get(1)),
                    },
                })
            };

            if let Some(target) = target {
                let (post, parent) = match target {
                    ReplyTarget::Post { id } => (id, None),
                    ReplyTarget::Comment { id, post } => (post, Some(id)),
                };

                let content_is_html = media_type.is_none() || media_type == Some(&mime::TEXT_HTML);
                let (content_text, content_html) = if content_is_html {
                    (None, Some(content))
                } else {
                    (Some(content), None)
                };

                let row = db.query_opt(
                    "INSERT INTO reply (post, parent, author, content_text, content_html, created, local, ap_id, attachment_href) VALUES ($1, $2, $3, $4, $5, COALESCE($6, current_timestamp), FALSE, $7, $8) ON CONFLICT (ap_id) DO NOTHING RETURNING id",
                    &[&post, &parent, &author, &content_text, &content_html, &created, &object_id.as_str(), &attachment_href],
                    ).await?;

                if let Some(row) = row {
                    let id = CommentLocalID(row.get(0));
                    let info = crate::CommentInfo {
                        id,
                        author,
                        post,
                        parent,
                        content_text: content_text.map(|x| Cow::Owned(x.to_owned())),
                        content_markdown: None,
                        content_html: content_html.map(|x| Cow::Owned(x.to_owned())),
                        created: created.copied().unwrap_or_else(|| {
                            chrono::offset::Utc::now()
                                .with_timezone(&chrono::offset::FixedOffset::west(0))
                        }),
                        ap_id: crate::APIDOrLocal::APID(object_id.to_owned()),
                        attachment_href: attachment_href.map(|x| Cow::Owned(x.to_owned())),
                    };

                    crate::on_post_add_comment(info, ctx);

                    Ok(Some(id))
                } else {
                    // not new, try to fetch id
                    // will probably be unnecessary when we implement comment editing

                    let row = db
                        .query_opt(
                            "SELECT id FROM reply WHERE ap_id=$1",
                            &[&object_id.as_str()],
                        )
                        .await?;
                    Ok(match row {
                        None => None,
                        Some(row) => Some(CommentLocalID(row.get(0))),
                    })
                }
            } else {
                Ok(None)
            }
        } else {
            Ok(None)
        }
    } else {
        Ok(None)
    }
}

async fn handle_received_page_for_community<Kind: Clone + std::fmt::Debug>(
    community_local_id: CommunityLocalID,
    community_is_local: bool,
    is_announce: Option<&url::Url>,
    obj: Verified<activitystreams::object::Object<Kind>>,
    ctx: Arc<crate::RouteContext>,
) -> Result<Option<PostLocalID>, crate::Error> {
    let title = obj
        .summary()
        .iter()
        .map(|x| x.iter())
        .flatten()
        .filter_map(|maybe| maybe.as_xsd_string())
        .next()
        .unwrap_or("");
    let href = obj
        .url()
        .iter()
        .map(|x| x.iter())
        .flatten()
        .filter_map(|maybe| {
            maybe
                .as_xsd_any_uri()
                .map(|x| x.as_str())
                .or_else(|| maybe.as_xsd_string())
        })
        .next();
    let content = obj.content().and_then(|x| x.as_single_xsd_string());
    let media_type = obj.media_type();
    let created = obj.published();
    let author = obj.attributed_to().and_then(|x| x.as_single_id());

    if let Some(object_id) = obj.id_unchecked() {
        if let Some(author) = author {
            super::require_containment(&object_id, author)?;
        }

        Ok(Some(
            handle_recieved_post(
                object_id.clone(),
                title,
                href,
                content,
                media_type,
                created.as_ref(),
                author,
                community_local_id,
                community_is_local,
                is_announce,
                ctx,
            )
            .await?,
        ))
    } else {
        Ok(None)
    }
}

async fn handle_recieved_post(
    object_id: url::Url,
    title: &str,
    href: Option<&str>,
    content: Option<&str>,
    media_type: Option<&mime::Mime>,
    created: Option<&chrono::DateTime<chrono::FixedOffset>>,
    author: Option<&url::Url>,
    community_local_id: CommunityLocalID,
    community_is_local: bool,
    is_announce: Option<&url::Url>,
    ctx: Arc<crate::RouteContext>,
) -> Result<PostLocalID, crate::Error> {
    let db = ctx.db_pool.get().await?;
    let author = match author {
        Some(author) => Some(super::get_or_fetch_user_local_id(&author, &db, &ctx).await?),
        None => None,
    };

    let content_is_html = media_type.is_none() || media_type == Some(&mime::TEXT_HTML);
    let (content_text, content_html) = if content_is_html {
        (None, Some(content))
    } else {
        (Some(content), None)
    };

    let approved = is_announce.is_some() || community_is_local;

    let row = db.query_one(
        "INSERT INTO post (author, href, content_text, content_html, title, created, community, local, ap_id, approved, approved_ap_id) VALUES ($1, $2, $3, $4, $5, COALESCE($6, current_timestamp), $7, FALSE, $8, $9, $10) ON CONFLICT (ap_id) DO UPDATE SET approved=$9, approved_ap_id=$10 RETURNING id",
        &[&author, &href, &content_text, &content_html, &title, &created, &community_local_id, &object_id.as_str(), &approved, &is_announce.map(|x| x.as_str())],
    ).await?;

    let post_local_id = PostLocalID(row.get(0));

    if community_is_local {
        crate::on_local_community_add_post(community_local_id, post_local_id, object_id, ctx);
    }

    Ok(post_local_id)
}

R src/apub_util.rs => src/apub_util/mod.rs +23 -751
@@ 1,4 1,4 @@
use crate::{BaseURL, CommentLocalID, CommunityLocalID, PostLocalID, ThingLocalRef, UserLocalID};
use crate::{BaseURL, CommentLocalID, CommunityLocalID, PostLocalID, UserLocalID};
use activitystreams::prelude::*;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;


@@ 7,6 7,8 @@ use std::convert::TryFrom;
use std::ops::Deref;
use std::sync::Arc;

pub mod ingest;

pub const ACTIVITY_TYPE: &str = "application/activity+json";

pub const SIGALG_RSA_SHA256: &str = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256";


@@ 295,6 297,13 @@ impl ActorLocalInfo {
            ActorLocalInfo::Community { public_key, .. } => public_key.as_ref(),
        }
    }

    pub fn as_ref(&self) -> crate::ThingLocalRef {
        match self {
            ActorLocalInfo::User { id, .. } => crate::ThingLocalRef::User(*id),
            ActorLocalInfo::Community { id, .. } => crate::ThingLocalRef::Community(*id),
        }
    }
}

#[derive(Clone, Debug, thiserror::Error)]


@@ 359,116 368,11 @@ pub async fn fetch_ap_object(

pub async fn fetch_actor(
    req_ap_id: &url::Url,
    db: &tokio_postgres::Client,
    http_client: &crate::HttpClient,
    ctx: Arc<crate::BaseContext>,
) -> Result<ActorLocalInfo, crate::Error> {
    let obj = fetch_ap_object(req_ap_id, http_client).await?;
    let ap_id = req_ap_id;

    match obj.deref() {
        KnownObject::Person(person) => {
            let username = person
                .preferred_username()
                .or_else(|| {
                    person
                        .name()
                        .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                })
                .unwrap_or("");
            let inbox = person.inbox_unchecked().as_str();
            let shared_inbox = person
                .endpoints_unchecked()
                .and_then(|endpoints| endpoints.shared_inbox)
                .map(|url| url.as_str());
            let public_key = person
                .ext_one
                .public_key
                .as_ref()
                .map(|key| key.public_key_pem.as_bytes());
            let public_key_sigalg = person
                .ext_one
                .public_key
                .as_ref()
                .and_then(|key| key.signature_algorithm.as_deref());
            let description = person
                .summary()
                .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                .unwrap_or("");

            let avatar = person.icon().and_then(|icon| {
                icon.iter()
                    .filter_map(|icon| {
                        if icon.kind_str() == Some("Image") {
                            match activitystreams::object::Image::from_any_base(icon.clone()) {
                                Err(_) | Ok(None) => None,
                                Ok(Some(icon)) => Some(icon),
                            }
                        } else {
                            None
                        }
                    })
                    .next()
            });
            let avatar = avatar
                .as_ref()
                .and_then(|icon| icon.url().and_then(|url| url.as_single_id()))
                .map(|x| x.as_str());

            let id = UserLocalID(db.query_one(
                "INSERT INTO person (username, local, created_local, ap_id, ap_inbox, ap_shared_inbox, public_key, public_key_sigalg, description, avatar) VALUES ($1, FALSE, localtimestamp, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (ap_id) DO UPDATE SET ap_inbox=$3, ap_shared_inbox=$4, public_key=$5, public_key_sigalg=$6, description=$7, avatar=$8 RETURNING id",
                &[&username, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description, &avatar],
            ).await?.get(0));

            Ok(ActorLocalInfo::User {
                id,
                public_key: public_key.map(|key| PubKeyInfo {
                    algorithm: get_message_digest(public_key_sigalg),
                    key: key.to_owned(),
                }),
            })
        }
        KnownObject::Group(group) => {
            let name = group
                .preferred_username()
                .or_else(|| {
                    group
                        .name()
                        .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                })
                .unwrap_or("");
            let description = group
                .summary()
                .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                .unwrap_or("");
            let inbox = group.inbox_unchecked().as_str();
            let shared_inbox = group
                .endpoints_unchecked()
                .and_then(|endpoints| endpoints.shared_inbox)
                .map(|url| url.as_str());
            let public_key = group
                .ext_one
                .public_key
                .as_ref()
                .map(|key| key.public_key_pem.as_bytes());
            let public_key_sigalg = group
                .ext_one
                .public_key
                .as_ref()
                .and_then(|key| key.signature_algorithm.as_deref());

            let id = CommunityLocalID(db.query_one(
                "INSERT INTO community (name, local, ap_id, ap_inbox, ap_shared_inbox, public_key, public_key_sigalg, description) VALUES ($1, FALSE, $2, $3, $4, $5, $6, $7) ON CONFLICT (ap_id) DO UPDATE SET ap_inbox=$3, ap_shared_inbox=$4, public_key=$5, public_key_sigalg=$6, description=$7 RETURNING id",
                &[&name, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description],
            ).await?.get(0));

            Ok(ActorLocalInfo::Community {
                id,
                public_key: public_key.map(|key| PubKeyInfo {
                    algorithm: get_message_digest(public_key_sigalg),
                    key: key.to_owned(),
                }),
            })
        }
    let obj = fetch_ap_object(req_ap_id, &ctx.http_client).await?;
    match ingest::ingest_object_boxed(obj, ingest::FoundFrom::Other, ctx).await? {
        Some(ingest::IngestResult::Actor(info)) => Ok(info),
        _ => Err(crate::Error::InternalStrStatic("Unrecognized actor type")),
    }
}


@@ 476,10 380,9 @@ pub async fn fetch_actor(
pub async fn get_or_fetch_user_local_id(
    ap_id: &url::Url,
    db: &tokio_postgres::Client,
    host_url_apub: &BaseURL,
    http_client: &crate::HttpClient,
    ctx: &Arc<crate::BaseContext>,
) -> Result<UserLocalID, crate::Error> {
    if let Some(remaining) = try_strip_host(ap_id, host_url_apub) {
    if let Some(remaining) = try_strip_host(ap_id, &ctx.host_url_apub) {
        if remaining.starts_with("/users/") {
            Ok(remaining[7..].parse()?)
        } else {


@@ 497,7 400,7 @@ pub async fn get_or_fetch_user_local_id(
            None => {
                // Not known yet, time to fetch

                let actor = fetch_actor(ap_id, db, http_client).await?;
                let actor = fetch_actor(ap_id, ctx.clone()).await?;

                if let ActorLocalInfo::User { id, .. } = actor {
                    Ok(id)


@@ 1472,636 1375,6 @@ pub fn maybe_get_local_community_id_from_uri(
    }
}

pub async fn handle_recieved_object_for_local_community<'a>(
    obj: Verified<KnownObject>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let (to, in_reply_to, obj_id) = match obj.deref() {
        KnownObject::Page(obj) => (obj.to(), None, obj.id_unchecked()),
        KnownObject::Image(obj) => (obj.to(), None, obj.id_unchecked()),
        KnownObject::Article(obj) => (obj.to(), None, obj.id_unchecked()),
        KnownObject::Note(obj) => (obj.to(), obj.in_reply_to(), obj.id_unchecked()),
        _ => (None, None, None),
    };

    let local_community_id = match to {
        None => None,
        Some(maybe) => maybe
            .iter()
            .filter_map(|any| {
                any.as_xsd_any_uri()
                    .and_then(|uri| maybe_get_local_community_id_from_uri(uri, &ctx.host_url_apub))
            })
            .next(),
    };

    if let Some(local_community_id) = local_community_id {
        handle_recieved_object_for_community(local_community_id, true, None, obj, ctx).await?;
    } else {
        // not to a community, but might still match as a reply
        if let Some(in_reply_to) = in_reply_to {
            if let Some(obj_id) = obj_id {
                if let KnownObject::Note(obj) = obj.deref() {
                    // TODO deduplicate this?

                    let content = obj.content().and_then(|x| x.as_single_xsd_string());
                    let media_type = obj.media_type();
                    let created = obj.published();
                    let author = obj.attributed_to().and_then(|x| x.as_single_id());

                    if let Some(author) = author {
                        require_containment(obj_id, author)?;
                    }

                    // fetch first attachment
                    let attachment_href = obj
                        .attachment()
                        .and_then(|x| x.iter().next())
                        .and_then(
                            |base: &activitystreams::base::AnyBase| match base.kind_str() {
                                Some("Document") => Some(
                                    activitystreams::object::Document::from_any_base(base.clone())
                                        .map(|obj| obj.unwrap().take_url()),
                                ),
                                Some("Image") => Some(
                                    activitystreams::object::Image::from_any_base(base.clone())
                                        .map(|obj| obj.unwrap().take_url()),
                                ),
                                _ => None,
                            },
                        )
                        .transpose()?
                        .flatten();
                    let attachment_href = attachment_href
                        .as_ref()
                        .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                        .map(|href| href.as_str());

                    handle_recieved_reply(
                        obj_id,
                        content.unwrap_or(""),
                        media_type,
                        created.as_ref(),
                        author,
                        &in_reply_to,
                        attachment_href,
                        ctx,
                    )
                    .await?;
                }
            }
        }
    }

    Ok(())
}

pub async fn handle_received_page_for_community<Kind: Clone + std::fmt::Debug>(
    community_local_id: CommunityLocalID,
    community_is_local: bool,
    is_announce: Option<&url::Url>,
    obj: Verified<activitystreams::object::Object<Kind>>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let title = obj
        .summary()
        .iter()
        .map(|x| x.iter())
        .flatten()
        .filter_map(|maybe| maybe.as_xsd_string())
        .next()
        .unwrap_or("");
    let href = obj
        .url()
        .iter()
        .map(|x| x.iter())
        .flatten()
        .filter_map(|maybe| {
            maybe
                .as_xsd_any_uri()
                .map(|x| x.as_str())
                .or_else(|| maybe.as_xsd_string())
        })
        .next();
    let content = obj.content().and_then(|x| x.as_single_xsd_string());
    let media_type = obj.media_type();
    let created = obj.published();
    let author = obj.attributed_to().and_then(|x| x.as_single_id());

    if let Some(object_id) = obj.id_unchecked() {
        if let Some(author) = author {
            require_containment(&object_id, author)?;
        }

        handle_recieved_post(
            object_id.clone(),
            title,
            href,
            content,
            media_type,
            created.as_ref(),
            author,
            community_local_id,
            community_is_local,
            is_announce,
            ctx,
        )
        .await?;
    }

    Ok(())
}

pub async fn handle_recieved_object_for_community<'a>(
    community_local_id: CommunityLocalID,
    community_is_local: bool,
    is_announce: Option<&url::Url>,
    obj: Verified<KnownObject>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    println!("recieved object: {:?}", obj);

    match obj.into_inner() {
        KnownObject::Page(obj) => {
            handle_received_page_for_community(
                community_local_id,
                community_is_local,
                is_announce,
                Verified(obj),
                ctx,
            )
            .await?
        }
        KnownObject::Image(obj) => {
            handle_received_page_for_community(
                community_local_id,
                community_is_local,
                is_announce,
                Verified(obj),
                ctx,
            )
            .await?
        }
        KnownObject::Article(obj) => {
            handle_received_page_for_community(
                community_local_id,
                community_is_local,
                is_announce,
                Verified(obj),
                ctx,
            )
            .await?
        }
        KnownObject::Note(obj) => {
            let content = obj.content().and_then(|x| x.as_single_xsd_string());
            let media_type = obj.media_type();
            let created = obj.published();
            let author = obj.attributed_to().and_then(|x| x.as_single_id());

            // fetch first attachment
            let attachment_href = obj
                .attachment()
                .and_then(|x| x.iter().next())
                .and_then(
                    |base: &activitystreams::base::AnyBase| match base.kind_str() {
                        Some("Document") => Some(
                            activitystreams::object::Document::from_any_base(base.clone())
                                .map(|obj| obj.unwrap().take_url()),
                        ),
                        Some("Image") => Some(
                            activitystreams::object::Image::from_any_base(base.clone())
                                .map(|obj| obj.unwrap().take_url()),
                        ),
                        _ => None,
                    },
                )
                .transpose()?
                .flatten();
            let attachment_href = attachment_href
                .as_ref()
                .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                .map(|href| href.as_str());

            if let Some(object_id) = obj.id_unchecked() {
                if let Some(in_reply_to) = obj.in_reply_to() {
                    // it's a reply

                    handle_recieved_reply(
                        object_id,
                        content.unwrap_or(""),
                        media_type,
                        created.as_ref(),
                        author,
                        in_reply_to,
                        attachment_href,
                        ctx,
                    )
                    .await?;
                } else {
                    // not a reply, must be a top-level post
                    let title = obj
                        .summary()
                        .and_then(|x| x.as_single_xsd_string())
                        .unwrap_or("");

                    // Interpret attachments (usually images) as links
                    let href = obj
                        .attachment()
                        .and_then(|x| x.iter().next())
                        .and_then(
                            |base: &activitystreams::base::AnyBase| match base.kind_str() {
                                Some("Document") => Some(
                                    activitystreams::object::Document::from_any_base(base.clone())
                                        .map(|obj| obj.unwrap().take_url()),
                                ),
                                Some("Image") => Some(
                                    activitystreams::object::Image::from_any_base(base.clone())
                                        .map(|obj| obj.unwrap().take_url()),
                                ),
                                _ => None,
                            },
                        )
                        .transpose()?
                        .flatten();
                    let href = href
                        .as_ref()
                        .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                        .map(|href| href.as_str());

                    if let Some(object_id) = obj.id_unchecked() {
                        handle_recieved_post(
                            object_id.clone(),
                            title,
                            href,
                            content,
                            media_type,
                            created.as_ref(),
                            author,
                            community_local_id,
                            community_is_local,
                            is_announce,
                            ctx,
                        )
                        .await?;
                    }
                }
            }
        }
        _ => {}
    }

    Ok(())
}

async fn handle_recieved_post(
    object_id: url::Url,
    title: &str,
    href: Option<&str>,
    content: Option<&str>,
    media_type: Option<&mime::Mime>,
    created: Option<&chrono::DateTime<chrono::FixedOffset>>,
    author: Option<&url::Url>,
    community_local_id: CommunityLocalID,
    community_is_local: bool,
    is_announce: Option<&url::Url>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;
    let author = match author {
        Some(author) => Some(
            get_or_fetch_user_local_id(&author, &db, &ctx.host_url_apub, &ctx.http_client).await?,
        ),
        None => None,
    };

    let content_is_html = media_type.is_none() || media_type == Some(&mime::TEXT_HTML);
    let (content_text, content_html) = if content_is_html {
        (None, Some(content))
    } else {
        (Some(content), None)
    };

    let approved = is_announce.is_some() || community_is_local;

    let row = db.query_opt(
        "INSERT INTO post (author, href, content_text, content_html, title, created, community, local, ap_id, approved, approved_ap_id) VALUES ($1, $2, $3, $4, $5, COALESCE($6, current_timestamp), $7, FALSE, $8, $9, $10) ON CONFLICT (ap_id) DO UPDATE SET approved=$9, approved_ap_id=$10 RETURNING id",
        &[&author, &href, &content_text, &content_html, &title, &created, &community_local_id, &object_id.as_str(), &approved, &is_announce.map(|x| x.as_str())],
    ).await?;

    if community_is_local {
        if let Some(row) = row {
            let post_local_id = PostLocalID(row.get(0));
            crate::on_local_community_add_post(community_local_id, post_local_id, object_id, ctx);
        }
    }

    Ok(())
}

async fn handle_recieved_reply(
    object_id: &url::Url,
    content: &str,
    media_type: Option<&mime::Mime>,
    created: Option<&chrono::DateTime<chrono::FixedOffset>>,
    author: Option<&url::Url>,
    in_reply_to: &activitystreams::primitives::OneOrMany<activitystreams::base::AnyBase>,
    attachment_href: Option<&str>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;

    let author = match author {
        Some(author) => Some(
            get_or_fetch_user_local_id(&author, &db, &ctx.host_url_apub, &ctx.http_client).await?,
        ),
        None => None,
    };

    let last_reply_to = in_reply_to.iter().last(); // TODO maybe not this? Not sure how to interpret inReplyTo

    if let Some(last_reply_to) = last_reply_to {
        if let Some(term_ap_id) = last_reply_to.as_xsd_any_uri() {
            #[derive(Debug)]
            enum ReplyTarget {
                Post {
                    id: PostLocalID,
                },
                Comment {
                    id: CommentLocalID,
                    post: PostLocalID,
                },
            }

            let target = if let Some(remaining) = try_strip_host(&term_ap_id, &ctx.host_url_apub) {
                if remaining.starts_with("/posts/") {
                    if let Ok(local_post_id) = remaining[7..].parse() {
                        Some(ReplyTarget::Post { id: local_post_id })
                    } else {
                        None
                    }
                } else if remaining.starts_with("/comments/") {
                    if let Ok(local_comment_id) = remaining[10..].parse() {
                        let row = db
                            .query_opt("SELECT post FROM reply WHERE id=$1", &[&local_comment_id])
                            .await?;
                        if let Some(row) = row {
                            Some(ReplyTarget::Comment {
                                id: local_comment_id,
                                post: PostLocalID(row.get(0)),
                            })
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                } else {
                    None
                }
            } else {
                let row = db
                    .query_opt("(SELECT id, post FROM reply WHERE ap_id=$1) UNION (SELECT NULL, id FROM post WHERE ap_id=$1) LIMIT 1", &[&term_ap_id.as_str()])
                    .await?;
                row.map(|row| match row.get::<_, Option<_>>(0).map(CommentLocalID) {
                    Some(reply_id) => ReplyTarget::Comment {
                        id: reply_id,
                        post: PostLocalID(row.get(1)),
                    },
                    None => ReplyTarget::Post {
                        id: PostLocalID(row.get(1)),
                    },
                })
            };

            if let Some(target) = target {
                let (post, parent) = match target {
                    ReplyTarget::Post { id } => (id, None),
                    ReplyTarget::Comment { id, post } => (post, Some(id)),
                };

                let content_is_html = media_type.is_none() || media_type == Some(&mime::TEXT_HTML);
                let (content_text, content_html) = if content_is_html {
                    (None, Some(content))
                } else {
                    (Some(content), None)
                };

                let row = db.query_opt(
                    "INSERT INTO reply (post, parent, author, content_text, content_html, created, local, ap_id, attachment_href) VALUES ($1, $2, $3, $4, $5, COALESCE($6, current_timestamp), FALSE, $7, $8) ON CONFLICT (ap_id) DO NOTHING RETURNING id",
                    &[&post, &parent, &author, &content_text, &content_html, &created, &object_id.as_str(), &attachment_href],
                    ).await?;

                if let Some(row) = row {
                    let info = crate::CommentInfo {
                        id: CommentLocalID(row.get(0)),
                        author,
                        post,
                        parent,
                        content_text: content_text.map(|x| Cow::Owned(x.to_owned())),
                        content_markdown: None,
                        content_html: content_html.map(|x| Cow::Owned(x.to_owned())),
                        created: created.copied().unwrap_or_else(|| {
                            chrono::offset::Utc::now()
                                .with_timezone(&chrono::offset::FixedOffset::west(0))
                        }),
                        ap_id: crate::APIDOrLocal::APID(object_id.to_owned()),
                        attachment_href: attachment_href.map(|x| Cow::Owned(x.to_owned())),
                    };

                    crate::on_post_add_comment(info, ctx);
                }
            }
        }
    }

    Ok(())
}

pub async fn handle_like(
    activity: Verified<activitystreams::activity::Like>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;

    let activity_id = activity
        .id_unchecked()
        .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

    if let Some(actor_id) = activity.actor_unchecked().as_single_id() {
        require_containment(activity_id, actor_id)?;

        let actor_local_id =
            get_or_fetch_user_local_id(actor_id, &db, &ctx.host_url_apub, &ctx.http_client).await?;

        if let Some(object_id) = activity.object().as_single_id() {
            let thing_local_ref = if let Some(remaining) =
                try_strip_host(&object_id, &ctx.host_url_apub)
            {
                if remaining.starts_with("/posts/") {
                    if let Ok(local_post_id) = remaining[7..].parse() {
                        Some(ThingLocalRef::Post(local_post_id))
                    } else {
                        None
                    }
                } else if remaining.starts_with("/comments/") {
                    if let Ok(local_comment_id) = remaining[10..].parse() {
                        Some(ThingLocalRef::Comment(local_comment_id))
                    } else {
                        None
                    }
                } else {
                    None
                }
            } else {
                let row = db.query_opt(
                    "(SELECT TRUE, id FROM post WHERE ap_id=$1) UNION ALL (SELECT FALSE, id FROM reply WHERE ap_id=$1) LIMIT 1",
                    &[&object_id.as_str()],
                ).await?;

                if let Some(row) = row {
                    Some(if row.get(0) {
                        ThingLocalRef::Post(PostLocalID(row.get(1)))
                    } else {
                        ThingLocalRef::Comment(CommentLocalID(row.get(1)))
                    })
                } else {
                    None
                }
            };

            match thing_local_ref {
                Some(ThingLocalRef::Post(post_local_id)) => {
                    let row_count = db.execute(
                        "INSERT INTO post_like (post, person, local, ap_id) VALUES ($1, $2, FALSE, $3) ON CONFLICT (post, person) DO NOTHING",
                        &[&post_local_id, &actor_local_id, &activity_id.as_str()],
                    ).await?;

                    if row_count > 0 {
                        let row = db.query_opt("SELECT post.community, community.local FROM post, community WHERE post.community = community.id AND post.id=$1", &[&post_local_id]).await?;
                        if let Some(row) = row {
                            let community_local = row.get(1);
                            if community_local {
                                let community_id = CommunityLocalID(row.get(0));
                                let body = serde_json::to_string(&activity)?;
                                enqueue_forward_to_community_followers(community_id, body, ctx)
                                    .await?;
                            }
                        }
                    }
                }
                Some(ThingLocalRef::Comment(comment_local_id)) => {
                    let row_count = db.execute(
                        "INSERT INTO reply_like (reply, person, local, ap_id) VALUES ($1, $2, FALSE, $3) ON CONFLICT (reply, person) DO NOTHING",
                        &[&comment_local_id, &actor_local_id, &activity_id.as_str()],
                    ).await?;

                    if row_count > 0 {
                        let row = db.query_opt("SELECT post.community, community.local FROM reply, post, community WHERE reply.post = post.id AND post.community = community.id AND post.id=$1", &[&comment_local_id]).await?;
                        if let Some(row) = row {
                            let community_local = row.get(1);
                            if community_local {
                                let community_id = CommunityLocalID(row.get(0));
                                let body = serde_json::to_string(&activity)?;
                                enqueue_forward_to_community_followers(community_id, body, ctx)
                                    .await?;
                            }
                        }
                    }
                }
                _ => {}
            }
        }
    }

    Ok(())
}

pub async fn handle_delete(
    activity: Verified<activitystreams::activity::Delete>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;

    let activity_id = activity
        .id_unchecked()
        .ok_or(crate::Error::InternalStrStatic("Missing ID for activity"))?;
    let actor_id = activity
        .actor_unchecked()
        .as_single_id()
        .ok_or(crate::Error::InternalStrStatic("Missing ID for actor"))?;

    if let Some(object_id) = activity.object().as_single_id() {
        require_containment(activity_id, actor_id)?;
        require_containment(object_id, actor_id)?;

        let row = db.query_opt(
            "WITH deleted_post AS (UPDATE post SET href=NULL, title='[deleted]', content_text='[deleted]', content_markdown=NULL, content_html=NULL, deleted=TRUE WHERE ap_id=$1 AND deleted=FALSE RETURNING (SELECT id FROM community WHERE community.id = post.community AND community.local)), deleted_reply AS (UPDATE reply SET content_text='[deleted]', content_markdown=NULL, content_html=NULL, deleted=TRUE WHERE ap_id=$1 AND deleted=FALSE RETURNING (SELECT id FROM community WHERE community.id=(SELECT community FROM post WHERE id=reply.post) AND community.local)) (SELECT * FROM deleted_post) UNION ALL (SELECT * FROM deleted_reply) LIMIT 1",
            &[&object_id.as_str()],
            ).await?;

        if let Some(row) = row {
            // Something was deleted
            let local_community = row.get::<_, Option<_>>(0).map(CommunityLocalID);
            if let Some(community_id) = local_community {
                // Community is local, need to forward delete to followers

                let body = serde_json::to_string(&activity)?;

                crate::spawn_task(crate::apub_util::enqueue_forward_to_community_followers(
                    community_id,
                    body,
                    ctx,
                ));
            }
        }
    }

    Ok(())
}

pub async fn handle_undo(
    activity: Verified<activitystreams::activity::Undo>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let activity_id = activity
        .id_unchecked()
        .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

    let actor_id =
        activity
            .actor_unchecked()
            .as_single_id()
            .ok_or(crate::Error::InternalStrStatic(
                "Missing actor for activity",
            ))?;

    let object_id = activity
        .object()
        .as_single_id()
        .ok_or(crate::Error::InternalStrStatic("Missing object for Undo"))?;

    require_containment(activity_id, actor_id)?;
    require_containment(object_id, actor_id)?;

    let object_id = object_id.as_str();

    let db = ctx.db_pool.get().await?;

    db.execute("DELETE FROM post_like WHERE ap_id=$1", &[&object_id])
        .await?;
    db.execute("DELETE FROM reply_like WHERE ap_id=$1", &[&object_id])
        .await?;
    db.execute("DELETE FROM community_follow WHERE ap_id=$1", &[&object_id])
        .await?;
    db.execute(
        "UPDATE post SET approved=FALSE, approved_ap_id=NULL WHERE approved_ap_id=$1",
        &[&object_id],
    )
    .await?;

    Ok(())
}

fn get_message_digest(src: Option<&str>) -> Option<openssl::hash::MessageDigest> {
    match src {
        None | Some(SIGALG_RSA_SHA256) => Some(openssl::hash::MessageDigest::sha256()),


@@ 2117,7 1390,7 @@ pub async fn check_signature_for_actor(
    headers: &hyper::header::HeaderMap,
    actor_ap_id: &url::Url,
    db: &tokio_postgres::Client,
    http_client: &crate::HttpClient,
    ctx: &Arc<crate::BaseContext>,
) -> Result<bool, crate::Error> {
    let found_key = db.query_opt("(SELECT public_key, public_key_sigalg FROM person WHERE ap_id=$1) UNION ALL (SELECT public_key, public_key_sigalg FROM community WHERE ap_id=$1) LIMIT 1", &[&actor_ap_id.as_str()]).await?
        .and_then(|row| {


@@ 2149,7 1422,7 @@ pub async fn check_signature_for_actor(
    // Either no key found or failed to verify
    // Try fetching the actor/key

    let actor = fetch_actor(actor_ap_id, db, http_client).await?;
    let actor = fetch_actor(actor_ap_id, ctx.clone()).await?;

    if let Some(key_info) = actor.public_key() {
        let key = openssl::pkey::PKey::public_key_from_pem(&key_info.key)?;


@@ 2172,8 1445,7 @@ pub async fn check_signature_for_actor(
pub async fn verify_incoming_object(
    mut req: hyper::Request<hyper::Body>,
    db: &tokio_postgres::Client,
    http_client: &crate::HttpClient,
    apub_proxy_rewrites: bool,
    ctx: &Arc<crate::BaseContext>,
) -> Result<Verified<KnownObject>, crate::Error> {
    let req_body = hyper::body::to_bytes(req.body_mut()).await?;



@@ 2184,7 1456,7 @@ pub async fn verify_incoming_object(
                crate::Error::InternalStrStatic("Missing id in received activity")
            })?;

            let res_body = fetch_ap_object(&ap_id, http_client).await?;
            let res_body = fetch_ap_object(&ap_id, &ctx.http_client).await?;

            Ok(res_body)
        }


@@ 2210,7 1482,7 @@ pub async fn verify_incoming_object(
                .as_str();

            // path ends up wrong with our recommended proxy config
            let path_and_query = if apub_proxy_rewrites {
            let path_and_query = if ctx.apub_proxy_rewrites {
                req.headers()
                    .get("x-forwarded-path")
                    .map(|x| x.to_str())


@@ 2227,7 1499,7 @@ pub async fn verify_incoming_object(
                &req.headers(),
                &actor_ap_id,
                db,
                http_client,
                &ctx,
            )
            .await?
            {

M src/main.rs => src/main.rs +12 -20
@@ 144,6 144,8 @@ pub struct BaseContext {
    pub api_ratelimit: henry::RatelimitBucket<std::net::IpAddr>,

    pub local_hostname: String,

    worker_trigger: tokio::sync::mpsc::Sender<()>,
}

impl BaseContext {


@@ 192,14 194,7 @@ impl BaseContext {
            href.into()
        }
    }
}

pub struct RouteContext {
    base: Arc<BaseContext>,
    worker_trigger: tokio::sync::mpsc::Sender<()>,
}

impl RouteContext {
    pub async fn enqueue_task<T: crate::tasks::TaskDef>(
        &self,
        task: &T,


@@ 219,13 214,7 @@ impl RouteContext {
    }
}

impl std::ops::Deref for RouteContext {
    type Target = BaseContext;

    fn deref(&self) -> &BaseContext {
        &self.base
    }
}
pub type RouteContext = BaseContext;

pub type RouteNode<P> = trout::Node<
    P,


@@ 349,9 338,13 @@ pub enum ActorLocalRef {
    Community(CommunityLocalID),
}

#[derive(Clone, Copy, Debug, Serialize)]
#[serde(tag = "type")]
pub enum ThingLocalRef {
    Post(PostLocalID),
    Comment(CommentLocalID),
    User(UserLocalID),
    Community(CommunityLocalID),
}

#[derive(Debug)]


@@ 994,8 987,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
        panic!("SMTP_URL was provided, but SMTP_FROM was not");
    }

    let (worker_trigger, worker_rx) = tokio::sync::mpsc::channel(1);

    let routes = Arc::new(routes::route_root());
    let base_context = Arc::new(BaseContext {
    let context = Arc::new(BaseContext {
        local_hostname: get_url_host(&host_url_apub)
            .expect("Couldn't find host in HOST_URL_ACTIVITYPUB"),



@@ 1008,15 1003,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
        http_client: hyper::Client::builder().build(hyper_tls::HttpsConnector::new()),
        apub_proxy_rewrites,
        api_ratelimit: henry::RatelimitBucket::new(300),
    });

    let worker_trigger = worker::start_worker(base_context.clone());

    let context = Arc::new(RouteContext {
        base: base_context,
        worker_trigger,
    });

    worker::start_worker(context.clone(), worker_rx);

    let server = hyper::Server::bind(&(std::net::Ipv6Addr::UNSPECIFIED, port).into()).serve(
        hyper::service::make_service_fn(|sock: &hyper::server::conn::AddrStream| {
            let addr_direct = sock.remote_addr().ip();

M src/routes/api/mod.rs => src/routes/api/mod.rs +83 -3
@@ 199,6 199,13 @@ pub fn route_api() -> crate::RouteNode<()> {
                    crate::RouteNode::new()
                        .with_handler_async("GET", route_unstable_nodeinfo_20_get),
                )
                .with_child(
                    "objects:lookup",
                    crate::RouteNode::new().with_child_str(
                        crate::RouteNode::new()
                            .with_handler_async("GET", route_unstable_objects_lookup),
                    ),
                )
                .with_child("communities", communities::route_communities())
                .with_child(
                    "instance",


@@ 269,8 276,6 @@ async fn route_unstable_actors_lookup(
    let (query,) = params;
    println!("lookup {}", query);

    let db = ctx.db_pool.get().await?;

    let lookup = parse_lookup(&query)?;

    let uri = match lookup {


@@ 325,7 330,7 @@ async fn route_unstable_actors_lookup(
        }
    };

    let actor = crate::apub_util::fetch_actor(&uri, &db, &ctx.http_client).await?;
    let actor = crate::apub_util::fetch_actor(&uri, ctx).await?;

    let info = match actor {
        crate::apub_util::ActorLocalInfo::Community { id, .. } => {


@@ 567,6 572,81 @@ async fn route_unstable_instance_patch(
    }
}

async fn route_unstable_objects_lookup(
    params: (String,),
    ctx: Arc<crate::RouteContext>,
    _req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, crate::Error> {
    let (query,) = params;
    println!("lookup {}", query);

    let lookup = parse_lookup(&query)?;

    let uri = match lookup {
        Lookup::URL(uri) => Some(uri),
        Lookup::WebFinger { user, host } => {
            let uri = format!(
                "https://{}/.well-known/webfinger?{}",
                host,
                serde_urlencoded::to_string(FingerRequestQuery {
                    resource: format!("acct:{}@{}", user, host).into(),
                    rel: Some("self".into()),
                })?
            );
            println!("{}", uri);
            let res = ctx
                .http_client
                .request(hyper::Request::get(uri).body(Default::default())?)
                .await?;

            if res.status() == hyper::StatusCode::NOT_FOUND {
                println!("not found");
                None
            } else {
                let res = crate::res_to_error(res).await?;

                let res = hyper::body::to_bytes(res.into_body()).await?;
                let res: FingerResponse = serde_json::from_slice(&res)?;

                let mut found_uri = None;
                for entry in res.links {
                    if entry.rel == "self"
                        && entry.type_.as_deref() == Some(crate::apub_util::ACTIVITY_TYPE)
                    {
                        if let Some(href) = entry.href {
                            found_uri = Some(href.parse()?);
                            break;
                        }
                    }
                }

                found_uri
            }
        }
    };

    let res = match &uri {
        Some(uri) => {
            let obj = crate::apub_util::fetch_ap_object(&uri, &ctx.http_client).await?;

            crate::apub_util::ingest::ingest_object(
                obj,
                crate::apub_util::ingest::FoundFrom::Other,
                ctx,
            )
            .await?
        }
        None => None,
    };

    match res {
        None => Ok(crate::common_response_builder()
            .header(hyper::header::CONTENT_TYPE, "application/json")
            .body("[]".into())?),
        Some(res) => crate::json_response(&[res.into_ref()]),
    }
}

async fn apply_comments_replies<'a, T>(
    comments: &mut Vec<(T, RespPostCommentInfo<'a>)>,
    include_your_for: Option<UserLocalID>,

M src/routes/apub/communities.rs => src/routes/apub/communities.rs +2 -85
@@ 1,6 1,5 @@
use crate::{CommentLocalID, CommunityLocalID, PostLocalID, UserLocalID};
use activitystreams::prelude::*;
use std::borrow::Cow;
use std::ops::Deref;
use std::sync::Arc;



@@ 388,93 387,11 @@ async fn handler_communities_followers_accept_get(
}

async fn handler_communities_inbox_post(
    params: (CommunityLocalID,),
    _: (CommunityLocalID,),
    ctx: Arc<crate::RouteContext>,
    req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, crate::Error> {
    use crate::apub_util::{KnownObject, Verified};

    let (community_id,) = params;
    let db = ctx.db_pool.get().await?;

    let object = crate::apub_util::verify_incoming_object(
        req,
        &db,
        &ctx.http_client,
        ctx.apub_proxy_rewrites,
    )
    .await?;

    match object.into_inner() {
        KnownObject::Create(create) => {
            super::inbox_common_create(Verified(create), ctx).await?;
        }
        KnownObject::Follow(follow) => {
            let follow = Verified(follow);
            let follower_ap_id = follow.actor_unchecked().as_single_id();
            let target_community = follow.object().as_single_id();

            if let Some(follower_ap_id) = follower_ap_id {
                let activity_ap_id = follow
                    .id_unchecked()
                    .ok_or(crate::Error::InternalStrStatic("Missing activitity ID"))?;

                crate::apub_util::require_containment(activity_ap_id, follower_ap_id)?;
                let follow = crate::apub_util::Contained(Cow::Borrowed(&follow));

                let follower_local_id = crate::apub_util::get_or_fetch_user_local_id(
                    follower_ap_id,
                    &db,
                    &ctx.host_url_apub,
                    &ctx.http_client,
                )
                .await?;

                if let Some(target_community) = target_community {
                    if target_community
                        == crate::apub_util::get_local_community_apub_id(
                            community_id,
                            &ctx.host_url_apub,
                        )
                        .deref()
                    {
                        let row = db
                            .query_opt("SELECT local FROM community WHERE id=$1", &[&community_id])
                            .await?;
                        if let Some(row) = row {
                            let local: bool = row.get(0);
                            if local {
                                db.execute("INSERT INTO community_follow (community, follower, local, ap_id, accepted) VALUES ($1, $2, FALSE, $3, TRUE) ON CONFLICT (community, follower) DO NOTHING", &[&community_id, &follower_local_id, &activity_ap_id.as_str()]).await?;

                                crate::apub_util::spawn_enqueue_send_community_follow_accept(
                                    community_id,
                                    follower_local_id,
                                    follow.with_owned(),
                                    ctx,
                                );
                            }
                        } else {
                            eprintln!("Warning: recieved follow for unknown community");
                        }
                    } else {
                        eprintln!("Warning: recieved follow for wrong community");
                    }
                }
            }
        }
        KnownObject::Delete(activity) => {
            crate::apub_util::handle_delete(Verified(activity), ctx).await?;
        }
        KnownObject::Like(activity) => {
            crate::apub_util::handle_like(Verified(activity), ctx).await?;
        }
        KnownObject::Undo(activity) => {
            crate::apub_util::handle_undo(Verified(activity), ctx).await?;
        }
        _ => {}
    }

    Ok(crate::simple_response(hyper::StatusCode::ACCEPTED, ""))
    super::inbox_common(ctx, req).await
}

async fn handler_communities_outbox_get(

M src/routes/apub/mod.rs => src/routes/apub/mod.rs +6 -168
@@ 227,182 227,20 @@ async fn inbox_common(
    ctx: Arc<crate::RouteContext>,
    req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, crate::Error> {
    use crate::apub_util::{KnownObject, Verified};

    let db = ctx.db_pool.get().await?;

    let object = crate::apub_util::verify_incoming_object(
        req,
        &db,
        &ctx.http_client,
        ctx.apub_proxy_rewrites,
    let object = crate::apub_util::verify_incoming_object(req, &db, &ctx).await?;

    crate::apub_util::ingest::ingest_object(
        object,
        crate::apub_util::ingest::FoundFrom::Other,
        ctx,
    )
    .await?;

    match object.into_inner() {
        KnownObject::Accept(activity) => {
            let activity_id = activity
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

            let actor_ap_id = activity
                .actor_unchecked()
                .as_single_id()
                .ok_or(crate::Error::InternalStrStatic("Missing actor for Accept"))?;

            crate::apub_util::require_containment(activity_id, actor_ap_id)?;

            let actor_ap_id = actor_ap_id.as_str();

            let community_local_id: Option<CommunityLocalID> = {
                db.query_opt("SELECT id FROM community WHERE ap_id=$1", &[&actor_ap_id])
                    .await?
                    .map(|row| CommunityLocalID(row.get(0)))
            };

            if let Some(community_local_id) = community_local_id {
                let object_id = activity
                    .object()
                    .as_single_id()
                    .ok_or(crate::Error::InternalStrStatic("Missing object for Accept"))?;

                if let Some(remaining) =
                    crate::apub_util::try_strip_host(&object_id, &ctx.host_url_apub)
                {
                    if remaining.starts_with("/communities/") {
                        let remaining = &remaining[13..];
                        let next_expected = format!("{}/followers/", community_local_id);
                        if remaining.starts_with(&next_expected) {
                            let remaining = &remaining[next_expected.len()..];
                            let follower_local_id: UserLocalID = remaining.parse()?;

                            db.execute(
                                "UPDATE community_follow SET accepted=TRUE WHERE community=$1 AND follower=$2",
                                &[&community_local_id, &follower_local_id],
                            ).await?;
                        }
                    }
                }
            }
        }
        KnownObject::Announce(activity) => {
            let activity_id = activity
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

            let community_ap_id = activity.actor_unchecked().as_single_id().ok_or(
                crate::Error::InternalStrStatic("Missing actor for Announce"),
            )?;

            let community_local_info = db
                .query_opt(
                    "SELECT id, local FROM community WHERE ap_id=$1",
                    &[&community_ap_id.as_str()],
                )
                .await?
                .map(|row| (CommunityLocalID(row.get(0)), row.get(1)));

            if let Some((community_local_id, community_is_local)) = community_local_info {
                crate::apub_util::require_containment(activity_id, community_ap_id)?;

                let object_id = activity.object().as_single_id();

                if let Some(object_id) = object_id {
                    if let Some(remaining) =
                        crate::apub_util::try_strip_host(&object_id, &ctx.host_url_apub)
                    {
                        if remaining.starts_with("/posts/") {
                            let remaining = &remaining[7..];
                            if let Ok(local_post_id) = remaining.parse::<PostLocalID>() {
                                db.execute(
                                    "UPDATE post SET approved=TRUE, approved_ap_id=$1 WHERE id=$2 AND community=$3",
                                    &[&activity_id.as_str(), &local_post_id, &community_local_id],
                                ).await?;
                            }
                        }
                    } else {
                        // don't need announces for local objects
                        let obj =
                            crate::apub_util::fetch_ap_object(object_id, &ctx.http_client).await?;
                        crate::apub_util::handle_recieved_object_for_community(
                            community_local_id,
                            community_is_local,
                            Some(&activity_id),
                            obj,
                            ctx,
                        )
                        .await?;
                    }
                }
            }
        }
        KnownObject::Create(activity) => inbox_common_create(Verified(activity), ctx).await?,
        KnownObject::Delete(activity) => {
            crate::apub_util::handle_delete(Verified(activity), ctx).await?
        }
        KnownObject::Like(activity) => {
            crate::apub_util::handle_like(Verified(activity), ctx).await?
        }
        KnownObject::Undo(activity) => {
            crate::apub_util::handle_undo(Verified(activity), ctx).await?
        }
        KnownObject::Update(activity) => {
            let activity_id = activity
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

            let object_id =
                activity
                    .object()
                    .as_single_id()
                    .ok_or(crate::Error::InternalStrStatic(
                        "Missing object ID for Update",
                    ))?;

            crate::apub_util::require_containment(activity_id, object_id)?;

            let object_id = object_id.clone();

            crate::spawn_task(async move {
                let row = db
                    .query_opt(
                        "SELECT 1 FROM community WHERE ap_id=$1 LIMIT 1",
                        &[&object_id.as_str()],
                    )
                    .await?;
                if row.is_some() {
                    ctx.enqueue_task(&crate::tasks::FetchActor {
                        actor_ap_id: Cow::Owned(object_id),
                    })
                    .await?;
                }

                Ok(())
            });
        }
        _ => {}
    }

    Ok(crate::simple_response(hyper::StatusCode::ACCEPTED, ""))
}

pub async fn inbox_common_create(
    activity: crate::apub_util::Verified<activitystreams::activity::Create>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    for req_obj in activity.object().iter() {
        let object_id = req_obj.id();

        if let Some(object_id) = object_id {
            let obj = crate::apub_util::fetch_ap_object(object_id, &ctx.http_client).await?;

            crate::apub_util::handle_recieved_object_for_local_community(obj, ctx.clone()).await?;
        }
    }

    Ok(())
}

async fn handler_users_inbox_post(
    _: (UserLocalID,),
    ctx: Arc<crate::RouteContext>,

M src/tasks.rs => src/tasks.rs +6 -7
@@ 1,12 1,13 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::sync::Arc;

#[async_trait]
pub trait TaskDef: Serialize + std::fmt::Debug + Sync {
    const KIND: &'static str;
    const MAX_ATTEMPTS: i16 = 8;
    async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error>;
    async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error>;
}

#[derive(Deserialize, Serialize, Debug)]


@@ 20,7 21,7 @@ pub struct DeliverToInbox<'a> {
impl<'a> TaskDef for DeliverToInbox<'a> {
    const KIND: &'static str = "deliver_to_inbox";

    async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error> {
    async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> {
        let db = ctx.db_pool.get().await?;

        let signing_info: Option<(_, _)> = match self.sign_as {


@@ 98,7 99,7 @@ pub struct DeliverToFollowers {
impl TaskDef for DeliverToFollowers {
    const KIND: &'static str = "deliver_to_followers";

    async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error> {
    async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> {
        let community_id = match self.actor {
            crate::ActorLocalRef::Community(id) => id,
            crate::ActorLocalRef::Person(_) => return Ok(()), // We don't have user followers at this point


@@ 124,10 125,8 @@ pub struct FetchActor<'a> {
impl<'a> TaskDef for FetchActor<'a> {
    const KIND: &'static str = "fetch_actor";

    async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error> {
        let db = ctx.db_pool.get().await?;

        crate::apub_util::fetch_actor(&self.actor_ap_id, &db, &ctx.http_client).await?;
    async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> {
        crate::apub_util::fetch_actor(&self.actor_ap_id, ctx).await?;

        Ok(())
    }

M src/worker.rs => src/worker.rs +3 -6
@@ 1,10 1,7 @@
use std::sync::Arc;

pub fn start_worker(ctx: Arc<crate::BaseContext>) -> tokio::sync::mpsc::Sender<()> {
    let (tx, rx) = tokio::sync::mpsc::channel(1);
pub fn start_worker(ctx: Arc<crate::BaseContext>, rx: tokio::sync::mpsc::Receiver<()>) {
    crate::spawn_task(run_worker(ctx, rx));

    tx
}

async fn run_worker(


@@ 39,7 36,7 @@ async fn run_worker(
            let kind: &str = row.get(1);
            let params: serde_json::Value = row.get(2);

            let result = perform_task(&ctx, kind, params).await;
            let result = perform_task(ctx.clone(), kind, params).await;
            if let Err(err) = result {
                let err = format!("{:?}", err);
                db.execute(


@@ 63,7 60,7 @@ async fn run_worker(
}

async fn perform_task(
    ctx: &crate::BaseContext,
    ctx: Arc<crate::BaseContext>,
    kind: &str,
    params: serde_json::Value,
) -> Result<(), crate::Error> {