~vpzom/lotide

50b3d3172e0e9e8613eb2cb1061fe724810bea7b — Colin Reeder 11 months ago c0668f9
Initial work on rearranging ingestion code
4 files changed, 1042 insertions(+), 878 deletions(-)

A src/apub_util/ingest.rs
R src/{apub_util.rs => apub_util/mod.rs}
M src/routes/apub/communities.rs
M src/routes/apub/mod.rs
A src/apub_util/ingest.rs => src/apub_util/ingest.rs +1031 -0
@@ 0,0 1,1031 @@
use super::{KnownObject, Verified};
use crate::{CommentLocalID, CommunityLocalID, PostLocalID, ThingLocalRef, UserLocalID};
use activitystreams::prelude::*;
use std::borrow::Cow;
use std::future::Future;
use std::ops::Deref;
use std::sync::Arc;

pub enum FoundFrom {
    Announce {
        url: url::Url,
        community_local_id: CommunityLocalID,
        community_is_local: bool,
    },
    Other,
}

impl FoundFrom {
    pub fn as_announce(&self) -> Option<&url::Url> {
        match self {
            FoundFrom::Announce { url, .. } => Some(url),
            _ => None,
        }
    }
}

pub async fn ingest_object(
    object: Verified<KnownObject>,
    found_from: FoundFrom,
    ctx: Arc<crate::RouteContext>,
) -> Result<Option<super::ActorLocalInfo>, crate::Error> {
    let db = ctx.db_pool.get().await?;
    match object.into_inner() {
        KnownObject::Accept(activity) => {
            let activity_id = activity
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

            let actor_ap_id = activity
                .actor_unchecked()
                .as_single_id()
                .ok_or(crate::Error::InternalStrStatic("Missing actor for Accept"))?;

            crate::apub_util::require_containment(activity_id, actor_ap_id)?;

            let actor_ap_id = actor_ap_id.as_str();

            let community_local_id: Option<CommunityLocalID> = {
                db.query_opt("SELECT id FROM community WHERE ap_id=$1", &[&actor_ap_id])
                    .await?
                    .map(|row| CommunityLocalID(row.get(0)))
            };

            if let Some(community_local_id) = community_local_id {
                let object_id = activity
                    .object()
                    .as_single_id()
                    .ok_or(crate::Error::InternalStrStatic("Missing object for Accept"))?;

                if let Some(remaining) =
                    crate::apub_util::try_strip_host(&object_id, &ctx.host_url_apub)
                {
                    if remaining.starts_with("/communities/") {
                        let remaining = &remaining[13..];
                        let next_expected = format!("{}/followers/", community_local_id);
                        if remaining.starts_with(&next_expected) {
                            let remaining = &remaining[next_expected.len()..];
                            let follower_local_id: UserLocalID = remaining.parse()?;

                            db.execute(
                                "UPDATE community_follow SET accepted=TRUE WHERE community=$1 AND follower=$2",
                                &[&community_local_id, &follower_local_id],
                            ).await?;
                        }
                    }
                }
            }

            Ok(None)
        }
        KnownObject::Announce(activity) => {
            let activity_id = activity
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

            let community_ap_id = activity.actor_unchecked().as_single_id().ok_or(
                crate::Error::InternalStrStatic("Missing actor for Announce"),
            )?;

            let community_local_info = db
                .query_opt(
                    "SELECT id, local FROM community WHERE ap_id=$1",
                    &[&community_ap_id.as_str()],
                )
                .await?
                .map(|row| (CommunityLocalID(row.get(0)), row.get(1)));

            if let Some((community_local_id, community_is_local)) = community_local_info {
                crate::apub_util::require_containment(activity_id, community_ap_id)?;

                let object_id = activity.object().as_single_id();

                if let Some(object_id) = object_id {
                    if let Some(remaining) =
                        crate::apub_util::try_strip_host(&object_id, &ctx.host_url_apub)
                    {
                        if remaining.starts_with("/posts/") {
                            let remaining = &remaining[7..];
                            if let Ok(local_post_id) = remaining.parse::<PostLocalID>() {
                                db.execute(
                                    "UPDATE post SET approved=TRUE, approved_ap_id=$1 WHERE id=$2 AND community=$3",
                                    &[&activity_id.as_str(), &local_post_id, &community_local_id],
                                ).await?;
                            }
                        }
                    } else {
                        // don't need announces for local objects
                        let obj =
                            crate::apub_util::fetch_ap_object(object_id, &ctx.http_client).await?;

                        ingest_object_boxed(
                            obj,
                            FoundFrom::Announce {
                                url: activity_id.clone(),
                                community_local_id,
                                community_is_local,
                            },
                            ctx,
                        )
                        .await?;
                    }
                }
            }
            Ok(None)
        }
        KnownObject::Article(obj) => {
            ingest_postlike(Verified(KnownObject::Article(obj)), found_from, ctx).await
        }
        KnownObject::Create(activity) => {
            ingest_create(Verified(activity), ctx).await?;
            Ok(None)
        }
        KnownObject::Delete(activity) => {
            ingest_delete(Verified(activity), ctx).await?;
            Ok(None)
        }
        KnownObject::Follow(follow) => {
            let follow = Verified(follow);
            let follower_ap_id = follow.actor_unchecked().as_single_id();
            let target = follow.object().as_single_id();

            if let Some(follower_ap_id) = follower_ap_id {
                let activity_ap_id = follow
                    .id_unchecked()
                    .ok_or(crate::Error::InternalStrStatic("Missing activitity ID"))?;

                crate::apub_util::require_containment(activity_ap_id, follower_ap_id)?;
                let follow = crate::apub_util::Contained(Cow::Borrowed(&follow));

                let follower_local_id = crate::apub_util::get_or_fetch_user_local_id(
                    follower_ap_id,
                    &db,
                    &ctx.host_url_apub,
                    &ctx.http_client,
                )
                .await?;

                if let Some(target) = target {
                    if let Some(community_id) =
                        super::maybe_get_local_community_id_from_uri(target, &ctx.host_url_apub)
                    {
                        let row = db
                            .query_opt("SELECT local FROM community WHERE id=$1", &[&community_id])
                            .await?;
                        if let Some(row) = row {
                            let local: bool = row.get(0);
                            if local {
                                db.execute("INSERT INTO community_follow (community, follower, local, ap_id, accepted) VALUES ($1, $2, FALSE, $3, TRUE) ON CONFLICT (community, follower) DO NOTHING", &[&community_id, &follower_local_id, &activity_ap_id.as_str()]).await?;

                                crate::apub_util::spawn_enqueue_send_community_follow_accept(
                                    community_id,
                                    follower_local_id,
                                    follow.with_owned(),
                                    ctx,
                                );
                            }
                        } else {
                            eprintln!("Warning: recieved follow for unknown community");
                        }
                    }
                }
            }

            Ok(None)
        }
        KnownObject::Group(group) => {
            let ap_id = group
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing ID in Group"))?;

            let name = group
                .preferred_username()
                .or_else(|| {
                    group
                        .name()
                        .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                })
                .unwrap_or("");
            let description = group
                .summary()
                .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                .unwrap_or("");
            let inbox = group.inbox_unchecked().as_str();
            let shared_inbox = group
                .endpoints_unchecked()
                .and_then(|endpoints| endpoints.shared_inbox)
                .map(|url| url.as_str());
            let public_key = group
                .ext_one
                .public_key
                .as_ref()
                .map(|key| key.public_key_pem.as_bytes());
            let public_key_sigalg = group
                .ext_one
                .public_key
                .as_ref()
                .and_then(|key| key.signature_algorithm.as_deref());

            let id = CommunityLocalID(db.query_one(
                "INSERT INTO community (name, local, ap_id, ap_inbox, ap_shared_inbox, public_key, public_key_sigalg, description) VALUES ($1, FALSE, $2, $3, $4, $5, $6, $7) ON CONFLICT (ap_id) DO UPDATE SET ap_inbox=$3, ap_shared_inbox=$4, public_key=$5, public_key_sigalg=$6, description=$7 RETURNING id",
                &[&name, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description],
            ).await?.get(0));

            Ok(Some(super::ActorLocalInfo::Community {
                id,
                public_key: public_key.map(|key| super::PubKeyInfo {
                    algorithm: super::get_message_digest(public_key_sigalg),
                    key: key.to_owned(),
                }),
            }))
        }
        KnownObject::Image(obj) => {
            ingest_postlike(Verified(KnownObject::Image(obj)), found_from, ctx).await
        }
        KnownObject::Like(activity) => {
            ingest_like(Verified(activity), ctx).await?;
            Ok(None)
        }
        KnownObject::Note(obj) => {
            ingest_postlike(Verified(KnownObject::Note(obj)), found_from, ctx).await
        }
        KnownObject::Page(obj) => {
            ingest_postlike(Verified(KnownObject::Page(obj)), found_from, ctx).await
        }
        KnownObject::Person(person) => {
            let ap_id = person
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing ID in Person"))?;

            let username = person
                .preferred_username()
                .or_else(|| {
                    person
                        .name()
                        .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                })
                .unwrap_or("");
            let inbox = person.inbox_unchecked().as_str();
            let shared_inbox = person
                .endpoints_unchecked()
                .and_then(|endpoints| endpoints.shared_inbox)
                .map(|url| url.as_str());
            let public_key = person
                .ext_one
                .public_key
                .as_ref()
                .map(|key| key.public_key_pem.as_bytes());
            let public_key_sigalg = person
                .ext_one
                .public_key
                .as_ref()
                .and_then(|key| key.signature_algorithm.as_deref());
            let description = person
                .summary()
                .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                .unwrap_or("");

            let avatar = person.icon().and_then(|icon| {
                icon.iter()
                    .filter_map(|icon| {
                        if icon.kind_str() == Some("Image") {
                            match activitystreams::object::Image::from_any_base(icon.clone()) {
                                Err(_) | Ok(None) => None,
                                Ok(Some(icon)) => Some(icon),
                            }
                        } else {
                            None
                        }
                    })
                    .next()
            });
            let avatar = avatar
                .as_ref()
                .and_then(|icon| icon.url().and_then(|url| url.as_single_id()))
                .map(|x| x.as_str());

            let id = UserLocalID(db.query_one(
                "INSERT INTO person (username, local, created_local, ap_id, ap_inbox, ap_shared_inbox, public_key, public_key_sigalg, description, avatar) VALUES ($1, FALSE, localtimestamp, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (ap_id) DO UPDATE SET ap_inbox=$3, ap_shared_inbox=$4, public_key=$5, public_key_sigalg=$6, description=$7, avatar=$8 RETURNING id",
                &[&username, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description, &avatar],
            ).await?.get(0));

            Ok(Some(super::ActorLocalInfo::User {
                id,
                public_key: public_key.map(|key| super::PubKeyInfo {
                    algorithm: super::get_message_digest(public_key_sigalg),
                    key: key.to_owned(),
                }),
            }))
        }
        KnownObject::Undo(activity) => {
            ingest_undo(Verified(activity), ctx).await?;
            Ok(None)
        }
        KnownObject::Update(activity) => {
            let activity_id = activity
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

            let object_id =
                activity
                    .object()
                    .as_single_id()
                    .ok_or(crate::Error::InternalStrStatic(
                        "Missing object ID for Update",
                    ))?;

            crate::apub_util::require_containment(activity_id, object_id)?;

            let object_id = object_id.clone();

            crate::spawn_task(async move {
                let row = db
                    .query_opt(
                        "SELECT 1 FROM community WHERE ap_id=$1 LIMIT 1",
                        &[&object_id.as_str()],
                    )
                    .await?;
                if row.is_some() {
                    ctx.enqueue_task(&crate::tasks::FetchActor {
                        actor_ap_id: Cow::Owned(object_id),
                    })
                    .await?;
                }

                Ok(())
            });

            Ok(None)
        }
    }
}

