~vpzom/lotide

dffdf0a6bcdd8e0de94cfefa08478ace1622bf05 — Colin Reeder 18 days ago cbaa0b1
Make federation of sticky posts actually work
5 files changed, 100 insertions(+), 12 deletions(-)

M src/apub_util/ingest.rs
M src/apub_util/mod.rs
M src/routes/apub/communities.rs
M src/tasks.rs
M src/worker.rs
M src/apub_util/ingest.rs => src/apub_util/ingest.rs +4 -0
@@ 239,6 239,10 @@ pub async fn ingest_object(
                &[&name, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description_html],
            ).await?.get(0));

            if let Some(featured_url) = group.ext_two.featured {
                crate::apub_util::spawn_enqueue_fetch_community_featured(id, featured_url, ctx);
            }

            Ok(Some(IngestResult::Actor(
                super::ActorLocalInfo::Community {
                    id,

M src/apub_util/mod.rs => src/apub_util/mod.rs +29 -1
@@ 65,9 65,10 @@ pub enum KnownObject {
        >,
    ),
    Group(
        activitystreams_ext::Ext1<
        activitystreams_ext::Ext2<
            activitystreams::actor::ApActor<activitystreams::actor::Group>,
            PublicKeyExtension<'static>,
            FeaturedExtension,
        >,
    ),
    Article(activitystreams::object::Article),


@@ 102,6 103,19 @@ pub struct PublicKeyExtension<'a> {
    pub public_key: Option<PublicKey<'a>>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FeaturedExtension {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub featured: Option<url::Url>,
}

#[derive(Deserialize)]
#[serde(untagged)]
pub enum AnyCollection {
    Unordered(activitystreams::collection::UnorderedCollection),
    Ordered(activitystreams::collection::OrderedCollection),
}

pub fn try_strip_host<'a>(url: &'a impl AsRef<str>, host_url: &url::Url) -> Option<&'a str> {
    let host_url = host_url.as_str();
    let host_url = host_url.trim_end_matches('/');


@@ 510,6 524,20 @@ pub async fn fetch_or_create_local_actor_privkey(
    })
}

pub fn spawn_enqueue_fetch_community_featured(
    community: CommunityLocalID,
    featured_url: url::Url,
    ctx: Arc<crate::RouteContext>,
) {
    crate::spawn_task(async move {
        ctx.enqueue_task(&crate::tasks::FetchCommunityFeatured {
            community_id: community,
            featured_url,
        })
        .await
    });
}

pub fn spawn_enqueue_send_new_community_update(
    community: CommunityLocalID,
    ctx: Arc<crate::RouteContext>,

M src/routes/apub/communities.rs => src/routes/apub/communities.rs +6 -11
@@ 1,6 1,5 @@
use crate::{CommentLocalID, CommunityLocalID, PostLocalID, UserLocalID};
use activitystreams::prelude::*;
use serde_derive::{Deserialize, Serialize};
use std::ops::Deref;
use std::sync::Arc;



@@ 14,12 13,6 @@ lazy_static::lazy_static! {
    })).unwrap();
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FeaturedExtension {
    #[serde(skip_serializing_if = "Option::is_none")]
    featured: Option<url::Url>,
}

pub fn route_communities() -> crate::RouteNode<()> {
    crate::RouteNode::new().with_child_parse::<CommunityLocalID, _>(
        crate::RouteNode::new()


@@ 178,7 171,7 @@ async fn handler_communities_get(
            })
            .set_preferred_username(name);

            let featured_ext = FeaturedExtension {
            let featured_ext = crate::apub_util::FeaturedExtension {
                featured: Some(crate::apub_util::get_local_community_featured_apub_id(community_id, &ctx.host_url_apub).into()),
            };



@@ 295,9 288,11 @@ async fn handler_communities_featured_list(
        .collect();
    let items = items?;

    let mut body = activitystreams::collection::Collection::<
        activitystreams::collection::kind::CollectionType,
    >::new();
    let mut body = activitystreams::collection::UnorderedCollection::new();
    body.set_id(
        crate::apub_util::get_local_community_featured_apub_id(community_id, &ctx.host_url_apub)
            .into(),
    );
    body.set_context(activitystreams::context());
    body.set_total_items(items.len() as u64);
    body.set_many_items(items);

M src/tasks.rs => src/tasks.rs +57 -0
@@ 1,3 1,5 @@
use crate::{CommunityLocalID, PostLocalID};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;


@@ 131,3 133,58 @@ impl<'a> TaskDef for FetchActor<'a> {
        Ok(())
    }
}

#[derive(Deserialize, Serialize, Debug)]
pub struct FetchCommunityFeatured {
    pub community_id: CommunityLocalID,
    pub featured_url: url::Url,
}

#[async_trait]
impl TaskDef for FetchCommunityFeatured {
    const KIND: &'static str = "fetch_community_featured";

    async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> {
        use activitystreams::prelude::*;

        let obj =
            crate::apub_util::fetch_ap_object_raw(&self.featured_url, &ctx.http_client).await?;
        let obj: crate::apub_util::AnyCollection = serde_json::from_value(obj)?;

        let items = match &obj {
            crate::apub_util::AnyCollection::Unordered(obj) => obj.items(),
            crate::apub_util::AnyCollection::Ordered(obj) => obj.ordered_items(),
        };

        let (local_items, remote_items) = match items {
            None => (Vec::new(), Vec::new()),
            Some(items) => items
                .iter()
                .map(|item| item.as_xsd_any_uri())
                .filter_map(|x| x)
                .map(|x| x.as_str())
                .partition(|x| x.starts_with(ctx.host_url_apub.as_str())),
        };

        let local_items: Vec<PostLocalID> = local_items
            .into_iter()
            .filter_map(|ap_id| {
                let rest = crate::apub_util::try_strip_host(&ap_id, &ctx.host_url_apub).unwrap();
                if let Some(rest) = rest.strip_prefix("/posts/") {
                    rest.parse().ok()
                } else {
                    None
                }
            })
            .collect();

        let db = ctx.db_pool.get().await?;

        db.execute(
            "UPDATE post SET sticky=COALESCE((ap_id = ANY($1)) OR (id = ANY($2)), FALSE) WHERE community=$3",
            &[&remote_items, &local_items, &self.community_id],
        ).await?;

        Ok(())
    }
}

M src/worker.rs => src/worker.rs +4 -0
@@ 79,6 79,10 @@ async fn perform_task(
            let def: crate::tasks::FetchActor = serde_json::from_value(params)?;
            def.perform(ctx).await?;
        }
        crate::tasks::FetchCommunityFeatured::KIND => {
            let def: crate::tasks::FetchCommunityFeatured = serde_json::from_value(params)?;
            def.perform(ctx).await?;
        }
        _ => {
            return Err(crate::Error::InternalStr(format!(
                "Unrecognized task type: {}",