~vpzom/lotide

82e930ca47f6cedf1f6e14cc459de0142c85ecd5 — Colin Reeder 18 days ago 9c4466b + 6f96d41 master
Merge branch 'sticky'
A migrations/20210709224628_sticky/down.sql => migrations/20210709224628_sticky/down.sql +3 -0
@@ 0,0 1,3 @@
BEGIN;
	ALTER TABLE post DROP COLUMN sticky;
COMMIT;

A migrations/20210709224628_sticky/up.sql => migrations/20210709224628_sticky/up.sql +3 -0
@@ 0,0 1,3 @@
BEGIN;
	ALTER TABLE post ADD COLUMN sticky BOOLEAN NOT NULL DEFAULT (FALSE);
COMMIT;

M openapi/openapi.json => openapi/openapi.json +11 -2
@@ 122,7 122,7 @@
			},
			"SomePostInfo": {
				"type": "object",
				"required": ["id", "title", "created", "community", "score"],
				"required": ["id", "title", "created", "community", "score", "sticky"],
				"properties": {
					"id": {"type": "integer"},
					"title": {"type": "string"},


@@ 151,6 151,7 @@
						}
					},
					"score": {"type": "integer"},
					"sticky": {"type": "boolean"},
					"your_vote": {"$ref": "#/components/schemas/YourVote"}
				}
			},


@@ 822,6 823,13 @@
						"schema": {"$ref": "#/components/schemas/SortType"}
					},
					{
						"name": "sort_sticky",
						"in": "query",
						"required": false,
						"schema": {"type": "boolean"},
						"description": "If true, will sort sticky posts before everything else"
					},
					{
						"name": "include_your",
						"in": "query",
						"required": false,


@@ 867,7 875,8 @@
							"schema": {
								"type": "object",
								"properties": {
									"approved": {"type": "boolean"}
									"approved": {"type": "boolean"},
									"sticky": {"type": "boolean"}
								}
							}
						}