pub fn ingest_object_boxed(
    object: Verified<KnownObject>,
    found_from: FoundFrom,
    ctx: Arc<crate::RouteContext>,
) -> std::pin::Pin<
    Box<dyn Future<Output = Result<Option<super::ActorLocalInfo>, crate::Error>> + Send>,
> {
    Box::pin(ingest_object(object, found_from, ctx))
}

pub async fn ingest_like(
    activity: Verified<activitystreams::activity::Like>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;

    let activity_id = activity
        .id_unchecked()
        .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

    if let Some(actor_id) = activity.actor_unchecked().as_single_id() {
        super::require_containment(activity_id, actor_id)?;

        let actor_local_id =
            super::get_or_fetch_user_local_id(actor_id, &db, &ctx.host_url_apub, &ctx.http_client)
                .await?;

        if let Some(object_id) = activity.object().as_single_id() {
            let thing_local_ref = if let Some(remaining) =
                super::try_strip_host(&object_id, &ctx.host_url_apub)
            {
                if remaining.starts_with("/posts/") {
                    if let Ok(local_post_id) = remaining[7..].parse() {
                        Some(ThingLocalRef::Post(local_post_id))
                    } else {
                        None
                    }
                } else if remaining.starts_with("/comments/") {
                    if let Ok(local_comment_id) = remaining[10..].parse() {
                        Some(ThingLocalRef::Comment(local_comment_id))
                    } else {
                        None
                    }
                } else {
                    None
                }
            } else {
                let row = db.query_opt(
                    "(SELECT TRUE, id FROM post WHERE ap_id=$1) UNION ALL (SELECT FALSE, id FROM reply WHERE ap_id=$1) LIMIT 1",
                    &[&object_id.as_str()],
                ).await?;

                if let Some(row) = row {
                    Some(if row.get(0) {
                        ThingLocalRef::Post(PostLocalID(row.get(1)))
                    } else {
                        ThingLocalRef::Comment(CommentLocalID(row.get(1)))
                    })
                } else {
                    None
                }
            };

            match thing_local_ref {
                Some(ThingLocalRef::Post(post_local_id)) => {
                    let row_count = db.execute(
                        "INSERT INTO post_like (post, person, local, ap_id) VALUES ($1, $2, FALSE, $3) ON CONFLICT (post, person) DO NOTHING",
                        &[&post_local_id, &actor_local_id, &activity_id.as_str()],
                    ).await?;

                    if row_count > 0 {
                        let row = db.query_opt("SELECT post.community, community.local FROM post, community WHERE post.community = community.id AND post.id=$1", &[&post_local_id]).await?;
                        if let Some(row) = row {
                            let community_local = row.get(1);
                            if community_local {
                                let community_id = CommunityLocalID(row.get(0));
                                let body = serde_json::to_string(&activity)?;
                                super::enqueue_forward_to_community_followers(
                                    community_id,
                                    body,
                                    ctx,
                                )
                                .await?;
                            }
                        }
                    }
                }
                Some(ThingLocalRef::Comment(comment_local_id)) => {
                    let row_count = db.execute(
                        "INSERT INTO reply_like (reply, person, local, ap_id) VALUES ($1, $2, FALSE, $3) ON CONFLICT (reply, person) DO NOTHING",
                        &[&comment_local_id, &actor_local_id, &activity_id.as_str()],
                    ).await?;

                    if row_count > 0 {
                        let row = db.query_opt("SELECT post.community, community.local FROM reply, post, community WHERE reply.post = post.id AND post.community = community.id AND post.id=$1", &[&comment_local_id]).await?;
                        if let Some(row) = row {
                            let community_local = row.get(1);
                            if community_local {
                                let community_id = CommunityLocalID(row.get(0));
                                let body = serde_json::to_string(&activity)?;
                                super::enqueue_forward_to_community_followers(
                                    community_id,
                                    body,
                                    ctx,
                                )
                                .await?;
                            }
                        }
                    }
                }
                _ => {}
            }
        }
    }

    Ok(())
}

