~vpzom/lotide

a2c65e98e69a5f0e22dc83ff62399de6850cb94b — Colin Reeder 19 days ago 50b3d31
Use new ingestion for fetch_actor, merge BaseContext and RouteContext
M src/apub_util/ingest.rs => src/apub_util/ingest.rs +8 -21
@@ 27,7 27,7 @@ impl FoundFrom {
pub async fn ingest_object(
    object: Verified<KnownObject>,
    found_from: FoundFrom,
    ctx: Arc<crate::RouteContext>,
    ctx: Arc<crate::BaseContext>,
) -> Result<Option<super::ActorLocalInfo>, crate::Error> {
    let db = ctx.db_pool.get().await?;
    match object.into_inner() {


@@ 157,13 157,8 @@ pub async fn ingest_object(
                crate::apub_util::require_containment(activity_ap_id, follower_ap_id)?;
                let follow = crate::apub_util::Contained(Cow::Borrowed(&follow));

                let follower_local_id = crate::apub_util::get_or_fetch_user_local_id(
                    follower_ap_id,
                    &db,
                    &ctx.host_url_apub,
                    &ctx.http_client,
                )
                .await?;
                let follower_local_id =
                    crate::apub_util::get_or_fetch_user_local_id(follower_ap_id, &db, &ctx).await?;

                if let Some(target) = target {
                    if let Some(community_id) =


@@ 363,7 358,7 @@ pub async fn ingest_object(
pub fn ingest_object_boxed(
    object: Verified<KnownObject>,
    found_from: FoundFrom,
    ctx: Arc<crate::RouteContext>,
    ctx: Arc<crate::BaseContext>,
) -> std::pin::Pin<
    Box<dyn Future<Output = Result<Option<super::ActorLocalInfo>, crate::Error>> + Send>,
> {


@@ 383,9 378,7 @@ pub async fn ingest_like(
    if let Some(actor_id) = activity.actor_unchecked().as_single_id() {
        super::require_containment(activity_id, actor_id)?;

        let actor_local_id =
            super::get_or_fetch_user_local_id(actor_id, &db, &ctx.host_url_apub, &ctx.http_client)
                .await?;
        let actor_local_id = super::get_or_fetch_user_local_id(actor_id, &db, &ctx).await?;

        if let Some(object_id) = activity.object().as_single_id() {
            let thing_local_ref = if let Some(remaining) =


@@ 566,7 559,7 @@ pub async fn ingest_undo(

pub async fn ingest_create(
    activity: Verified<activitystreams::activity::Create>,
    ctx: Arc<crate::RouteContext>,
    ctx: Arc<crate::BaseContext>,
) -> Result<(), crate::Error> {
    for req_obj in activity.object().iter() {
        let object_id = req_obj.id();


@@ 819,10 812,7 @@ async fn handle_recieved_reply(
    let db = ctx.db_pool.get().await?;

    let author = match author {
        Some(author) => Some(
            super::get_or_fetch_user_local_id(&author, &db, &ctx.host_url_apub, &ctx.http_client)
                .await?,
        ),
        Some(author) => Some(super::get_or_fetch_user_local_id(&author, &db, &ctx).await?),
        None => None,
    };



@@ 999,10 989,7 @@ async fn handle_recieved_post(
) -> Result<(), crate::Error> {
    let db = ctx.db_pool.get().await?;
    let author = match author {
        Some(author) => Some(
            super::get_or_fetch_user_local_id(&author, &db, &ctx.host_url_apub, &ctx.http_client)
                .await?,
        ),
        Some(author) => Some(super::get_or_fetch_user_local_id(&author, &db, &ctx).await?),
        None => None,
    };


M src/apub_util/mod.rs => src/apub_util/mod.rs +13 -120
@@ 361,116 361,11 @@ pub async fn fetch_ap_object(

pub async fn fetch_actor(
    req_ap_id: &url::Url,
    db: &tokio_postgres::Client,
    http_client: &crate::HttpClient,
    ctx: Arc<crate::BaseContext>,
) -> Result<ActorLocalInfo, crate::Error> {
    let obj = fetch_ap_object(req_ap_id, http_client).await?;
    let ap_id = req_ap_id;

    match obj.deref() {
        KnownObject::Person(person) => {
            let username = person
                .preferred_username()
                .or_else(|| {
                    person
                        .name()
                        .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                })
                .unwrap_or("");
            let inbox = person.inbox_unchecked().as_str();
            let shared_inbox = person
                .endpoints_unchecked()
                .and_then(|endpoints| endpoints.shared_inbox)
                .map(|url| url.as_str());
            let public_key = person
                .ext_one
                .public_key
                .as_ref()
                .map(|key| key.public_key_pem.as_bytes());
            let public_key_sigalg = person
                .ext_one
                .public_key
                .as_ref()
                .and_then(|key| key.signature_algorithm.as_deref());
            let description = person
                .summary()
                .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                .unwrap_or("");

            let avatar = person.icon().and_then(|icon| {
                icon.iter()
                    .filter_map(|icon| {
                        if icon.kind_str() == Some("Image") {
                            match activitystreams::object::Image::from_any_base(icon.clone()) {
                                Err(_) | Ok(None) => None,
                                Ok(Some(icon)) => Some(icon),
                            }
                        } else {
                            None
                        }
                    })
                    .next()
            });
            let avatar = avatar
                .as_ref()
                .and_then(|icon| icon.url().and_then(|url| url.as_single_id()))
                .map(|x| x.as_str());

            let id = UserLocalID(db.query_one(
                "INSERT INTO person (username, local, created_local, ap_id, ap_inbox, ap_shared_inbox, public_key, public_key_sigalg, description, avatar) VALUES ($1, FALSE, localtimestamp, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (ap_id) DO UPDATE SET ap_inbox=$3, ap_shared_inbox=$4, public_key=$5, public_key_sigalg=$6, description=$7, avatar=$8 RETURNING id",
                &[&username, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description, &avatar],
            ).await?.get(0));

            Ok(ActorLocalInfo::User {
                id,
                public_key: public_key.map(|key| PubKeyInfo {
                    algorithm: get_message_digest(public_key_sigalg),
                    key: key.to_owned(),
                }),
            })
        }
        KnownObject::Group(group) => {
            let name = group
                .preferred_username()
                .or_else(|| {
                    group
                        .name()
                        .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                })
                .unwrap_or("");
            let description = group
                .summary()
                .and_then(|maybe| maybe.iter().filter_map(|x| x.as_xsd_string()).next())
                .unwrap_or("");
            let inbox = group.inbox_unchecked().as_str();
            let shared_inbox = group
                .endpoints_unchecked()
                .and_then(|endpoints| endpoints.shared_inbox)
                .map(|url| url.as_str());
            let public_key = group
                .ext_one
                .public_key
                .as_ref()
                .map(|key| key.public_key_pem.as_bytes());
            let public_key_sigalg = group
                .ext_one
                .public_key
                .as_ref()
                .and_then(|key| key.signature_algorithm.as_deref());

            let id = CommunityLocalID(db.query_one(
                "INSERT INTO community (name, local, ap_id, ap_inbox, ap_shared_inbox, public_key, public_key_sigalg, description) VALUES ($1, FALSE, $2, $3, $4, $5, $6, $7) ON CONFLICT (ap_id) DO UPDATE SET ap_inbox=$3, ap_shared_inbox=$4, public_key=$5, public_key_sigalg=$6, description=$7 RETURNING id",
                &[&name, &ap_id.as_str(), &inbox, &shared_inbox, &public_key, &public_key_sigalg, &description],
            ).await?.get(0));

            Ok(ActorLocalInfo::Community {
                id,
                public_key: public_key.map(|key| PubKeyInfo {
                    algorithm: get_message_digest(public_key_sigalg),
                    key: key.to_owned(),
                }),
            })
        }
    let obj = fetch_ap_object(req_ap_id, &ctx.http_client).await?;
    match ingest::ingest_object_boxed(obj, ingest::FoundFrom::Other, ctx).await? {
        Some(info) => Ok(info),
        _ => Err(crate::Error::InternalStrStatic("Unrecognized actor type")),
    }
}


@@ 478,10 373,9 @@ pub async fn fetch_actor(
pub async fn get_or_fetch_user_local_id(
    ap_id: &url::Url,
    db: &tokio_postgres::Client,
    host_url_apub: &BaseURL,
    http_client: &crate::HttpClient,
    ctx: &Arc<crate::BaseContext>,
) -> Result<UserLocalID, crate::Error> {
    if let Some(remaining) = try_strip_host(ap_id, host_url_apub) {
    if let Some(remaining) = try_strip_host(ap_id, &ctx.host_url_apub) {
        if remaining.starts_with("/users/") {
            Ok(remaining[7..].parse()?)
        } else {


@@ 499,7 393,7 @@ pub async fn get_or_fetch_user_local_id(
            None => {
                // Not known yet, time to fetch

                let actor = fetch_actor(ap_id, db, http_client).await?;
                let actor = fetch_actor(ap_id, ctx.clone()).await?;

                if let ActorLocalInfo::User { id, .. } = actor {
                    Ok(id)


@@ 1489,7 1383,7 @@ pub async fn check_signature_for_actor(
    headers: &hyper::header::HeaderMap,
    actor_ap_id: &url::Url,
    db: &tokio_postgres::Client,
    http_client: &crate::HttpClient,
    ctx: &Arc<crate::BaseContext>,
) -> Result<bool, crate::Error> {
    let found_key = db.query_opt("(SELECT public_key, public_key_sigalg FROM person WHERE ap_id=$1) UNION ALL (SELECT public_key, public_key_sigalg FROM community WHERE ap_id=$1) LIMIT 1", &[&actor_ap_id.as_str()]).await?
        .and_then(|row| {


@@ 1521,7 1415,7 @@ pub async fn check_signature_for_actor(
    // Either no key found or failed to verify
    // Try fetching the actor/key

    let actor = fetch_actor(actor_ap_id, db, http_client).await?;
    let actor = fetch_actor(actor_ap_id, ctx.clone()).await?;

    if let Some(key_info) = actor.public_key() {
        let key = openssl::pkey::PKey::public_key_from_pem(&key_info.key)?;


@@ 1544,8 1438,7 @@ pub async fn check_signature_for_actor(
pub async fn verify_incoming_object(
    mut req: hyper::Request<hyper::Body>,
    db: &tokio_postgres::Client,
    http_client: &crate::HttpClient,
    apub_proxy_rewrites: bool,
    ctx: &Arc<crate::BaseContext>,
) -> Result<Verified<KnownObject>, crate::Error> {
    let req_body = hyper::body::to_bytes(req.body_mut()).await?;



@@ 1556,7 1449,7 @@ pub async fn verify_incoming_object(
                crate::Error::InternalStrStatic("Missing id in received activity")
            })?;

            let res_body = fetch_ap_object(&ap_id, http_client).await?;
            let res_body = fetch_ap_object(&ap_id, &ctx.http_client).await?;

            Ok(res_body)
        }


@@ 1582,7 1475,7 @@ pub async fn verify_incoming_object(
                .as_str();

            // path ends up wrong with our recommended proxy config
            let path_and_query = if apub_proxy_rewrites {
            let path_and_query = if ctx.apub_proxy_rewrites {
                req.headers()
                    .get("x-forwarded-path")
                    .map(|x| x.to_str())


@@ 1599,7 1492,7 @@ pub async fn verify_incoming_object(
                &req.headers(),
                &actor_ap_id,
                db,
                http_client,
                &ctx,
            )
            .await?
            {

M src/main.rs => src/main.rs +8 -20
@@ 142,6 142,8 @@ pub struct BaseContext {
    pub api_ratelimit: henry::RatelimitBucket<std::net::IpAddr>,

    pub local_hostname: String,

    worker_trigger: tokio::sync::mpsc::Sender<()>,
}

impl BaseContext {


@@ 190,14 192,7 @@ impl BaseContext {
            href.into()
        }
    }
}

pub struct RouteContext {
    base: Arc<BaseContext>,
    worker_trigger: tokio::sync::mpsc::Sender<()>,
}

impl RouteContext {
    pub async fn enqueue_task<T: crate::tasks::TaskDef>(
        &self,
        task: &T,


@@ 217,13 212,7 @@ impl RouteContext {
    }
}

impl std::ops::Deref for RouteContext {
    type Target = BaseContext;

    fn deref(&self) -> &BaseContext {
        &self.base
    }
}
pub type RouteContext = BaseContext;

pub type RouteNode<P> = trout::Node<
    P,


@@ 991,8 980,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
        panic!("SMTP_URL was provided, but SMTP_FROM was not");
    }

    let (worker_trigger, worker_rx) = tokio::sync::mpsc::channel(1);

    let routes = Arc::new(routes::route_root());
    let base_context = Arc::new(BaseContext {
    let context = Arc::new(BaseContext {
        local_hostname: get_url_host(&host_url_apub)
            .expect("Couldn't find host in HOST_URL_ACTIVITYPUB"),



@@ 1005,15 996,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
        http_client: hyper::Client::builder().build(hyper_tls::HttpsConnector::new()),
        apub_proxy_rewrites,
        api_ratelimit: henry::RatelimitBucket::new(300),
    });

    let worker_trigger = worker::start_worker(base_context.clone());

    let context = Arc::new(RouteContext {
        base: base_context,
        worker_trigger,
    });

    worker::start_worker(context.clone(), worker_rx);

    let server = hyper::Server::bind(&(std::net::Ipv6Addr::UNSPECIFIED, port).into()).serve(
        hyper::service::make_service_fn(|sock: &hyper::server::conn::AddrStream| {
            let addr_direct = sock.remote_addr().ip();

M src/routes/api/mod.rs => src/routes/api/mod.rs +1 -3
@@ 258,8 258,6 @@ async fn route_unstable_actors_lookup(
    let (query,) = params;
    println!("lookup {}", query);

    let db = ctx.db_pool.get().await?;

    let lookup = parse_lookup(&query)?;

    let uri = match lookup {


@@ 314,7 312,7 @@ async fn route_unstable_actors_lookup(
        }
    };

    let actor = crate::apub_util::fetch_actor(&uri, &db, &ctx.http_client).await?;
    let actor = crate::apub_util::fetch_actor(&uri, ctx).await?;

    let info = match actor {
        crate::apub_util::ActorLocalInfo::Community { id, .. } => {

M src/routes/apub/mod.rs => src/routes/apub/mod.rs +1 -7
@@ 229,13 229,7 @@ async fn inbox_common(
) -> Result<hyper::Response<hyper::Body>, crate::Error> {
    let db = ctx.db_pool.get().await?;

    let object = crate::apub_util::verify_incoming_object(
        req,
        &db,
        &ctx.http_client,
        ctx.apub_proxy_rewrites,
    )
    .await?;
    let object = crate::apub_util::verify_incoming_object(req, &db, &ctx).await?;

    crate::apub_util::ingest::ingest_object(
        object,

M src/tasks.rs => src/tasks.rs +6 -7
@@ 1,12 1,13 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::sync::Arc;

#[async_trait]
pub trait TaskDef: Serialize + std::fmt::Debug + Sync {
    const KIND: &'static str;
    const MAX_ATTEMPTS: i16 = 8;
    async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error>;
    async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error>;
}

#[derive(Deserialize, Serialize, Debug)]


@@ 20,7 21,7 @@ pub struct DeliverToInbox<'a> {
impl<'a> TaskDef for DeliverToInbox<'a> {
    const KIND: &'static str = "deliver_to_inbox";

    async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error> {
    async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> {
        let db = ctx.db_pool.get().await?;

        let signing_info: Option<(_, _)> = match self.sign_as {


@@ 75,7 76,7 @@ pub struct DeliverToFollowers {
impl TaskDef for DeliverToFollowers {
    const KIND: &'static str = "deliver_to_followers";

    async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error> {
    async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> {
        let community_id = match self.actor {
            crate::ActorLocalRef::Community(id) => id,
            crate::ActorLocalRef::Person(_) => return Ok(()), // We don't have user followers at this point


@@ 101,10 102,8 @@ pub struct FetchActor<'a> {
impl<'a> TaskDef for FetchActor<'a> {
    const KIND: &'static str = "fetch_actor";

    async fn perform(self, ctx: &crate::BaseContext) -> Result<(), crate::Error> {
        let db = ctx.db_pool.get().await?;

        crate::apub_util::fetch_actor(&self.actor_ap_id, &db, &ctx.http_client).await?;
    async fn perform(self, ctx: Arc<crate::BaseContext>) -> Result<(), crate::Error> {
        crate::apub_util::fetch_actor(&self.actor_ap_id, ctx).await?;

        Ok(())
    }

M src/worker.rs => src/worker.rs +3 -6
@@ 1,10 1,7 @@
use std::sync::Arc;

pub fn start_worker(ctx: Arc<crate::BaseContext>) -> tokio::sync::mpsc::Sender<()> {
    let (tx, rx) = tokio::sync::mpsc::channel(1);
pub fn start_worker(ctx: Arc<crate::BaseContext>, rx: tokio::sync::mpsc::Receiver<()>) {
    crate::spawn_task(run_worker(ctx, rx));

    tx
}

async fn run_worker(


@@ 39,7 36,7 @@ async fn run_worker(
            let kind: &str = row.get(1);
            let params: serde_json::Value = row.get(2);

            let result = perform_task(&ctx, kind, params).await;
            let result = perform_task(ctx.clone(), kind, params).await;
            if let Err(err) = result {
                let err = format!("{:?}", err);
                db.execute(


@@ 63,7 60,7 @@ async fn run_worker(
}

async fn perform_task(
    ctx: &crate::BaseContext,
    ctx: Arc<crate::BaseContext>,
    kind: &str,
    params: serde_json::Value,
) -> Result<(), crate::Error> {