M src/apub_util/ingest.rs => src/apub_util/ingest.rs +4 -0
@@ 239,6 239,10 @@ pub async fn ingest_object(
                &[&name, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description_html],
            ).await?.get(0));

            if let Some(featured_url) = group.ext_two.featured {
                crate::apub_util::spawn_enqueue_fetch_community_featured(id, featured_url, ctx);
            }

            Ok(Some(IngestResult::Actor(
                super::ActorLocalInfo::Community {
                    id,

M src/apub_util/mod.rs => src/apub_util/mod.rs +38 -1
@@ 65,9 65,10 @@ pub enum KnownObject {
        >,
    ),
    Group(
        activitystreams_ext::Ext1<
        activitystreams_ext::Ext2<
            activitystreams::actor::ApActor<activitystreams::actor::Group>,
            PublicKeyExtension<'static>,
            FeaturedExtension,
        >,
    ),
    Article(activitystreams::object::Article),


@@ 102,6 103,19 @@ pub struct PublicKeyExtension<'a> {
    pub public_key: Option<PublicKey<'a>>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FeaturedExtension {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub featured: Option<url::Url>,
}

#[derive(Deserialize)]
#[serde(untagged)]
pub enum AnyCollection {
    Unordered(activitystreams::collection::UnorderedCollection),
    Ordered(activitystreams::collection::OrderedCollection),
}

pub fn try_strip_host<'a>(url: &'a impl AsRef<str>, host_url: &url::Url) -> Option<&'a str> {
    let host_url = host_url.as_str();
    let host_url = host_url.trim_end_matches('/');


@@ 190,6 204,15 @@ pub fn get_local_community_apub_id(
    res
}

pub fn get_local_community_featured_apub_id(
    community: CommunityLocalID,
    host_url_apub: &BaseURL,
) -> BaseURL {
    let mut res = get_local_community_apub_id(community, host_url_apub);
    res.path_segments_mut().push("featured");
    res
}

pub fn get_local_community_outbox_apub_id(
    community: CommunityLocalID,
    host_url_apub: &BaseURL,


@@ 501,6 524,20 @@ pub async fn fetch_or_create_local_actor_privkey(
    })
}

pub fn spawn_enqueue_fetch_community_featured(
    community: CommunityLocalID,
    featured_url: url::Url,
    ctx: Arc<crate::RouteContext>,
) {
    crate::spawn_task(async move {
        ctx.enqueue_task(&crate::tasks::FetchCommunityFeatured {
            community_id: community,
            featured_url,
        })
        .await
    });
}

pub fn spawn_enqueue_send_new_community_update(
    community: CommunityLocalID,
    ctx: Arc<crate::RouteContext>,

M src/routes/api/communities.rs => src/routes/api/communities.rs +68 -24
@@ 5,6 5,7 @@ use crate::routes::api::{
use crate::{CommunityLocalID, PostLocalID, UserLocalID};
use serde_derive::{Deserialize, Serialize};
use std::borrow::Cow;
use std::ops::Deref;
use std::sync::Arc;

#[derive(Serialize)]


@@ 693,6 694,8 @@ async fn route_unstable_communities_posts_list(
        sort: super::SortType,
        #[serde(default)]
        include_your: bool,
        #[serde(default)]
        sort_sticky: bool,
    }

    let query: Query = serde_urlencoded::from_str(req.uri().query().unwrap_or(""))?;


@@ 747,13 750,18 @@ async fn route_unstable_communities_posts_list(

    let mut values: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = vec![&community_id, &limit];
    let sql: &str = &format!(
        "SELECT post.id, post.author, post.href, post.content_text, post.title, post.created, post.content_html, person.username, person.local, person.ap_id, person.avatar, (SELECT COUNT(*) FROM post_like WHERE post_like.post = post.id), (SELECT COUNT(*) FROM reply WHERE reply.post = post.id){} FROM post LEFT OUTER JOIN person ON (person.id = post.author) WHERE post.community = $1 AND post.approved=TRUE AND post.deleted=FALSE ORDER BY {} LIMIT $2",
        "SELECT post.id, post.author, post.href, post.content_text, post.title, post.created, post.content_html, person.username, person.local, person.ap_id, person.avatar, (SELECT COUNT(*) FROM post_like WHERE post_like.post = post.id), (SELECT COUNT(*) FROM reply WHERE reply.post = post.id), post.sticky{} FROM post LEFT OUTER JOIN person ON (person.id = post.author) WHERE post.community = $1 AND post.approved=TRUE AND post.deleted=FALSE ORDER BY {}{} LIMIT $2",
        if let Some(user) = &include_your_for {
            values.push(user);
            ", EXISTS(SELECT 1 FROM post_like WHERE post=post.id AND person=$3)"
        } else {
            ""
        },
        if query.sort_sticky {
            "sticky DESC, "
        } else {
            ""
        },
        query.sort.post_sort_sql(),
    );



@@ 805,8 813,9 @@ async fn route_unstable_communities_posts_list(
                community: &community,
                replies_count_total: Some(row.get(12)),
                score: row.get(11),
                sticky: row.get(13),
                your_vote: if include_your_for.is_some() {
                    Some(if row.get(13) {
                    Some(if row.get(14) {
                        Some(crate::Empty {})
                    } else {
                        None


@@ 829,6 838,8 @@ async fn route_unstable_communities_posts_patch(
    ctx: Arc<crate::RouteContext>,
    req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, crate::Error> {
    use std::fmt::Write;

    let (community_id, post_id) = params;

    let lang = crate::get_lang_for_req(&req);


@@ 839,6 850,7 @@ async fn route_unstable_communities_posts_patch(
    #[derive(Deserialize)]
    struct CommunityPostEditBody {
        approved: Option<bool>,
        sticky: Option<bool>,
    }

    let body = hyper::body::to_bytes(req.into_body()).await?;


@@ 862,7 874,7 @@ async fn route_unstable_communities_posts_patch(

    let old_row = db
        .query_opt(
            "SELECT community, approved, local, ap_id FROM post WHERE id=$1",
            "SELECT community, approved, local, ap_id, sticky FROM post WHERE id=$1",
            &[&post_id],
        )
        .await?


@@ 881,6 893,7 @@ async fn route_unstable_communities_posts_patch(
    }

    let old_approved: bool = old_row.get(1);
    let old_sticky: bool = old_row.get(4);

    let post_ap_id = if old_row.get(2) {
        crate::apub_util::get_local_post_apub_id(post_id, &ctx.host_url_apub).into()


@@ 888,28 901,59 @@ async fn route_unstable_communities_posts_patch(
        std::str::FromStr::from_str(old_row.get(3))?
    };

    if let Some(approved) = body.approved {
        db.execute(
            "UPDATE post SET approved=$1 WHERE id=$2",
            &[&approved, &post_id],
        )
        .await?;
    let mut sql = "UPDATE post SET ".to_owned();
    let mut values: Vec<&(dyn postgres_types::ToSql + Sync)> = vec![&post_id];
    let mut any_changes = false;

        if approved != old_approved {
            if approved {
                crate::apub_util::spawn_announce_community_post(
                    community_id,
                    post_id,
                    post_ap_id,
                    ctx,
                );
            } else {
                crate::apub_util::spawn_enqueue_send_community_post_announce_undo(
                    community_id,
                    post_id,
                    post_ap_id,
                    ctx,
                );
    if let Some(approved) = &body.approved {
        if !any_changes {
            any_changes = true;
        } else {
            sql.push(',');
        }
        values.push(approved);
        write!(sql, "approved=${}", values.len()).unwrap();
    }
    if let Some(sticky) = &body.sticky {
        if !any_changes {
            any_changes = true;
        } else {
            sql.push(',');
        }
        values.push(sticky);
        write!(sql, "sticky=${}", values.len()).unwrap();
    }

    if any_changes {
        sql.push_str(" WHERE id=$1");

        log::debug!("sql = {}", sql);

        db.execute(sql.deref(), &values).await?;

        if let Some(approved) = body.approved {
            if approved != old_approved {
                if approved {
                    crate::apub_util::spawn_announce_community_post(
                        community_id,
                        post_id,
                        post_ap_id,
                        ctx.clone(),
                    );
                } else {
                    crate::apub_util::spawn_enqueue_send_community_post_announce_undo(
                        community_id,
                        post_id,
                        post_ap_id,
                        ctx.clone(),
                    );
                }
            }
        }

        if let Some(sticky) = body.sticky {
            if sticky != old_sticky {
                crate::apub_util::spawn_enqueue_send_new_community_update(community_id, ctx);
            }
        }
    }

M src/routes/api/mod.rs => src/routes/api/mod.rs +4 -2
@@ 245,6 245,7 @@ struct RespPostListPost<'a> {
    #[serde(skip_serializing_if = "Option::is_none")]
    replies_count_total: Option<i64>,
    score: i64,
    sticky: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    your_vote: Option<Option<crate::Empty>>,
}


@@ 1024,7 1025,7 @@ async fn route_unstable_misc_render_markdown(
}

fn common_posts_list_query(include_your_idx: Option<usize>) -> Cow<'static, str> {
    const BASE: &str = "post.id, post.author, post.href, post.content_text, post.title, post.created, post.content_html, community.id, community.name, community.local, community.ap_id, person.username, person.local, person.ap_id, person.avatar, (SELECT COUNT(*) FROM post_like WHERE post_like.post = post.id), (SELECT COUNT(*) FROM reply WHERE reply.post = post.id)";
    const BASE: &str = "post.id, post.author, post.href, post.content_text, post.title, post.created, post.content_html, community.id, community.name, community.local, community.ap_id, person.username, person.local, person.ap_id, person.avatar, (SELECT COUNT(*) FROM post_like WHERE post_like.post = post.id), (SELECT COUNT(*) FROM reply WHERE reply.post = post.id), post.sticky";
    match include_your_idx {
        None => BASE.into(),
        Some(idx) => format!(


@@ 1101,9 1102,10 @@ async fn handle_common_posts_list(
                created: &created.to_rfc3339(),
                community: &community,
                score: row.get(15),
                sticky: row.get(17),
                replies_count_total: Some(row.get(16)),
                your_vote: if include_your {
                    Some(if row.get(17) {
                    Some(if row.get(18) {
                        Some(crate::Empty {})
                    } else {
                        None

M src/routes/api/posts.rs => src/routes/api/posts.rs +2 -1
@@ 452,7 452,7 @@ async fn route_unstable_posts_get(

    let (row, your_vote) = futures::future::try_join(
        db.query_opt(
            "SELECT post.author, post.href, post.content_text, post.title, post.created, post.content_html, community.id, community.name, community.local, community.ap_id, person.username, person.local, person.ap_id, (SELECT COUNT(*) FROM post_like WHERE post_like.post = $1), post.approved, person.avatar, post.local FROM community, post LEFT OUTER JOIN person ON (person.id = post.author) WHERE post.community = community.id AND post.id = $1",
            "SELECT post.author, post.href, post.content_text, post.title, post.created, post.content_html, community.id, community.name, community.local, community.ap_id, person.username, person.local, person.ap_id, (SELECT COUNT(*) FROM post_like WHERE post_like.post = $1), post.approved, person.avatar, post.local, post.sticky FROM community, post LEFT OUTER JOIN person ON (person.id = post.author) WHERE post.community = community.id AND post.id = $1",
            &[&post_id],
        )
        .map_err(crate::Error::from),


@@ 533,6 533,7 @@ async fn route_unstable_posts_get(
                community: &community,
                replies_count_total: None,
                score: row.get(13),
                sticky: row.get(17),
                your_vote,
            };


M src/routes/apub/communities.rs => src/routes/apub/communities.rs +71 -0
@@ 3,6 3,16 @@ use activitystreams::prelude::*;
use std::ops::Deref;
use std::sync::Arc;

lazy_static::lazy_static! {
    static ref FEATURED_CONTEXT: activitystreams::base::AnyBase = activitystreams::base::AnyBase::from_arbitrary_json(serde_json::json!({
        "toot": "http://joinmastodon.org/ns#",
        "featured": {
            "@id": "toot:featured",
            "@type": "@id"
        }
    })).unwrap();
}

pub fn route_communities() -> crate::RouteNode<()> {
    crate::RouteNode::new().with_child_parse::<CommunityLocalID, _>(
        crate::RouteNode::new()


@@ 18,6 28,11 @@ pub fn route_communities() -> crate::RouteNode<()> {
                ),
            )
            .with_child(
                "featured",
                crate::RouteNode::new()
                    .with_handler_async("GET", handler_communities_featured_list),
            )
            .with_child(
                "followers",
                crate::RouteNode::new()
                    .with_handler_async("GET", handler_communities_followers_list)


@@ 129,6 144,7 @@ async fn handler_communities_get(
                activitystreams::context(),
                activitystreams::security(),
            ])
            .add_context(FEATURED_CONTEXT.clone())
            .set_id(community_ap_id.deref().clone())
            .set_name(name.as_ref())
            .set_summary(description);


@@ 155,6 171,12 @@ async fn handler_communities_get(
            })
            .set_preferred_username(name);

            let featured_ext = crate::apub_util::FeaturedExtension {
                featured: Some(crate::apub_util::get_local_community_featured_apub_id(community_id, &ctx.host_url_apub).into()),
            };

            let info = activitystreams_ext::Ext1::new(info, featured_ext);

            let key_id = format!(
                "{}/communities/{}#main-key",
                ctx.host_url_apub, community_id


@@ 233,6 255,55 @@ async fn handler_communities_comments_announce_get(
    }
}

async fn handler_communities_featured_list(
    params: (CommunityLocalID,),
    ctx: Arc<crate::RouteContext>,
    _req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, crate::Error> {
    let (community_id,) = params;
    let db = ctx.db_pool.get().await?;

    let rows = db
        .query(
            "SELECT id, local, ap_id FROM post WHERE community=$1 AND sticky",
            &[&community_id],
        )
        .await?;

    let items: Result<Vec<_>, _> = rows
        .into_iter()
        .map(|row| {
            use std::str::FromStr;

            if row.get(1) {
                Ok(crate::apub_util::get_local_post_apub_id(
                    PostLocalID(row.get(0)),
                    &ctx.host_url_apub,
                )
                .into())
            } else {
                url::Url::from_str(row.get(2))
            }
        })
        .collect();
    let items = items?;

    let mut body = activitystreams::collection::UnorderedCollection::new();
    body.set_id(
        crate::apub_util::get_local_community_featured_apub_id(community_id, &ctx.host_url_apub)
            .into(),
    );
    body.set_context(activitystreams::context());
    body.set_total_items(items.len() as u64);
    body.set_many_items(items);

    let body = serde_json::to_vec(&body)?;

    Ok(hyper::Response::builder()
        .header(hyper::header::CONTENT_TYPE, crate::apub_util::ACTIVITY_TYPE)
        .body(body.into())?)
}

async fn handler_communities_followers_list(
    params: (CommunityLocalID,),
    ctx: Arc<crate::RouteContext>,

M src/tasks.rs => src/tasks.rs +57 -0
@@ 1,3 1,5 @@
use crate::{CommunityLocalID, PostLocalID};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;


@@ 131,3 133,58 @@ impl<'a> TaskDef for FetchActor<'a> {
        Ok(())
    }
}

#[derive(Deserialize, Serialize, Debug)]
pub struct FetchCommunityFeatured {
    pub community_id: CommunityLocalID,
    pub featured_url: url::Url,
}

#[async_trait]
impl TaskDef for FetchCommunityFeatured {
    const KIND: &'static str = "fetch_community_featured";

    async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> {
        use activitystreams::prelude::*;

        let obj =
            crate::apub_util::fetch_ap_object_raw(&self.featured_url, &ctx.http_client).await?;
        let obj: crate::apub_util::AnyCollection = serde_json::from_value(obj)?;

        let items = match &obj {
            crate::apub_util::AnyCollection::Unordered(obj) => obj.items(),
            crate::apub_util::AnyCollection::Ordered(obj) => obj.ordered_items(),
        };

        let (local_items, remote_items) = match items {
            None => (Vec::new(), Vec::new()),
            Some(items) => items
                .iter()
                .map(|item| item.as_xsd_any_uri())
                .filter_map(|x| x)
                .map(|x| x.as_str())
                .partition(|x| x.starts_with(ctx.host_url_apub.as_str())),
        };

        let local_items: Vec<PostLocalID> = local_items
            .into_iter()
            .filter_map(|ap_id| {
                let rest = crate::apub_util::try_strip_host(&ap_id, &ctx.host_url_apub).unwrap();
                if let Some(rest) = rest.strip_prefix("/posts/") {
                    rest.parse().ok()
                } else {
                    None
                }
            })
            .collect();

        let db = ctx.db_pool.get().await?;

        db.execute(
            "UPDATE post SET sticky=COALESCE((ap_id = ANY($1)) OR (id = ANY($2)), FALSE) WHERE community=$3",
            &[&remote_items, &local_items, &self.community_id],
        ).await?;

        Ok(())
    }
}

M src/worker.rs => src/worker.rs +4 -0
@@ 79,6 79,10 @@ async fn perform_task(
            let def: crate::tasks::FetchActor = serde_json::from_value(params)?;
            def.perform(ctx).await?;
        }
        crate::tasks::FetchCommunityFeatured::KIND => {
            let def: crate::tasks::FetchCommunityFeatured = serde_json::from_value(params)?;
            def.perform(ctx).await?;
        }
        _ => {
            return Err(crate::Error::InternalStr(format!(
                "Unrecognized task type: {}",