pub async fn ingest_delete(
    activity: Verified<activitystreams::activity::Delete>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;

    let activity_id = activity
        .id_unchecked()
        .ok_or(crate::Error::InternalStrStatic("Missing ID for activity"))?;
    let actor_id = activity
        .actor_unchecked()
        .as_single_id()
        .ok_or(crate::Error::InternalStrStatic("Missing ID for actor"))?;

    if let Some(object_id) = activity.object().as_single_id() {
        super::require_containment(activity_id, actor_id)?;
        super::require_containment(object_id, actor_id)?;

        let row = db.query_opt(
            "WITH deleted_post AS (UPDATE post SET href=NULL, title='[deleted]', content_text='[deleted]', content_markdown=NULL, content_html=NULL, deleted=TRUE WHERE ap_id=$1 AND deleted=FALSE RETURNING (SELECT id FROM community WHERE community.id = post.community AND community.local)), deleted_reply AS (UPDATE reply SET content_text='[deleted]', content_markdown=NULL, content_html=NULL, deleted=TRUE WHERE ap_id=$1 AND deleted=FALSE RETURNING (SELECT id FROM community WHERE community.id=(SELECT community FROM post WHERE id=reply.post) AND community.local)) (SELECT * FROM deleted_post) UNION ALL (SELECT * FROM deleted_reply) LIMIT 1",
            &[&object_id.as_str()],
            ).await?;

        if let Some(row) = row {
            // Something was deleted
            let local_community = row.get::<_, Option<_>>(0).map(CommunityLocalID);
            if let Some(community_id) = local_community {
                // Community is local, need to forward delete to followers

                let body = serde_json::to_string(&activity)?;

                crate::spawn_task(crate::apub_util::enqueue_forward_to_community_followers(
                    community_id,
                    body,
                    ctx,
                ));
            }
        }
    }

    Ok(())
}

pub async fn ingest_undo(
    activity: Verified<activitystreams::activity::Undo>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let activity_id = activity
        .id_unchecked()
        .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

    let actor_id =
        activity
            .actor_unchecked()
            .as_single_id()
            .ok_or(crate::Error::InternalStrStatic(
                "Missing actor for activity",
            ))?;

    let object_id = activity
        .object()
        .as_single_id()
        .ok_or(crate::Error::InternalStrStatic("Missing object for Undo"))?;

    super::require_containment(activity_id, actor_id)?;
    super::require_containment(object_id, actor_id)?;

    let object_id = object_id.as_str();

    let db = ctx.db_pool.get().await?;

    db.execute("DELETE FROM post_like WHERE ap_id=$1", &[&object_id])
        .await?;
    db.execute("DELETE FROM reply_like WHERE ap_id=$1", &[&object_id])
        .await?;
    db.execute("DELETE FROM community_follow WHERE ap_id=$1", &[&object_id])
        .await?;
    db.execute(
        "UPDATE post SET approved=FALSE, approved_ap_id=NULL WHERE approved_ap_id=$1",
        &[&object_id],
    )
    .await?;

    Ok(())
}

pub async fn ingest_create(
    activity: Verified<activitystreams::activity::Create>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    for req_obj in activity.object().iter() {
        let object_id = req_obj.id();

        if let Some(object_id) = object_id {
            let obj = crate::apub_util::fetch_ap_object(object_id, &ctx.http_client).await?;

            ingest_object_boxed(obj, FoundFrom::Other, ctx.clone()).await?;
        }
    }

    Ok(())
}

/// Ingestion flow for Page, Image, Article, and Note. Should not be called with any other objects.
async fn ingest_postlike(
    obj: Verified<KnownObject>,
    found_from: FoundFrom,
    ctx: Arc<crate::RouteContext>,
) -> Result<Option<super::ActorLocalInfo>, crate::Error> {
    let (to, in_reply_to, obj_id) = match obj.deref() {
        KnownObject::Page(obj) => (obj.to(), None, obj.id_unchecked()),
        KnownObject::Image(obj) => (obj.to(), None, obj.id_unchecked()),
        KnownObject::Article(obj) => (obj.to(), None, obj.id_unchecked()),
        KnownObject::Note(obj) => (obj.to(), obj.in_reply_to(), obj.id_unchecked()),
        _ => (None, None, None),
    };

    let community_found = match found_from {
        FoundFrom::Announce {
            community_local_id,
            community_is_local,
            ..
        } => Some((community_local_id, community_is_local)),
        _ => match to {
            None => None,
            Some(maybe) => maybe
                .iter()
                .filter_map(|any| {
                    any.as_xsd_any_uri()
                        .and_then(|uri| {
                            super::maybe_get_local_community_id_from_uri(uri, &ctx.host_url_apub)
                        })
                        .map(|id| (id, true))
                })
                .next(),
        },
    };

    if let Some((community_local_id, community_is_local)) = community_found {
        match obj.into_inner() {
            KnownObject::Page(obj) => {
                handle_received_page_for_community(
                    community_local_id,
                    community_is_local,
                    found_from.as_announce(),
                    Verified(obj),
                    ctx,
                )
                .await?
            }
            KnownObject::Image(obj) => {
                handle_received_page_for_community(
                    community_local_id,
                    community_is_local,
                    found_from.as_announce(),
                    Verified(obj),
                    ctx,
                )
                .await?
            }
            KnownObject::Article(obj) => {
                handle_received_page_for_community(
                    community_local_id,
                    community_is_local,
                    found_from.as_announce(),
                    Verified(obj),
                    ctx,
                )
                .await?
            }
            KnownObject::Note(obj) => {
                let content = obj.content().and_then(|x| x.as_single_xsd_string());
                let media_type = obj.media_type();
                let created = obj.published();
                let author = obj.attributed_to().and_then(|x| x.as_single_id());

                // fetch first attachment
                let attachment_href = obj
                    .attachment()
                    .and_then(|x| x.iter().next())
                    .and_then(
                        |base: &activitystreams::base::AnyBase| match base.kind_str() {
                            Some("Document") => Some(
                                activitystreams::object::Document::from_any_base(base.clone())
                                    .map(|obj| obj.unwrap().take_url()),
                            ),
                            Some("Image") => Some(
                                activitystreams::object::Image::from_any_base(base.clone())
                                    .map(|obj| obj.unwrap().take_url()),
                            ),
                            _ => None,
                        },
                    )
                    .transpose()?
                    .flatten();
                let attachment_href = attachment_href
                    .as_ref()
                    .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                    .map(|href| href.as_str());

                if let Some(object_id) = obj.id_unchecked() {
                    if let Some(in_reply_to) = obj.in_reply_to() {
                        // it's a reply

                        handle_recieved_reply(
                            object_id,
                            content.unwrap_or(""),
                            media_type,
                            created.as_ref(),
                            author,
                            in_reply_to,
                            attachment_href,
                            ctx,
                        )
                        .await?;
                    } else {
                        // not a reply, must be a top-level post
                        let title = obj
                            .summary()
                            .and_then(|x| x.as_single_xsd_string())
                            .unwrap_or("");

                        // Interpret attachments (usually images) as links
                        let href = obj
                            .attachment()
                            .and_then(|x| x.iter().next())
                            .and_then(|base: &activitystreams::base::AnyBase| {
                                match base.kind_str() {
                                    Some("Document") => Some(
                                        activitystreams::object::Document::from_any_base(
                                            base.clone(),
                                        )
                                        .map(|obj| obj.unwrap().take_url()),
                                    ),
                                    Some("Image") => Some(
                                        activitystreams::object::Image::from_any_base(base.clone())
                                            .map(|obj| obj.unwrap().take_url()),
                                    ),
                                    _ => None,
                                }
                            })
                            .transpose()?
                            .flatten();
                        let href = href
                            .as_ref()
                            .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                            .map(|href| href.as_str());

                        if let Some(object_id) = obj.id_unchecked() {
                            handle_recieved_post(
                                object_id.clone(),
                                title,
                                href,
                                content,
                                media_type,
                                created.as_ref(),
                                author,
                                community_local_id,
                                community_is_local,
                                found_from.as_announce(),
                                ctx,
                            )
                            .await?;
                        }
                    }
                }
            }
            _ => {}
        }
    } else {
        // not to a community, but might still match as a reply
        if let Some(in_reply_to) = in_reply_to {
            if let Some(obj_id) = obj_id {
                if let KnownObject::Note(obj) = obj.deref() {
                    // TODO deduplicate this?

                    let content = obj.content().and_then(|x| x.as_single_xsd_string());
                    let media_type = obj.media_type();
                    let created = obj.published();
                    let author = obj.attributed_to().and_then(|x| x.as_single_id());

                    if let Some(author) = author {
                        super::require_containment(obj_id, author)?;
                    }

                    // fetch first attachment
                    let attachment_href = obj
                        .attachment()
                        .and_then(|x| x.iter().next())
                        .and_then(
                            |base: &activitystreams::base::AnyBase| match base.kind_str() {
                                Some("Document") => Some(
                                    activitystreams::object::Document::from_any_base(base.clone())
                                        .map(|obj| obj.unwrap().take_url()),
                                ),
                                Some("Image") => Some(
                                    activitystreams::object::Image::from_any_base(base.clone())
                                        .map(|obj| obj.unwrap().take_url()),
                                ),
                                _ => None,
                            },
                        )
                        .transpose()?
                        .flatten();
                    let attachment_href = attachment_href
                        .as_ref()
                        .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                        .map(|href| href.as_str());

                    handle_recieved_reply(
                        obj_id,
                        content.unwrap_or(""),
                        media_type,
                        created.as_ref(),
                        author,
                        &in_reply_to,
                        attachment_href,
                        ctx,
                    )
                    .await?;
                }
            }
        }
    }

    Ok(None)
}

