~vikanezrimaya/kittybox

70a150d182a6b49042276140aeceaea5034fb410 — Vika 3 months ago f2e8915 microsub
WIP: Microsub database skeleton

I've tried to construct a rather primitive database schema that should
work with the Microsub API. While I am not entirely sure whether the SQL
is correct, it should provide some inspiration for future.

I should probably set up a test harness around the database object.
M Cargo.lock => Cargo.lock +26 -0
@@ 1723,6 1723,32 @@ dependencies = [
]

[[package]]
name = "kittybox-microsub"
version = "0.1.0"
dependencies = [
 "async-trait",
 "axum",
 "chrono",
 "futures-util",
 "hyper",
 "kittybox-util",
 "listenfd",
 "microformats",
 "reqwest",
 "serde",
 "serde_json",
 "serde_urlencoded",
 "sqlx",
 "thiserror",
 "tokio",
 "tokio-stream",
 "tokio-util",
 "tracing",
 "url",
 "uuid 1.4.0",
]

[[package]]
name = "kittybox-util"
version = "0.1.0"
dependencies = [

M Cargo.toml => Cargo.toml +2 -2
@@ 47,8 47,8 @@ path = "src/bin/kittybox-mf2.rs"
required-features = ["cli"]

[workspace]
members = [".", "./util", "./templates", "./indieauth", "./templates-neo"]
default-members = [".", "./util", "./templates", "./indieauth"]
members = [".", "./util", "./templates", "./indieauth", "./templates-neo", "./microsub"]
default-members = [".", "./util", "./templates", "./indieauth", "./microsub"]
[dependencies.kittybox-util]
version = "0.1.0"
path = "./util"

A microsub/Cargo.toml => microsub/Cargo.toml +55 -0
@@ 0,0 1,55 @@
[package]
name = "kittybox-microsub"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "^0.1.50"      # Type erasure for async trait methods
futures-util = "^0.3.14"     # Common utilities and extension traits for the futures-rs library
listenfd = "^0.5.0"          # A simple library to work with listenfds passed from the outside (systemd/catflap socket activation)
serde_json = "^1.0.64"       # A JSON serialization file format
serde_urlencoded = "^0.7.0"  # `x-www-form-urlencoded` meets Serde
tracing = { version = "0.1.34", features = [] }
uuid = { version = "^1.3.3", features = ["serde"] }
[dependencies.tokio]
version = "^1.29.1"
features = ["full", "tracing"] # TODO determine if my app doesn't need some features
#[dependencies.console-subscriber]
#version = "0.1.10"
[dependencies.tokio-stream]
version = "^0.1.8"
features = ["time", "net"]
[dependencies.tokio-util]
version = "^0.7.3"
features = ["io-util"]
[dependencies.axum]
version = "^0.6.18"
features = ["multipart", "json", "headers", "form", "macros"]
[dependencies.chrono]        # Date and time library for Rust
version = "^0.4.19"
features = ["serde"]
[dependencies.serde]         # A generic serialization/deserialization framework
version = "^1.0.170"
features = ["derive"]
[dependencies.url]           # URL library for Rust, based on the WHATWG URL Standard
version = "^2.2.1"
features = ["serde"]
[dependencies.hyper]
version = "^0.14.17"
features = ["stream", "runtime"]
[dependencies.reqwest]
version = "^0.11.10"
default-features = false
features = ["gzip", "brotli", "json", "stream"]
[dependencies.microformats]
version = "^0.3.0"
[dependencies.kittybox-util]
version = "0.1.0"
path = "../util"
[dependencies.thiserror]
version = "1.0.35"
[dependencies.sqlx]
version = "^0.7"
features = ["uuid", "chrono", "json", "postgres", "runtime-tokio"]
\ No newline at end of file

A microsub/migrations/0001_init.sql => microsub/migrations/0001_init.sql +63 -0
@@ 0,0 1,63 @@
CREATE SCHEMA kittybox_microsub;

CREATE TABLE kittybox_microsub.users (
    domain TEXT PRIMARY KEY,
);

CREATE TABLE kittybox_microsub.feeds (
    -- URL of the feed. We don't store the feed type here, since it will be autodetected by the firehose.
    url TEXT PRIMARY KEY,
    -- Do we have a websub subscription to this feed?
    websub BOOLEAN,
    -- Last time we crawled this feed or renewed our websub subscription.
    last_crawl TIMESTAMPTZ,
    -- How much to wait before querying this feed again or renewing our subscription.
    lifetime INTERVAL
);

CREATE TABLE kittybox_microsub.posts (
    -- internal ID of the post.
    _id UUID PRIMARY KEY,
    -- Feed from which this post was sourced. Can be `null`, in which case it's a notification.
    _source TEXT REFERENCES kittybox_microsub.feeds(url) ON DELETE CASCADE,
    -- Data about the post in JF2 format.
    jf2 JSONB NOT NULL,
);

CREATE TABLE kittybox_microsub.channels (
    -- internal ID of the channel.
    uid UUID PRIMARY KEY NOT NULL DEFAULT gen_random_uuid(),
    -- User-facing channel name.
    name TEXT NOT NULL,
    -- Owner of the channel.
    user TEXT NOT NULL REFERENCES kittybox_microsub.users(domain) ON DELETE CASCADE
);

CREATE TABLE kittybox_microsub.follows (
    channel UUID REFERENCES kittybox_microsub.channels(uid),
    feed TEXT REFERENCES kittybox_microsub.feeds(url),
    UNIQUE(channel, feed)
)

CREATE INDEX channels_by_user ON kittybox_microsub.channels (user);

CREATE TABLE kittybox_microsub.posts_read_status (
    channel UUID REFERENCES kittybox_microsub.channels(uid) ON DELETE CASCADE,
    post UUID NOT NULL REFERENCES kittybox_microsub.posts(_id) ON DELETE CASCADE,
    _is_read BOOLEAN NOT NULL DEFAULT false,
    UNIQUE(channel, post)
)

-- We use a special channel for notifications, because of how notifications are handled.
--
-- We don't really use a "feed" for notifications. Instead, notifications are stored directly in a special table.
CREATE TABLE kittybox_microsub.notifications (
    user TEXT NOT NULL REFERENCES kittybox_microsub.users(domain) ON DELETE CASCADE,
    post UUID UNIQUE NOT NULL
);

CREATE TABLE kittybox_microsub.muted (
    channel UUID REFERENCES kittybox_microsub.channels(uid) ON DELETE CASCADE,
    profile_url TEXT NOT NULL,
    UNIQUE(channel, profile_url)
)
\ No newline at end of file

A microsub/src/database/postgres.rs => microsub/src/database/postgres.rs +150 -0
@@ 0,0 1,150 @@
use std::num::NonZeroU8;

use crate::database::{Database, Error, Result};
use crate::microsub::{Post, Channel, JF2, Timeline, ChannelId};
use async_trait::async_trait;
use sqlx::postgres::PgPool;
use sqlx::migrate::Migrator;

static MIGRATOR: Migrator = sqlx::migrate!();

pub struct Postgres {
    pool: PgPool
}

impl Postgres {
    pub fn new(pool: PgPool) -> impl std::future::Future<Output = std::result::Result<Self, sqlx::Error>> {
        use futures_util::future::TryFutureExt;
        ({
            let pool = pool.clone();
            // We introduce an async block here to hold a borrow
            async move { MIGRATOR.run(&pool).await } 
        })
            .map_ok(move |()| Self { pool })
            .map_err(Into::into)
    }
}

#[async_trait]
impl Database for Postgres {
    async fn add_user(&self, user: &str) -> Result<()> {
        sqlx::query("INSERT INTO kittybox_microsub.users (domain) VALUES ($1)")
            .bind(user)
            .execute(&self.pool)
            .await
            .map_err(Error::backend)
            .map(|_| ())
    }
    async fn notify(&self, user: &str, post: &JF2) -> Result<()> {
        let id = sqlx::query_scalar::<_, uuid::Uuid>("INSERT INTO kittybox_microsub.posts (_source, post) VALUES (null, $1) RETURNING _id")
            .bind(post)
            .fetch_one(&self.pool)
            .await
            .map_err(Error::backend)?;

        sqlx::query("INSERT INTO kittybox_microsub.notifications (user, post) VALUES ($1, $2)")
            .bind(user)
            .bind(id)
            .execute(&self.pool)
            .await
            .map_err(Error::backend)?;

        Ok(())
    }
    async fn channel_list(&self, user: &str) -> Result<Vec<Channel>> {
        // XXX I have no idea if this works. Need to test.
        let channels = sqlx::query_as::<_, (uuid::Uuid, String, i32)>("\
SELECT id, name, COUNT(posts._is_read) FROM kittybox_micropub.channels
JOIN kittybox_microsub.follows ON follows.channel = channels.uid
JOIN kittybox_microsub.posts ON posts._source = follows.feed
JOIN kittybox_microsub.posts_read_status ON posts_read_status.channel = channels.uid AND posts_read_status.post = posts._id,
WHERE channels.user = $1 AND posts_read_status._is_read = false
GROUP BY channels.id")
            .bind(user)
            .fetch_all(&self.pool)
            .await
            .map_err(Error::backend)
            .map(|channels| channels.into_iter().map(
                |(uid, name, unread)| Channel {
                    uid: ChannelId::Uuid(uid), name, unread: unread as usize
                }
            ))?;

        let notifications: Channel = sqlx::query_scalar::<_, i32>("SELECT COUNT(*) FROM kittybox_micropub.posts_read_status WHERE channel IS NULL")
            .fetch_one(&self.pool)
            .await
            .map_err(Error::backend)
            .map(|unread| Channel { uid: ChannelId::Notifications, name: "Notifications".to_owned(), unread: unread as usize})?;

        let mut result = vec![notifications];
        result.extend(channels);

        Ok(result)
    }
    async fn add_channel(&self, user: &str, name: &str) -> Result<uuid::Uuid> {
        sqlx::query_scalar::<_, uuid::Uuid>("INSERT INTO kittybox_microsub.channels (name, user) RETURNING uid")
            .bind(name)
            .bind(user)
            .fetch_one(&self.pool)
            .await
            .map_err(Error::backend)
    }
    async fn rename_channel(&self, user: &str, channel: uuid::Uuid, name: &str) -> Result<()> {
        sqlx::query("UPDATE kittybox_microsub.channels SET name = $1 WHERE uid = $2 AND user = $3")
            .bind(name)
            .bind(channel)
            .bind(user)
            .execute(&self.pool)
            .await
            .map_err(Error::backend)
            .map(|_| ())
    }
    async fn delete_channel(&self, user: &str, channel: uuid::Uuid) -> Result<()> {
        sqlx::query("DELETE FROM kittybox_microsub.channels WHERE user = $1 AND uid = $2")
            .bind(user)
            .bind(channel)
            .execute(&self.pool)
            .await
            .map_err(Error::backend)
            .map(|_| ())
    }
    async fn add_feed(&self, user: &str, channel: &str, feed: &url::Url) -> Result<()> {
        todo!()
    }
    async fn add_post(&self, feed: &url::Url, post: &JF2) -> Result<uuid::Uuid> {
        todo!()
    }
    async fn timeline(&self, user: &str, channel: uuid::Uuid, after: Option<&str>, before: Option<&str>, limit: Option<NonZeroU8>) -> Result<Timeline> {
        todo!()
    }
    async fn timeline_mark_read(&self, user: &str, channel: uuid::Uuid, unread: bool, entries: &[uuid::Uuid]) -> Result<()> {
        todo!()
    }
    async fn timeline_mark_read_after(&self, user: &str, channel: uuid::Uuid, last_read: uuid::Uuid) -> Result<()> {
        todo!()
    }
    async fn timeline_remove(&self, user: &str, channel: uuid::Uuid, entries: &[uuid::Uuid]) -> Result<()> {
        todo!()
    }
    async fn follow_list(&self, user: &str, channel: uuid::Uuid) -> Result<Vec<url::Url>> {
        todo!()
    }
    async fn follow(&self, user: &str, channel: uuid::Uuid, feed: &url::Url) -> Result<()> {
        todo!()
    }
    async fn unfollow(&self, user: &str, channel: uuid::Uuid, feed: &url::Url) -> Result<()> {
        todo!()
    }
    async fn mute_list(&self, user: &str, channel: Option<uuid::Uuid>) -> Result<Vec<url::Url>> {
        todo!()
    }
    async fn mute(&self, user: &str, channel: Option<uuid::Uuid>, url: &url::Url) -> Result<()> {
        todo!()
    }
    async fn unmute(&self, user: &str, channel: Option<uuid::Uuid>, url: &url::Url) -> Result<()> {
        todo!()
    }
    async fn block(&self, user: &str, channel: Option<uuid::Uuid>, url: &url::Url) -> Result<()> {
        todo!()
    }
}

A microsub/src/firehose/mod.rs => microsub/src/firehose/mod.rs +0 -0
A microsub/src/lib.rs => microsub/src/lib.rs +79 -0
@@ 0,0 1,79 @@
pub mod firehose;
pub mod microsub;
pub mod database {
    use std::num::NonZeroU8;

    use async_trait::async_trait;

    use crate::microsub::{Timeline, Channel, Post, JF2};

    pub mod postgres;

    #[derive(Debug, thiserror::Error)]
    pub enum Error {
        #[error("storage backend error: {0}")]
        Backend(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
        #[error("user not found")]
        UserNotFound,
        #[error("channel not found")]
        ChannelNotFound,
        #[error("feed not found")]
        FeedNotFound,
        #[error("post not found")]
        PostNotFound,
        #[error("object already exists")]
        AlreadyExists
    }
    impl Error {
        fn backend<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
            Self::Backend(Box::new(error))
        }
    }

    pub type Result<T> = std::result::Result<T, Error>;

    /// A storage backend for Microsub reader data.
    #[async_trait]
    pub trait Database: Send + Sync + 'static {
        /// Add user to the database.
        async fn add_user(&self, user: &str) -> Result<()>;
        /// Post a notification for a user in their notifications channel.
        async fn notify(&self, user: &str, post: &JF2) -> Result<()>;
        /// List channels and the amount of unread posts in them.
        /// 
        /// The `notifications` channel shall always be first.
        async fn channel_list(&self, user: &str) -> Result<Vec<Channel>>;
        /// Add channel to a user.
        async fn add_channel(&self, user: &str, name: &str) -> Result<uuid::Uuid>;
        /// Rename a channel.
        async fn rename_channel(&self, user: &str, channel: uuid::Uuid, name: &str) -> Result<()>;
        /// Delete a channel.
        async fn delete_channel(&self, user: &str, channel: uuid::Uuid) -> Result<()>;
        /// Add a feed to a channel.
        async fn add_feed(&self, user: &str, channel: &str, feed: &url::Url) -> Result<()>;
        /// Add a post to a feed.
        async fn add_post(&self, feed: &url::Url, post: &JF2) -> Result<uuid::Uuid>;
        /// Return a timeline corresponding to a channel owned by a certain user.
        async fn timeline(&self, user: &str, channel: uuid::Uuid, after: Option<&str>, before: Option<&str>, limit: Option<NonZeroU8>) -> Result<Timeline>;
        /// Mark an one or more entries as read or unread.
        async fn timeline_mark_read(&self, user: &str, channel: uuid::Uuid, unread: bool, entries: &[uuid::Uuid]) -> Result<()>;
        /// Mark all entries after a certain one as read.
        async fn timeline_mark_read_after(&self, user: &str, channel: uuid::Uuid, last_read: uuid::Uuid) -> Result<()>;
        /// Remove one or several entries from a timeline, hiding them from view.
        async fn timeline_remove(&self, user: &str, channel: uuid::Uuid, entries: &[uuid::Uuid]) -> Result<()>;
        /// List followed feeds in a channel.
        async fn follow_list(&self, user: &str, channel: uuid::Uuid) -> Result<Vec<url::Url>>;
        /// Follow a feed and file it into a channel.
        async fn follow(&self, user: &str, channel: uuid::Uuid, feed: &url::Url) -> Result<()>;
        /// Unfollow a feed from a channel.
        async fn unfollow(&self, user: &str, channel: uuid::Uuid, feed: &url::Url) -> Result<()>;
        /// List mutes in a channel or globally. Returns a list of profile URLs.
        async fn mute_list(&self, user: &str, channel: Option<uuid::Uuid>) -> Result<Vec<url::Url>>;
        /// Mute a user (by their h-card url/author url) in a channel or globally.
        async fn mute(&self, user: &str, channel: Option<uuid::Uuid>, url: &url::Url) -> Result<()>;
        /// Unmute a user in a channel or globally.
        async fn unmute(&self, user: &str, channel: Option<uuid::Uuid>, url: &url::Url) -> Result<()>;
        /// Block a user in a channel or globally, delete all their posts and don't store any of their further posts.
        async fn block(&self, user: &str, channel: Option<uuid::Uuid>, url: &url::Url) -> Result<()>;
    }
}
\ No newline at end of file

A microsub/src/main.rs => microsub/src/main.rs +4 -0
@@ 0,0 1,4 @@
#[tokio::main]
async fn main() {
    todo!()
}

A microsub/src/microsub/mod.rs => microsub/src/microsub/mod.rs +49 -0
@@ 0,0 1,49 @@
pub use kittybox_util::storage::{Storage, Error as MicropubStorageError};

/// XXX: replace this eventually
pub type JF2 = serde_json::Value;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Source {
    _id: url::Url,
    name: Option<String>,
    photo: Option<url::Url>,
    url: url::Url
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Post {
    #[serde(flatten)]
    pub jf2: JF2,
    pub _id: uuid::Uuid,
    pub _source: Source,
    pub _is_read: bool,
    pub _is_liked: Option<bool>,
    pub _is_reposted: Option<bool>,
    pub _is_bookmarked: Option<bool>,
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Paging {
    pub before: Option<String>,
    pub after: Option<String>
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Timeline {
    pub items: Vec<Post>,
    pub paging: Paging
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(untagged, rename_all = "lowercase")]
pub enum ChannelId {
    Notifications,
    Uuid(uuid::Uuid)
}
#[derive(sqlx::FromRow, Debug, serde::Serialize, serde::Deserialize)]
pub struct Channel {
    pub uid: ChannelId,
    pub name: String,
    pub unread: usize
}
\ No newline at end of file