~vpzom/lotide

119790a58cdc220999dfe09c9170f4cdc4984070 — Colin Reeder 11 days ago a2c65e9 new-ingest
attempt at fetching posts
4 files changed, 219 insertions(+), 82 deletions(-)

M src/apub_util/ingest.rs
M src/apub_util/mod.rs
M src/main.rs
M src/routes/api/mod.rs
M src/apub_util/ingest.rs => src/apub_util/ingest.rs +125 -81
@@ 24,11 24,25 @@ impl FoundFrom {
    }
}

/// What ingesting an object produced, as returned (wrapped in `Option`) by
/// `ingest_object`.
pub enum IngestResult {
    /// The object was an actor; carries the local info recorded for it.
    Actor(super::ActorLocalInfo),
    /// The object was something other than an actor, identified by a local
    /// reference.
    Other(crate::ThingLocalRef),
}

impl IngestResult {
    pub fn into_ref(self) -> crate::ThingLocalRef {
        match self {
            IngestResult::Actor(info) => info.as_ref(),
            IngestResult::Other(x) => x,
        }
    }
}

pub async fn ingest_object(
    object: Verified<KnownObject>,
    found_from: FoundFrom,
    ctx: Arc<crate::BaseContext>,
) -> Result<Option<super::ActorLocalInfo>, crate::Error> {
) -> Result<Option<IngestResult>, crate::Error> {
    let db = ctx.db_pool.get().await?;
    match object.into_inner() {
        KnownObject::Accept(activity) => {


@@ 226,13 240,15 @@ pub async fn ingest_object(
                &[&name, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description],
            ).await?.get(0));

            Ok(Some(super::ActorLocalInfo::Community {
                id,
                public_key: public_key.map(|key| super::PubKeyInfo {
                    algorithm: super::get_message_digest(public_key_sigalg),
                    key: key.to_owned(),
                }),
            }))
            Ok(Some(IngestResult::Actor(
                super::ActorLocalInfo::Community {
                    id,
                    public_key: public_key.map(|key| super::PubKeyInfo {
                        algorithm: super::get_message_digest(public_key_sigalg),
                        key: key.to_owned(),
                    }),
                },
            )))
        }
        KnownObject::Image(obj) => {
            ingest_postlike(Verified(KnownObject::Image(obj)), found_from, ctx).await


@@ 304,13 320,13 @@ pub async fn ingest_object(
                &[&username, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description, &avatar],
            ).await?.get(0));

            Ok(Some(super::ActorLocalInfo::User {
            Ok(Some(IngestResult::Actor(super::ActorLocalInfo::User {
                id,
                public_key: public_key.map(|key| super::PubKeyInfo {
                    algorithm: super::get_message_digest(public_key_sigalg),
                    key: key.to_owned(),
                }),
            }))
            })))
        }
        KnownObject::Undo(activity) => {
            ingest_undo(Verified(activity), ctx).await?;


@@ 359,9 375,7 @@ pub fn ingest_object_boxed(
    object: Verified<KnownObject>,
    found_from: FoundFrom,
    ctx: Arc<crate::BaseContext>,
) -> std::pin::Pin<
    Box<dyn Future<Output = Result<Option<super::ActorLocalInfo>, crate::Error>> + Send>,
> {
) -> std::pin::Pin<Box<dyn Future<Output = Result<Option<IngestResult>, crate::Error>> + Send>> {
    Box::pin(ingest_object(object, found_from, ctx))
}



@@ 579,7 593,7 @@ async fn ingest_postlike(
    obj: Verified<KnownObject>,
    found_from: FoundFrom,
    ctx: Arc<crate::RouteContext>,
) -> Result<Option<super::ActorLocalInfo>, crate::Error> {
) -> Result<Option<IngestResult>, crate::Error> {
    let (to, in_reply_to, obj_id) = match obj.deref() {
        KnownObject::Page(obj) => (obj.to(), None, obj.id_unchecked()),
        KnownObject::Image(obj) => (obj.to(), None, obj.id_unchecked()),


@@ 611,36 625,33 @@ async fn ingest_postlike(

    if let Some((community_local_id, community_is_local)) = community_found {
        match obj.into_inner() {
            KnownObject::Page(obj) => {
                handle_received_page_for_community(
                    community_local_id,
                    community_is_local,
                    found_from.as_announce(),
                    Verified(obj),
                    ctx,
                )
                .await?
            }
            KnownObject::Image(obj) => {
                handle_received_page_for_community(
                    community_local_id,
                    community_is_local,
                    found_from.as_announce(),
                    Verified(obj),
                    ctx,
                )
                .await?
            }
            KnownObject::Article(obj) => {
                handle_received_page_for_community(
                    community_local_id,
                    community_is_local,
                    found_from.as_announce(),
                    Verified(obj),
                    ctx,
                )
                .await?
            }
            KnownObject::Page(obj) => Ok(handle_received_page_for_community(
                community_local_id,
                community_is_local,
                found_from.as_announce(),
                Verified(obj),
                ctx,
            )
            .await?
            .map(|id| IngestResult::Other(crate::ThingLocalRef::Post(id)))),
            KnownObject::Image(obj) => Ok(handle_received_page_for_community(
                community_local_id,
                community_is_local,
                found_from.as_announce(),
                Verified(obj),
                ctx,
            )
            .await?
            .map(|id| IngestResult::Other(crate::ThingLocalRef::Post(id)))),
            KnownObject::Article(obj) => Ok(handle_received_page_for_community(
                community_local_id,
                community_is_local,
                found_from.as_announce(),
                Verified(obj),
                ctx,
            )
            .await?
            .map(|id| IngestResult::Other(crate::ThingLocalRef::Post(id)))),
            KnownObject::Note(obj) => {
                let content = obj.content().and_then(|x| x.as_single_xsd_string());
                let media_type = obj.media_type();


@@ 675,7 686,7 @@ async fn ingest_postlike(
                    if let Some(in_reply_to) = obj.in_reply_to() {
                        // it's a reply

                        handle_recieved_reply(
                        Ok(handle_recieved_reply(
                            object_id,
                            content.unwrap_or(""),
                            media_type,


@@ 685,7 696,8 @@ async fn ingest_postlike(
                            attachment_href,
                            ctx,
                        )
                        .await?;
                        .await?
                        .map(|id| IngestResult::Other(crate::ThingLocalRef::Comment(id))))
                    } else {
                        // not a reply, must be a top-level post
                        let title = obj


@@ 719,7 731,7 @@ async fn ingest_postlike(
                            .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                            .map(|href| href.as_str());

                        if let Some(object_id) = obj.id_unchecked() {
                        Ok(Some(IngestResult::Other(crate::ThingLocalRef::Post(
                            handle_recieved_post(
                                object_id.clone(),
                                title,


@@ 733,12 745,16 @@ async fn ingest_postlike(
                                found_from.as_announce(),
                                ctx,
                            )
                            .await?;
                        }
                            .await?,
                        ))))
                    }
                } else {
                    Ok(None)
                }
            }
            _ => {}
            _ => Err(crate::Error::InternalStrStatic(
                "ingest_postlike called with an unknown object",
            )),
        }
    } else {
        // not to a community, but might still match as a reply


@@ 780,7 796,7 @@ async fn ingest_postlike(
                        .and_then(|href| href.iter().filter_map(|x| x.as_xsd_any_uri()).next())
                        .map(|href| href.as_str());

                    handle_recieved_reply(
                    let id = handle_recieved_reply(
                        obj_id,
                        content.unwrap_or(""),
                        media_type,


@@ 791,12 807,18 @@ async fn ingest_postlike(
                        ctx,
                    )
                    .await?;

                    Ok(id.map(|id| IngestResult::Other(crate::ThingLocalRef::Comment(id))))
                } else {
                    Ok(None)
                }
            } else {
                Ok(None)
            }
        } else {
            Ok(None)
        }
    }

    Ok(None)
}

async fn handle_recieved_reply(


@@ 808,7 830,7 @@ async fn handle_recieved_reply(
    in_reply_to: &activitystreams::primitives::OneOrMany<activitystreams::base::AnyBase>,
    attachment_href: Option<&str>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
) -> Result<Option<CommentLocalID>, crate::Error> {
    let db = ctx.db_pool.get().await?;

    let author = match author {


@@ 893,8 915,9 @@ async fn handle_recieved_reply(
                    ).await?;

                if let Some(row) = row {
                    let id = CommentLocalID(row.get(0));
                    let info = crate::CommentInfo {
                        id: CommentLocalID(row.get(0)),
                        id,
                        author,
                        post,
                        parent,


@@ 910,12 933,32 @@ async fn handle_recieved_reply(
                    };

                    crate::on_post_add_comment(info, ctx);

                    Ok(Some(id))
                } else {
                    // not new, try to fetch id
                    // will probably be unnecessary when we implement comment editing

                    let row = db
                        .query_opt(
                            "SELECT id FROM reply WHERE ap_id=$1",
                            &[&object_id.as_str()],
                        )
                        .await?;
                    Ok(match row {
                        None => None,
                        Some(row) => Some(CommentLocalID(row.get(0))),
                    })
                }
            } else {
                Ok(None)
            }
        } else {
            Ok(None)
        }
    } else {
        Ok(None)
    }

    Ok(())
}

async fn handle_received_page_for_community<Kind: Clone + std::fmt::Debug>(


@@ 924,7 967,7 @@ async fn handle_received_page_for_community<Kind: Clone + std::fmt::Debug>(
    is_announce: Option<&url::Url>,
    obj: Verified<activitystreams::object::Object<Kind>>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
) -> Result<Option<PostLocalID>, crate::Error> {
    let title = obj
        .summary()
        .iter()


@@ 955,23 998,25 @@ async fn handle_received_page_for_community<Kind: Clone + std::fmt::Debug>(
            super::require_containment(&object_id, author)?;
        }

        handle_recieved_post(
            object_id.clone(),
            title,
            href,
            content,
            media_type,
            created.as_ref(),
            author,
            community_local_id,
            community_is_local,
            is_announce,
            ctx,
        )
        .await?;
        Ok(Some(
            handle_recieved_post(
                object_id.clone(),
                title,
                href,
                content,
                media_type,
                created.as_ref(),
                author,
                community_local_id,
                community_is_local,
                is_announce,
                ctx,
            )
            .await?,
        ))
    } else {
        Ok(None)
    }

    Ok(())
}

async fn handle_recieved_post(


@@ 986,7 1031,7 @@ async fn handle_recieved_post(
    community_is_local: bool,
    is_announce: Option<&url::Url>,
    ctx: Arc<crate::RouteContext>,
) -> Result<(), crate::Error> {
) -> Result<PostLocalID, crate::Error> {
    let db = ctx.db_pool.get().await?;
    let author = match author {
        Some(author) => Some(super::get_or_fetch_user_local_id(&author, &db, &ctx).await?),


@@ 1002,17 1047,16 @@ async fn handle_recieved_post(

    let approved = is_announce.is_some() || community_is_local;

    let row = db.query_opt(
    let row = db.query_one(
        "INSERT INTO post (author, href, content_text, content_html, title, created, community, local, ap_id, approved, approved_ap_id) VALUES ($1, $2, $3, $4, $5, COALESCE($6, current_timestamp), $7, FALSE, $8, $9, $10) ON CONFLICT (ap_id) DO UPDATE SET approved=$9, approved_ap_id=$10 RETURNING id",
        &[&author, &href, &content_text, &content_html, &title, &created, &community_local_id, &object_id.as_str(), &approved, &is_announce.map(|x| x.as_str())],
    ).await?;

    let post_local_id = PostLocalID(row.get(0));

    if community_is_local {
        if let Some(row) = row {
            let post_local_id = PostLocalID(row.get(0));
            crate::on_local_community_add_post(community_local_id, post_local_id, object_id, ctx);
        }
        crate::on_local_community_add_post(community_local_id, post_local_id, object_id, ctx);
    }

    Ok(())
    Ok(post_local_id)
}

M src/apub_util/mod.rs => src/apub_util/mod.rs +8 -1
@@ 297,6 297,13 @@ impl ActorLocalInfo {
            ActorLocalInfo::Community { public_key, .. } => public_key.as_ref(),
        }
    }

    /// Produces the `ThingLocalRef` that names this actor, copying its local
    /// ID into the matching `User` or `Community` variant.
    pub fn as_ref(&self) -> crate::ThingLocalRef {
        // The IDs are copied out of the borrowed value; the key material is ignored.
        match *self {
            ActorLocalInfo::Community { id, .. } => crate::ThingLocalRef::Community(id),
            ActorLocalInfo::User { id, .. } => crate::ThingLocalRef::User(id),
        }
    }
}

#[derive(Clone, Debug, thiserror::Error)]


@@ 365,7 372,7 @@ pub async fn fetch_actor(
) -> Result<ActorLocalInfo, crate::Error> {
    let obj = fetch_ap_object(req_ap_id, &ctx.http_client).await?;
    match ingest::ingest_object_boxed(obj, ingest::FoundFrom::Other, ctx).await? {
        Some(info) => Ok(info),
        Some(ingest::IngestResult::Actor(info)) => Ok(info),
        _ => Err(crate::Error::InternalStrStatic("Unrecognized actor type")),
    }
}

M src/main.rs => src/main.rs +4 -0
@@ 336,9 336,13 @@ pub enum ActorLocalRef {
    Community(CommunityLocalID),
}

/// Reference to a locally known object of one of several kinds, carrying the
/// corresponding local ID.
///
/// Serialized with serde's internally tagged representation, using `"type"`
/// as the tag field.
// NOTE(review): serde documents internal tagging as supporting newtype
// variants only when the wrapped value serializes as a struct or map. The ID
// types are not visible here — confirm they satisfy that, otherwise
// serialization would fail at runtime.
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(tag = "type")]
pub enum ThingLocalRef {
    /// A post, by local ID.
    Post(PostLocalID),
    /// A comment, by local ID.
    Comment(CommentLocalID),
    /// A user, by local ID.
    User(UserLocalID),
    /// A community, by local ID.
    Community(CommunityLocalID),
}

#[derive(Debug)]

M src/routes/api/mod.rs => src/routes/api/mod.rs +82 -0
@@ 188,6 188,13 @@ pub fn route_api() -> crate::RouteNode<()> {
                    crate::RouteNode::new()
                        .with_handler_async("GET", route_unstable_nodeinfo_20_get),
                )
                .with_child(
                    "objects:lookup",
                    crate::RouteNode::new().with_child_str(
                        crate::RouteNode::new()
                            .with_handler_async("GET", route_unstable_objects_lookup),
                    ),
                )
                .with_child("communities", communities::route_communities())
                .with_child(
                    "instance",


@@ 554,6 561,81 @@ async fn route_unstable_instance_patch(
    }
}

/// Route handler that resolves a lookup string to a remote object, ingests
/// it, and answers with a JSON array.
///
/// `parse_lookup` classifies the string as either a URL, which is used
/// directly, or a WebFinger `user`/`host` pair. In the latter case the host's
/// `/.well-known/webfinger` endpoint is queried and the first `self` link
/// whose type equals `ACTIVITY_TYPE` and which carries an `href` is taken as
/// the object's URI.
///
/// The response body is `[]` when no URI could be determined or ingestion
/// yielded nothing; otherwise it is a one-element array holding the ingested
/// object's local reference. Failures along the way are propagated as
/// `crate::Error`.
async fn route_unstable_objects_lookup(
    params: (String,),
    ctx: Arc<crate::RouteContext>,
    _req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, crate::Error> {
    let query = params.0;
    println!("lookup {}", query);

    let uri = match parse_lookup(&query)? {
        Lookup::URL(direct) => Some(direct),
        Lookup::WebFinger { user, host } => {
            let finger_query = serde_urlencoded::to_string(FingerRequestQuery {
                resource: format!("acct:{}@{}", user, host).into(),
                rel: Some("self".into()),
            })?;
            let finger_uri = format!("https://{}/.well-known/webfinger?{}", host, finger_query);
            println!("{}", finger_uri);

            let finger_res = ctx
                .http_client
                .request(hyper::Request::get(finger_uri).body(Default::default())?)
                .await?;

            if finger_res.status() == hyper::StatusCode::NOT_FOUND {
                // The remote host does not know this account: treat as "nothing found".
                println!("not found");
                None
            } else {
                // Any other non-success status becomes an error here.
                let finger_res = crate::res_to_error(finger_res).await?;

                let body = hyper::body::to_bytes(finger_res.into_body()).await?;
                let parsed: FingerResponse = serde_json::from_slice(&body)?;

                let mut resolved = None;
                for link in parsed.links {
                    let is_activity_self = link.rel == "self"
                        && link.type_.as_deref() == Some(crate::apub_util::ACTIVITY_TYPE);
                    if !is_activity_self {
                        continue;
                    }

                    // A matching link without an href is skipped; keep scanning.
                    if let Some(href) = link.href {
                        resolved = Some(href.parse()?);
                        break;
                    }
                }

                resolved
            }
        }
    };

    let ingested = if let Some(target) = &uri {
        let obj = crate::apub_util::fetch_ap_object(target, &ctx.http_client).await?;

        crate::apub_util::ingest::ingest_object(
            obj,
            crate::apub_util::ingest::FoundFrom::Other,
            ctx,
        )
        .await?
    } else {
        None
    };

    match ingested {
        Some(found) => crate::json_response(&[found.into_ref()]),
        None => Ok(crate::common_response_builder()
            .header(hyper::header::CONTENT_TYPE, "application/json")
            .body("[]".into())?),
    }
}

async fn apply_comments_replies<'a, T>(
    comments: &mut Vec<(T, RespPostCommentInfo<'a>)>,
    include_your_for: Option<UserLocalID>,