async fn handle_recieved_reply(
    object_id: &url::Url,
    content: &str,
    media_type: Option<&mime::Mime>,
    created: Option<&chrono::DateTime<chrono::FixedOffset>>,
    author: Option<&url::Url>,
    in_reply_to: &activitystreams::primitives::OneOrMany<activitystreams::base::AnyBase>,
    attachment_href: Option<&str>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;

    let author = match author {
        Some(author) => Some(
            super::get_or_fetch_user_local_id(&author, &db, &ctx.host_url_apub, &ctx.http_client)
                .await?,
        ),
        None => None,
    };

    let last_reply_to = in_reply_to.iter().last(); // TODO maybe not this? Not sure how to interpret inReplyTo

    if let Some(last_reply_to) = last_reply_to {
        if let Some(term_ap_id) = last_reply_to.as_xsd_any_uri() {
            #[derive(Debug)]
            enum ReplyTarget {
                Post {
                    id: PostLocalID,
                },
                Comment {
                    id: CommentLocalID,
                    post: PostLocalID,
                },
            }

            let target = if let Some(remaining) =
                super::try_strip_host(&term_ap_id, &ctx.host_url_apub)
            {
                if remaining.starts_with("/posts/") {
                    if let Ok(local_post_id) = remaining[7..].parse() {
                        Some(ReplyTarget::Post { id: local_post_id })
                    } else {
                        None
                    }
                } else if remaining.starts_with("/comments/") {
                    if let Ok(local_comment_id) = remaining[10..].parse() {
                        let row = db
                            .query_opt("SELECT post FROM reply WHERE id=$1", &[&local_comment_id])
                            .await?;
                        if let Some(row) = row {
                            Some(ReplyTarget::Comment {
                                id: local_comment_id,
                                post: PostLocalID(row.get(0)),
                            })
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                } else {
                    None
                }
            } else {
                let row = db
                    .query_opt("(SELECT id, post FROM reply WHERE ap_id=$1) UNION (SELECT NULL, id FROM post WHERE ap_id=$1) LIMIT 1", &[&term_ap_id.as_str()])
                    .await?;
                row.map(|row| match row.get::<_, Option<_>>(0).map(CommentLocalID) {
                    Some(reply_id) => ReplyTarget::Comment {
                        id: reply_id,
                        post: PostLocalID(row.get(1)),
                    },
                    None => ReplyTarget::Post {
                        id: PostLocalID(row.get(1)),
                    },
                })
            };

            if let Some(target) = target {
                let (post, parent) = match target {
                    ReplyTarget::Post { id } => (id, None),
                    ReplyTarget::Comment { id, post } => (post, Some(id)),
                };

                let content_is_html = media_type.is_none() || media_type == Some(&mime::TEXT_HTML);
                let (content_text, content_html) = if content_is_html {
                    (None, Some(content))
                } else {
                    (Some(content), None)
                };

                let row = db.query_opt(
                    "INSERT INTO reply (post, parent, author, content_text, content_html, created, local, ap_id, attachment_href) VALUES ($1, $2, $3, $4, $5, COALESCE($6, current_timestamp), FALSE, $7, $8) ON CONFLICT (ap_id) DO NOTHING RETURNING id",
                    &[&post, &parent, &author, &content_text, &content_html, &created, &object_id.as_str(), &attachment_href],
                    ).await?;

                if let Some(row) = row {
                    let info = crate::CommentInfo {
                        id: CommentLocalID(row.get(0)),
                        author,
                        post,
                        parent,
                        content_text: content_text.map(|x| Cow::Owned(x.to_owned())),
                        content_markdown: None,
                        content_html: content_html.map(|x| Cow::Owned(x.to_owned())),
                        created: created.copied().unwrap_or_else(|| {
                            chrono::offset::Utc::now()
                                .with_timezone(&chrono::offset::FixedOffset::west(0))
                        }),
                        ap_id: crate::APIDOrLocal::APID(object_id.to_owned()),
                        attachment_href: attachment_href.map(|x| Cow::Owned(x.to_owned())),
                    };

                    crate::on_post_add_comment(info, ctx);
                }
            }
        }
    }

    Ok(())
}

async fn handle_received_page_for_community<Kind: Clone + std::fmt::Debug>(
    community_local_id: CommunityLocalID,
    community_is_local: bool,
    is_announce: Option<&url::Url>,
    obj: Verified<activitystreams::object::Object<Kind>>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let title = obj
        .summary()
        .iter()
        .map(|x| x.iter())
        .flatten()
        .filter_map(|maybe| maybe.as_xsd_string())
        .next()
        .unwrap_or("");
    let href = obj
        .url()
        .iter()
        .map(|x| x.iter())
        .flatten()
        .filter_map(|maybe| {
            maybe
                .as_xsd_any_uri()
                .map(|x| x.as_str())
                .or_else(|| maybe.as_xsd_string())
        })
        .next();
    let content = obj.content().and_then(|x| x.as_single_xsd_string());
    let media_type = obj.media_type();
    let created = obj.published();
    let author = obj.attributed_to().and_then(|x| x.as_single_id());

    if let Some(object_id) = obj.id_unchecked() {
        if let Some(author) = author {
            super::require_containment(&object_id, author)?;
        }

        handle_recieved_post(
            object_id.clone(),
            title,
            href,
            content,
            media_type,
            created.as_ref(),
            author,
            community_local_id,
            community_is_local,
            is_announce,
            ctx,
        )
        .await?;
    }

    Ok(())
}

async fn handle_recieved_post(
    object_id: url::Url,
    title: &str,
    href: Option<&str>,
    content: Option<&str>,
    media_type: Option<&mime::Mime>,
    created: Option<&chrono::DateTime<chrono::FixedOffset>>,
    author: Option<&url::Url>,
    community_local_id: CommunityLocalID,
    community_is_local: bool,
    is_announce: Option<&url::Url>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;
    let author = match author {
        Some(author) => Some(
            super::get_or_fetch_user_local_id(&author, &db, &ctx.host_url_apub, &ctx.http_client)
                .await?,
        ),
        None => None,
    };

    let content_is_html = media_type.is_none() || media_type == Some(&mime::TEXT_HTML);
    let (content_text, content_html) = if content_is_html {
        (None, Some(content))
    } else {
        (Some(content), None)
    };

    let approved = is_announce.is_some() || community_is_local;

    let row = db.query_opt(
        "INSERT INTO post (author, href, content_text, content_html, title, created, community, local, ap_id, approved, approved_ap_id) VALUES ($1, $2, $3, $4, $5, COALESCE($6, current_timestamp), $7, FALSE, $8, $9, $10) ON CONFLICT (ap_id) DO UPDATE SET approved=$9, approved_ap_id=$10 RETURNING id",
        &[&author, &href, &content_text, &content_html, &title, &created, &community_local_id, &object_id.as_str(), &approved, &is_announce.map(|x| x.as_str())],
    ).await?;

    if community_is_local {
        if let Some(row) = row {
            let post_local_id = PostLocalID(row.get(0));
            crate::on_local_community_add_post(community_local_id, post_local_id, object_id, ctx);
        }
    }

    Ok(())
}

R src/apub_util.rs => src/apub_util/mod.rs +3 -631
@@ 1,4 1,4 @@
use crate::{BaseURL, CommentLocalID, CommunityLocalID, PostLocalID, ThingLocalRef, UserLocalID};
use crate::{BaseURL, CommentLocalID, CommunityLocalID, PostLocalID, UserLocalID};
use activitystreams::prelude::*;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;


@@ 7,6 7,8 @@ use std::convert::TryFrom;
use std::ops::Deref;
use std::sync::Arc;

pub mod ingest;

pub const ACTIVITY_TYPE: &str = "application/activity+json";

pub const SIGALG_RSA_SHA256: &str = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256";


@@ 1472,636 1474,6 @@ pub fn maybe_get_local_community_id_from_uri(
    }
}

pub async fn handle_recieved_object_for_local_community<'a>(
    obj: Verified<KnownObject>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let (to, in_reply_to, obj_id) = match obj.deref() {
        KnownObject::Page(obj) => (obj.to(), None, obj.id_unchecked()),
        KnownObject::Image(obj) => (obj.to(), None, obj.id_unchecked()),
        KnownObject::Article(obj) => (obj.to(), None, obj.id_unchecked()),
        KnownObject::Note(obj) => (obj.to(), obj.in_reply_to(), obj.id_unchecked()),
        _ => (None, None, None),
    };

    let local_community_id = match to {
        None => None,
        Some(maybe) => maybe
            .iter()
            .filter_map(|any| {
                any.as_xsd_any_uri()
                    .and_then(|uri| maybe_get_local_community_id_from_uri(uri, &ctx.host_url_apub))
            })
            .next(),
    };

    if let Some(local_community_id) = local_community_id {
        handle_recieved_object_for_community(local_community_id, true, None, obj, ctx).await?;
    } else {
        // not to a community, but might still match as a reply
        if let Some(in_reply_to) = in_reply_to {
            if let Some(obj_id) = obj_id {
                if let KnownObject::Note(obj) = obj.deref() {
                    // TODO deduplicate this?

                    let content = obj.content().and_then(|x| x.as_single_xsd_string());
                    let media_type = obj.media_type();
                    let created = obj.published();
                    let author = obj.attributed_to().and_then(|x| x.as_single_id());

                    if let Some(author) = author {
                        require_containment(obj_id, author)?;
                    }

                    // fetch first attachment
                    let attachment_href = obj
                        .attachment()
                        .and_then(|x| x.iter().next())
                        .and_then(
                            |base: &activitystreams::base::AnyBase| match base.kind_str() {
                                Some("Document") => Some(
                                    activitystreams::object::Document::from_any_base(base.clone())
                                        .map(|obj| obj.unwrap().take_url()),
                                ),
                                Some("Image") => Some(
                                    activitystreams::object::Image::from_any_base(base.clone())
                                        .map(|obj| obj.unwrap().take_url()),
                                ),
                                _ => None,
                            },
                        )
                        .transpose()?
                        .flatten();
                    let attachment_href = attachment_href
                        .as_ref()
                        .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                        .map(|href| href.as_str());

                    handle_recieved_reply(
                        obj_id,
                        content.unwrap_or(""),
                        media_type,
                        created.as_ref(),
                        author,
                        &in_reply_to,
                        attachment_href,
                        ctx,
                    )
                    .await?;
                }
            }
        }
    }

    Ok(())
}

pub async fn handle_received_page_for_community<Kind: Clone + std::fmt::Debug>(
    community_local_id: CommunityLocalID,
    community_is_local: bool,
    is_announce: Option<&url::Url>,
    obj: Verified<activitystreams::object::Object<Kind>>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let title = obj
        .summary()
        .iter()
        .map(|x| x.iter())
        .flatten()
        .filter_map(|maybe| maybe.as_xsd_string())
        .next()
        .unwrap_or("");
    let href = obj
        .url()
        .iter()
        .map(|x| x.iter())
        .flatten()
        .filter_map(|maybe| {
            maybe
                .as_xsd_any_uri()
                .map(|x| x.as_str())
                .or_else(|| maybe.as_xsd_string())
        })
        .next();
    let content = obj.content().and_then(|x| x.as_single_xsd_string());
    let media_type = obj.media_type();
    let created = obj.published();
    let author = obj.attributed_to().and_then(|x| x.as_single_id());

    if let Some(object_id) = obj.id_unchecked() {
        if let Some(author) = author {
            require_containment(&object_id, author)?;
        }

        handle_recieved_post(
            object_id.clone(),
            title,
            href,
            content,
            media_type,
            created.as_ref(),
            author,
            community_local_id,
            community_is_local,
            is_announce,
            ctx,
        )
        .await?;
    }

    Ok(())
}

pub async fn handle_recieved_object_for_community<'a>(
    community_local_id: CommunityLocalID,
    community_is_local: bool,
    is_announce: Option<&url::Url>,
    obj: Verified<KnownObject>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    println!("recieved object: {:?}", obj);

    match obj.into_inner() {
        KnownObject::Page(obj) => {
            handle_received_page_for_community(
                community_local_id,
                community_is_local,
                is_announce,
                Verified(obj),
                ctx,
            )
            .await?
        }
        KnownObject::Image(obj) => {
            handle_received_page_for_community(
                community_local_id,
                community_is_local,
                is_announce,
                Verified(obj),
                ctx,
            )
            .await?
        }
        KnownObject::Article(obj) => {
            handle_received_page_for_community(
                community_local_id,
                community_is_local,
                is_announce,
                Verified(obj),
                ctx,
            )
            .await?
        }
        KnownObject::Note(obj) => {
            let content = obj.content().and_then(|x| x.as_single_xsd_string());
            let media_type = obj.media_type();
            let created = obj.published();
            let author = obj.attributed_to().and_then(|x| x.as_single_id());

            // fetch first attachment
            let attachment_href = obj
                .attachment()
                .and_then(|x| x.iter().next())
                .and_then(
                    |base: &activitystreams::base::AnyBase| match base.kind_str() {
                        Some("Document") => Some(
                            activitystreams::object::Document::from_any_base(base.clone())
                                .map(|obj| obj.unwrap().take_url()),
                        ),
                        Some("Image") => Some(
                            activitystreams::object::Image::from_any_base(base.clone())
                                .map(|obj| obj.unwrap().take_url()),
                        ),
                        _ => None,
                    },
                )
                .transpose()?
                .flatten();
            let attachment_href = attachment_href
                .as_ref()
                .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                .map(|href| href.as_str());

            if let Some(object_id) = obj.id_unchecked() {
                if let Some(in_reply_to) = obj.in_reply_to() {
                    // it's a reply

                    handle_recieved_reply(
                        object_id,
                        content.unwrap_or(""),
                        media_type,
                        created.as_ref(),
                        author,
                        in_reply_to,
                        attachment_href,
                        ctx,
                    )
                    .await?;
                } else {
                    // not a reply, must be a top-level post
                    let title = obj
                        .summary()
                        .and_then(|x| x.as_single_xsd_string())
                        .unwrap_or("");

                    // Interpret attachments (usually images) as links
                    let href = obj
                        .attachment()
                        .and_then(|x| x.iter().next())
                        .and_then(
                            |base: &activitystreams::base::AnyBase| match base.kind_str() {
                                Some("Document") => Some(
                                    activitystreams::object::Document::from_any_base(base.clone())
                                        .map(|obj| obj.unwrap().take_url()),
                                ),
                                Some("Image") => Some(
                                    activitystreams::object::Image::from_any_base(base.clone())
                                        .map(|obj| obj.unwrap().take_url()),
                                ),
                                _ => None,
                            },
                        )
                        .transpose()?
                        .flatten();
                    let href = href
                        .as_ref()
                        .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                        .map(|href| href.as_str());

                    if let Some(object_id) = obj.id_unchecked() {
                        handle_recieved_post(
                            object_id.clone(),
                            title,
                            href,
                            content,
                            media_type,
                            created.as_ref(),
                            author,
                            community_local_id,
                            community_is_local,
                            is_announce,
                            ctx,
                        )
                        .await?;
                    }
                }
            }
        }
        _ => {}
    }

    Ok(())
}

async fn handle_recieved_post(
    object_id: url::Url,
    title: &str,
    href: Option<&str>,
    content: Option<&str>,
    media_type: Option<&mime::Mime>,
    created: Option<&chrono::DateTime<chrono::FixedOffset>>,
    author: Option<&url::Url>,
    community_local_id: CommunityLocalID,
    community_is_local: bool,
    is_announce: Option<&url::Url>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;
    let author = match author {
        Some(author) => Some(
            get_or_fetch_user_local_id(&author, &db, &ctx.host_url_apub, &ctx.http_client).await?,
        ),
        None => None,
    };

    let content_is_html = media_type.is_none() || media_type == Some(&mime::TEXT_HTML);
    let (content_text, content_html) = if content_is_html {
        (None, Some(content))
    } else {
        (Some(content), None)
    };

    let approved = is_announce.is_some() || community_is_local;

    let row = db.query_opt(
        "INSERT INTO post (author, href, content_text, content_html, title, created, community, local, ap_id, approved, approved_ap_id) VALUES ($1, $2, $3, $4, $5, COALESCE($6, current_timestamp), $7, FALSE, $8, $9, $10) ON CONFLICT (ap_id) DO UPDATE SET approved=$9, approved_ap_id=$10 RETURNING id",
        &[&author, &href, &content_text, &content_html, &title, &created, &community_local_id, &object_id.as_str(), &approved, &is_announce.map(|x| x.as_str())],
    ).await?;

    if community_is_local {
        if let Some(row) = row {
            let post_local_id = PostLocalID(row.get(0));
            crate::on_local_community_add_post(community_local_id, post_local_id, object_id, ctx);
        }
    }

    Ok(())
}

async fn handle_recieved_reply(
    object_id: &url::Url,
    content: &str,
    media_type: Option<&mime::Mime>,
    created: Option<&chrono::DateTime<chrono::FixedOffset>>,
    author: Option<&url::Url>,
    in_reply_to: &activitystreams::primitives::OneOrMany<activitystreams::base::AnyBase>,
    attachment_href: Option<&str>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;

    let author = match author {
        Some(author) => Some(
            get_or_fetch_user_local_id(&author, &db, &ctx.host_url_apub, &ctx.http_client).await?,
        ),
        None => None,
    };

    let last_reply_to = in_reply_to.iter().last(); // TODO maybe not this? Not sure how to interpret inReplyTo

    if let Some(last_reply_to) = last_reply_to {
        if let Some(term_ap_id) = last_reply_to.as_xsd_any_uri() {
            #[derive(Debug)]
            enum ReplyTarget {
                Post {
                    id: PostLocalID,
                },
                Comment {
                    id: CommentLocalID,
                    post: PostLocalID,
                },
            }

            let target = if let Some(remaining) = try_strip_host(&term_ap_id, &ctx.host_url_apub) {
                if remaining.starts_with("/posts/") {
                    if let Ok(local_post_id) = remaining[7..].parse() {
                        Some(ReplyTarget::Post { id: local_post_id })
                    } else {
                        None
                    }
                } else if remaining.starts_with("/comments/") {
                    if let Ok(local_comment_id) = remaining[10..].parse() {
                        let row = db
                            .query_opt("SELECT post FROM reply WHERE id=$1", &[&local_comment_id])
                            .await?;
                        if let Some(row) = row {
                            Some(ReplyTarget::Comment {
                                id: local_comment_id,
                                post: PostLocalID(row.get(0)),
                            })
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                } else {
                    None
                }
            } else {
                let row = db
                    .query_opt("(SELECT id, post FROM reply WHERE ap_id=$1) UNION (SELECT NULL, id FROM post WHERE ap_id=$1) LIMIT 1", &[&term_ap_id.as_str()])
                    .await?;
                row.map(|row| match row.get::<_, Option<_>>(0).map(CommentLocalID) {
                    Some(reply_id) => ReplyTarget::Comment {
                        id: reply_id,
                        post: PostLocalID(row.get(1)),
                    },
                    None => ReplyTarget::Post {
                        id: PostLocalID(row.get(1)),
                    },
                })
            };

            if let Some(target) = target {
                let (post, parent) = match target {
                    ReplyTarget::Post { id } => (id, None),
                    ReplyTarget::Comment { id, post } => (post, Some(id)),
                };

                let content_is_html = media_type.is_none() || media_type == Some(&mime::TEXT_HTML);
                let (content_text, content_html) = if content_is_html {
                    (None, Some(content))
                } else {
                    (Some(content), None)
                };

                let row = db.query_opt(
                    "INSERT INTO reply (post, parent, author, content_text, content_html, created, local, ap_id, attachment_href) VALUES ($1, $2, $3, $4, $5, COALESCE($6, current_timestamp), FALSE, $7, $8) ON CONFLICT (ap_id) DO NOTHING RETURNING id",
                    &[&post, &parent, &author, &content_text, &content_html, &created, &object_id.as_str(), &attachment_href],
                    ).await?;

                if let Some(row) = row {
                    let info = crate::CommentInfo {
                        id: CommentLocalID(row.get(0)),
                        author,
                        post,
                        parent,
                        content_text: content_text.map(|x| Cow::Owned(x.to_owned())),
                        content_markdown: None,
                        content_html: content_html.map(|x| Cow::Owned(x.to_owned())),
                        created: created.copied().unwrap_or_else(|| {
                            chrono::offset::Utc::now()
                                .with_timezone(&chrono::offset::FixedOffset::west(0))
                        }),
                        ap_id: crate::APIDOrLocal::APID(object_id.to_owned()),
                        attachment_href: attachment_href.map(|x| Cow::Owned(x.to_owned())),
                    };

                    crate::on_post_add_comment(info, ctx);
                }
            }
        }
    }

    Ok(())
}

pub async fn handle_like(
    activity: Verified<activitystreams::activity::Like>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;

    let activity_id = activity
        .id_unchecked()
        .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

    if let Some(actor_id) = activity.actor_unchecked().as_single_id() {
        require_containment(activity_id, actor_id)?;

        let actor_local_id =
            get_or_fetch_user_local_id(actor_id, &db, &ctx.host_url_apub, &ctx.http_client).await?;

        if let Some(object_id) = activity.object().as_single_id() {
            let thing_local_ref = if let Some(remaining) =
                try_strip_host(&object_id, &ctx.host_url_apub)
            {
                if remaining.starts_with("/posts/") {
                    if let Ok(local_post_id) = remaining[7..].parse() {
                        Some(ThingLocalRef::Post(local_post_id))
                    } else {
                        None
                    }
                } else if remaining.starts_with("/comments/") {
                    if let Ok(local_comment_id) = remaining[10..].parse() {
                        Some(ThingLocalRef::Comment(local_comment_id))
                    } else {
                        None
                    }
                } else {
                    None
                }
            } else {
                let row = db.query_opt(
                    "(SELECT TRUE, id FROM post WHERE ap_id=$1) UNION ALL (SELECT FALSE, id FROM reply WHERE ap_id=$1) LIMIT 1",
                    &[&object_id.as_str()],
                ).await?;

                if let Some(row) = row {
                    Some(if row.get(0) {
                        ThingLocalRef::Post(PostLocalID(row.get(1)))
                    } else {
                        ThingLocalRef::Comment(CommentLocalID(row.get(1)))
                    })
                } else {
                    None
                }
            };

            match thing_local_ref {
                Some(ThingLocalRef::Post(post_local_id)) => {
                    let row_count = db.execute(
                        "INSERT INTO post_like (post, person, local, ap_id) VALUES ($1, $2, FALSE, $3) ON CONFLICT (post, person) DO NOTHING",
                        &[&post_local_id, &actor_local_id, &activity_id.as_str()],
                    ).await?;

                    if row_count > 0 {
                        let row = db.query_opt("SELECT post.community, community.local FROM post, community WHERE post.community = community.id AND post.id=$1", &[&post_local_id]).await?;
                        if let Some(row) = row {
                            let community_local = row.get(1);
                            if community_local {
                                let community_id = CommunityLocalID(row.get(0));
                                let body = serde_json::to_string(&activity)?;
                                enqueue_forward_to_community_followers(community_id, body, ctx)
                                    .await?;
                            }
                        }
                    }
                }
                Some(ThingLocalRef::Comment(comment_local_id)) => {
                    let row_count = db.execute(
                        "INSERT INTO reply_like (reply, person, local, ap_id) VALUES ($1, $2, FALSE, $3) ON CONFLICT (reply, person) DO NOTHING",
                        &[&comment_local_id, &actor_local_id, &activity_id.as_str()],
                    ).await?;

                    if row_count > 0 {
                        let row = db.query_opt("SELECT post.community, community.local FROM reply, post, community WHERE reply.post = post.id AND post.community = community.id AND post.id=$1", &[&comment_local_id]).await?;
                        if let Some(row) = row {
                            let community_local = row.get(1);
                            if community_local {
                                let community_id = CommunityLocalID(row.get(0));
                                let body = serde_json::to_string(&activity)?;
                                enqueue_forward_to_community_followers(community_id, body, ctx)
                                    .await?;
                            }
                        }
                    }
                }
                _ => {}
            }
        }
    }

    Ok(())
}

pub async fn handle_delete(
    activity: Verified<activitystreams::activity::Delete>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;

    let activity_id = activity
        .id_unchecked()
        .ok_or(crate::Error::InternalStrStatic("Missing ID for activity"))?;
    let actor_id = activity
        .actor_unchecked()
        .as_single_id()
        .ok_or(crate::Error::InternalStrStatic("Missing ID for actor"))?;

    if let Some(object_id) = activity.object().as_single_id() {
        require_containment(activity_id, actor_id)?;
        require_containment(object_id, actor_id)?;

        let row = db.query_opt(
            "WITH deleted_post AS (UPDATE post SET href=NULL, title='[deleted]', content_text='[deleted]', content_markdown=NULL, content_html=NULL, deleted=TRUE WHERE ap_id=$1 AND deleted=FALSE RETURNING (SELECT id FROM community WHERE community.id = post.community AND community.local)), deleted_reply AS (UPDATE reply SET content_text='[deleted]', content_markdown=NULL, content_html=NULL, deleted=TRUE WHERE ap_id=$1 AND deleted=FALSE RETURNING (SELECT id FROM community WHERE community.id=(SELECT community FROM post WHERE id=reply.post) AND community.local)) (SELECT * FROM deleted_post) UNION ALL (SELECT * FROM deleted_reply) LIMIT 1",
            &[&object_id.as_str()],
            ).await?;

        if let Some(row) = row {
            // Something was deleted
            let local_community = row.get::<_, Option<_>>(0).map(CommunityLocalID);
            if let Some(community_id) = local_community {
                // Community is local, need to forward delete to followers

                let body = serde_json::to_string(&activity)?;

                crate::spawn_task(crate::apub_util::enqueue_forward_to_community_followers(
                    community_id,
                    body,
                    ctx,
                ));
            }
        }
    }

    Ok(())
}

pub async fn handle_undo(
    activity: Verified<activitystreams::activity::Undo>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    let activity_id = activity
        .id_unchecked()
        .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

    let actor_id =
        activity
            .actor_unchecked()
            .as_single_id()
            .ok_or(crate::Error::InternalStrStatic(
                "Missing actor for activity",
            ))?;

    let object_id = activity
        .object()
        .as_single_id()
        .ok_or(crate::Error::InternalStrStatic("Missing object for Undo"))?;

    require_containment(activity_id, actor_id)?;
    require_containment(object_id, actor_id)?;

    let object_id = object_id.as_str();

    let db = ctx.db_pool.get().await?;

    db.execute("DELETE FROM post_like WHERE ap_id=$1", &[&object_id])
        .await?;
    db.execute("DELETE FROM reply_like WHERE ap_id=$1", &[&object_id])
        .await?;
    db.execute("DELETE FROM community_follow WHERE ap_id=$1", &[&object_id])
        .await?;
    db.execute(
        "UPDATE post SET approved=FALSE, approved_ap_id=NULL WHERE approved_ap_id=$1",
        &[&object_id],
    )
    .await?;

    Ok(())
}

fn get_message_digest(src: Option<&str>) -> Option<openssl::hash::MessageDigest> {
    match src {
        None | Some(SIGALG_RSA_SHA256) => Some(openssl::hash::MessageDigest::sha256()),

M src/routes/apub/communities.rs => src/routes/apub/communities.rs +2 -85
@@ 1,6 1,5 @@
use crate::{CommentLocalID, CommunityLocalID, PostLocalID, UserLocalID};
use activitystreams::prelude::*;
use std::borrow::Cow;
use std::ops::Deref;
use std::sync::Arc;



@@ 388,93 387,11 @@ async fn handler_communities_followers_accept_get(
}

async fn handler_communities_inbox_post(
    params: (CommunityLocalID,),
    _: (CommunityLocalID,),
    ctx: Arc<crate::RouteContext>,
    req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, crate::Error> {
    use crate::apub_util::{KnownObject, Verified};

    let (community_id,) = params;
    let db = ctx.db_pool.get().await?;

    let object = crate::apub_util::verify_incoming_object(
        req,
        &db,
        &ctx.http_client,
        ctx.apub_proxy_rewrites,
    )
    .await?;

    match object.into_inner() {
        KnownObject::Create(create) => {
            super::inbox_common_create(Verified(create), ctx).await?;
        }
        KnownObject::Follow(follow) => {
            let follow = Verified(follow);
            let follower_ap_id = follow.actor_unchecked().as_single_id();
            let target_community = follow.object().as_single_id();

            if let Some(follower_ap_id) = follower_ap_id {
                let activity_ap_id = follow
                    .id_unchecked()
                    .ok_or(crate::Error::InternalStrStatic("Missing activitity ID"))?;

                crate::apub_util::require_containment(activity_ap_id, follower_ap_id)?;
                let follow = crate::apub_util::Contained(Cow::Borrowed(&follow));

                let follower_local_id = crate::apub_util::get_or_fetch_user_local_id(
                    follower_ap_id,
                    &db,
                    &ctx.host_url_apub,
                    &ctx.http_client,
                )
                .await?;

                if let Some(target_community) = target_community {
                    if target_community
                        == crate::apub_util::get_local_community_apub_id(
                            community_id,
                            &ctx.host_url_apub,
                        )
                        .deref()
                    {
                        let row = db
                            .query_opt("SELECT local FROM community WHERE id=$1", &[&community_id])
                            .await?;
                        if let Some(row) = row {
                            let local: bool = row.get(0);
                            if local {
                                db.execute("INSERT INTO community_follow (community, follower, local, ap_id, accepted) VALUES ($1, $2, FALSE, $3, TRUE) ON CONFLICT (community, follower) DO NOTHING", &[&community_id, &follower_local_id, &activity_ap_id.as_str()]).await?;

                                crate::apub_util::spawn_enqueue_send_community_follow_accept(
                                    community_id,
                                    follower_local_id,
                                    follow.with_owned(),
                                    ctx,
                                );
                            }
                        } else {
                            eprintln!("Warning: recieved follow for unknown community");
                        }
                    } else {
                        eprintln!("Warning: recieved follow for wrong community");
                    }
                }
            }
        }
        KnownObject::Delete(activity) => {
            crate::apub_util::handle_delete(Verified(activity), ctx).await?;
        }
        KnownObject::Like(activity) => {
            crate::apub_util::handle_like(Verified(activity), ctx).await?;
        }
        KnownObject::Undo(activity) => {
            crate::apub_util::handle_undo(Verified(activity), ctx).await?;
        }
        _ => {}
    }

    Ok(crate::simple_response(hyper::StatusCode::ACCEPTED, ""))
    super::inbox_common(ctx, req).await
}

async fn handler_communities_outbox_get(

M src/routes/apub/mod.rs => src/routes/apub/mod.rs +6 -162
@@ 227,8 227,6 @@ async fn inbox_common(
    ctx: Arc<crate::RouteContext>,
    req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, crate::Error> {
    use crate::apub_util::{KnownObject, Verified};

    let db = ctx.db_pool.get().await?;

    let object = crate::apub_util::verify_incoming_object(


@@ 239,170 237,16 @@ async fn inbox_common(
    )
    .await?;

    match object.into_inner() {
        KnownObject::Accept(activity) => {
            let activity_id = activity
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

            let actor_ap_id = activity
                .actor_unchecked()
                .as_single_id()
                .ok_or(crate::Error::InternalStrStatic("Missing actor for Accept"))?;

            crate::apub_util::require_containment(activity_id, actor_ap_id)?;

            let actor_ap_id = actor_ap_id.as_str();

            let community_local_id: Option<CommunityLocalID> = {
                db.query_opt("SELECT id FROM community WHERE ap_id=$1", &[&actor_ap_id])
                    .await?
                    .map(|row| CommunityLocalID(row.get(0)))
            };

            if let Some(community_local_id) = community_local_id {
                let object_id = activity
                    .object()
                    .as_single_id()
                    .ok_or(crate::Error::InternalStrStatic("Missing object for Accept"))?;

                if let Some(remaining) =
                    crate::apub_util::try_strip_host(&object_id, &ctx.host_url_apub)
                {
                    if remaining.starts_with("/communities/") {
                        let remaining = &remaining[13..];
                        let next_expected = format!("{}/followers/", community_local_id);
                        if remaining.starts_with(&next_expected) {
                            let remaining = &remaining[next_expected.len()..];
                            let follower_local_id: UserLocalID = remaining.parse()?;

                            db.execute(
                                "UPDATE community_follow SET accepted=TRUE WHERE community=$1 AND follower=$2",
                                &[&community_local_id, &follower_local_id],
                            ).await?;
                        }
                    }
                }
            }
        }
        KnownObject::Announce(activity) => {
            let activity_id = activity
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

            let community_ap_id = activity.actor_unchecked().as_single_id().ok_or(
                crate::Error::InternalStrStatic("Missing actor for Announce"),
            )?;

            let community_local_info = db
                .query_opt(
                    "SELECT id, local FROM community WHERE ap_id=$1",
                    &[&community_ap_id.as_str()],
                )
                .await?
                .map(|row| (CommunityLocalID(row.get(0)), row.get(1)));

            if let Some((community_local_id, community_is_local)) = community_local_info {
                crate::apub_util::require_containment(activity_id, community_ap_id)?;

                let object_id = activity.object().as_single_id();

                if let Some(object_id) = object_id {
                    if let Some(remaining) =
                        crate::apub_util::try_strip_host(&object_id, &ctx.host_url_apub)
                    {
                        if remaining.starts_with("/posts/") {
                            let remaining = &remaining[7..];
                            if let Ok(local_post_id) = remaining.parse::<PostLocalID>() {
                                db.execute(
                                    "UPDATE post SET approved=TRUE, approved_ap_id=$1 WHERE id=$2 AND community=$3",
                                    &[&activity_id.as_str(), &local_post_id, &community_local_id],
                                ).await?;
                            }
                        }
                    } else {
                        // don't need announces for local objects
                        let obj =
                            crate::apub_util::fetch_ap_object(object_id, &ctx.http_client).await?;
                        crate::apub_util::handle_recieved_object_for_community(
                            community_local_id,
                            community_is_local,
                            Some(&activity_id),
                            obj,
                            ctx,
                        )
                        .await?;
                    }
                }
            }
        }
        KnownObject::Create(activity) => inbox_common_create(Verified(activity), ctx).await?,
        KnownObject::Delete(activity) => {
            crate::apub_util::handle_delete(Verified(activity), ctx).await?
        }
        KnownObject::Like(activity) => {
            crate::apub_util::handle_like(Verified(activity), ctx).await?
        }
        KnownObject::Undo(activity) => {
            crate::apub_util::handle_undo(Verified(activity), ctx).await?
        }
        KnownObject::Update(activity) => {
            let activity_id = activity
                .id_unchecked()
                .ok_or(crate::Error::InternalStrStatic("Missing activity ID"))?;

            let object_id =
                activity
                    .object()
                    .as_single_id()
                    .ok_or(crate::Error::InternalStrStatic(
                        "Missing object ID for Update",
                    ))?;

            crate::apub_util::require_containment(activity_id, object_id)?;

            let object_id = object_id.clone();

            crate::spawn_task(async move {
                let row = db
                    .query_opt(
                        "SELECT 1 FROM community WHERE ap_id=$1 LIMIT 1",
                        &[&object_id.as_str()],
                    )
                    .await?;
                if row.is_some() {
                    ctx.enqueue_task(&crate::tasks::FetchActor {
                        actor_ap_id: Cow::Owned(object_id),
                    })
                    .await?;
                }

                Ok(())
            });
        }
        _ => {}
    }
    crate::apub_util::ingest::ingest_object(
        object,
        crate::apub_util::ingest::FoundFrom::Other,
        ctx,
    )
    .await?;

    Ok(crate::simple_response(hyper::StatusCode::ACCEPTED, ""))
}

pub async fn inbox_common_create(
    activity: crate::apub_util::Verified<activitystreams::activity::Create>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
    for req_obj in activity.object().iter() {
        let object_id = req_obj.id();

        if let Some(object_id) = object_id {
            let obj = crate::apub_util::fetch_ap_object(object_id, &ctx.http_client).await?;

            crate::apub_util::handle_recieved_object_for_local_community(obj, ctx.clone()).await?;
        }
    }

    Ok(())
}

async fn handler_users_inbox_post(
    _: (UserLocalID,),
    ctx: Arc<crate::RouteContext>,