~witcher/rss-email

e8767d29f5641d9555033a980746cf6d9dbe5214 — witcher 1 year, 6 months ago 56324d7 database-abstraction
WIP: Add postgres support
15 files changed, 329 insertions(+), 63 deletions(-)

M .gitignore
M Cargo.lock
M Cargo.toml
R migrations/{20221120130636_create_posts_table.down.sql => postgres/20221228201020_create_table.down.sql}
A migrations/postgres/20221228201020_create_table.up.sql
A migrations/sqlite/20221120130636_create_posts_table.down.sql
R migrations/{20221120130636_create_posts_table.up.sql => sqlite/20221120130636_create_posts_table.up.sql}
R migrations/{20221203144155_remove_feedurl_author.down.sql => sqlite/20221203144155_remove_feedurl_author.down.sql}
R migrations/{20221203144155_remove_feedurl_author.up.sql => sqlite/20221203144155_remove_feedurl_author.up.sql}
A src/database.rs
A src/database/postgres.rs
A src/database/sqlite.rs
D src/db.rs
M src/feed.rs
M src/main.rs
M .gitignore => .gitignore +1 -0
@@ 1,3 1,4 @@
/target
.env
dev.db
.vim

M Cargo.lock => Cargo.lock +113 -2
@@ 30,9 30,9 @@ checksum = "508b352bb5c066aac251f6daf6b36eccd03e8a88e8081cd44959ea277a3af9a8"

[[package]]
name = "async-trait"
version = "0.1.58"
version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c"
checksum = "677d1d8ab452a3936018a687b20e6f7cf5363d713b732b8884001317b0e48aa3"
dependencies = [
 "proc-macro2",
 "quote",


@@ 327,6 327,7 @@ checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f"
dependencies = [
 "block-buffer",
 "crypto-common",
 "subtle",
]

[[package]]


@@ 348,6 349,15 @@ dependencies = [
]

[[package]]
name = "dirs"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
dependencies = [
 "dirs-sys",
]

[[package]]
name = "dirs-sys"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 596,6 606,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"

[[package]]
name = "hkdf"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "791a029f6b9fc27657f6f188ec6e5e43f6911f6f878e0dc5501396e09809d437"
dependencies = [
 "hmac",
]

[[package]]
name = "hmac"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
 "digest",
]

[[package]]
name = "http"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 821,6 849,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"

[[package]]
name = "md-5"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca"
dependencies = [
 "digest",
]

[[package]]
name = "memchr"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 992,6 1029,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae"

[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"

[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1050,6 1093,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fee2dce59f7a43418e3382c766554c614e06a552d53a8f07ef499ea4b332c0f"

[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
 "libc",
 "rand_chacha",
 "rand_core",
]

[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
 "ppv-lite86",
 "rand_core",
]

[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
 "getrandom",
]

[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1140,6 1213,7 @@ name = "rss-email"
version = "0.3.0"
dependencies = [
 "anyhow",
 "async-trait",
 "atom_syndication",
 "chrono",
 "clap",


@@ 1242,6 1316,17 @@ dependencies = [
]

[[package]]
name = "sha1"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3"
dependencies = [
 "cfg-if",
 "cpufeatures",
 "digest",
]

[[package]]
name = "sha2"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1343,11 1428,13 @@ checksum = "dcbc16ddba161afc99e14d1713a453747a2b07fc097d2009f4c300ec99286105"
dependencies = [
 "ahash",
 "atoi",
 "base64",
 "bitflags",
 "byteorder",
 "bytes",
 "crc",
 "crossbeam-queue",
 "dirs",
 "dotenvy",
 "either",
 "event-listener",


@@ 1359,18 1446,24 @@ dependencies = [
 "futures-util",
 "hashlink",
 "hex",
 "hkdf",
 "hmac",
 "indexmap",
 "itoa",
 "libc",
 "libsqlite3-sys",
 "log",
 "md-5",
 "memchr",
 "once_cell",
 "paste",
 "percent-encoding",
 "rand",
 "rustls",
 "rustls-pemfile",
 "serde",
 "serde_json",
 "sha1",
 "sha2",
 "smallvec",
 "sqlformat",


@@ 1380,6 1473,7 @@ dependencies = [
 "tokio-stream",
 "url",
 "webpki-roots",
 "whoami",
]

[[package]]


@@ 1432,6 1526,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"

[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"

[[package]]
name = "syn"
version = "1.0.99"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1826,6 1926,17 @@ dependencies = [
]

[[package]]
name = "whoami"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6631b6a2fd59b1841b622e8f1a7ad241ef0a46f2d580464ce8140ac94cbd571"
dependencies = [
 "bumpalo",
 "wasm-bindgen",
 "web-sys",
]

[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"

M Cargo.toml => Cargo.toml +6 -1
@@ 9,6 9,10 @@ keywords = ["email", "rss", "atom"]
license = "WTFPL"
edition = "2021"

[features]
sqlite = ["sqlx/sqlite"]
postgres = ["sqlx/postgres"]

[profile.release]
strip = "symbols"



@@ 26,6 30,7 @@ serde = { version = "1.0", features = ["derive"] }
directories = "4.0.1"
log = "0.4.17"
tokio = { version = "1.21.2", default-features = false, features = ["rt-multi-thread", "macros"] }
sqlx = { version = "0.6.2", features = ["runtime-tokio-rustls", "migrate", "sqlite", "offline"] }
sqlx = { version = "0.6.2", features = ["runtime-tokio-rustls", "migrate", "sqlite", "postgres", "offline"] }
atom_syndication = "0.11.0"
simple_logger = "4.0.0"
async-trait = "0.1.60"

R migrations/20221120130636_create_posts_table.down.sql => migrations/postgres/20221228201020_create_table.down.sql +0 -0
A migrations/postgres/20221228201020_create_table.up.sql => migrations/postgres/20221228201020_create_table.up.sql +10 -0
@@ 0,0 1,10 @@
CREATE TABLE posts (
	guid		TEXT	NOT NULL,
	title		TEXT,
	url			TEXT,
	pub_date	BIGINT,
	content		TEXT,
	sent		BOOLEAN	NOT NULL DEFAULT false,

	PRIMARY KEY(guid)
);

A migrations/sqlite/20221120130636_create_posts_table.down.sql => migrations/sqlite/20221120130636_create_posts_table.down.sql +1 -0
@@ 0,0 1,1 @@
DROP TABLE posts;

R migrations/20221120130636_create_posts_table.up.sql => migrations/sqlite/20221120130636_create_posts_table.up.sql +0 -0
R migrations/20221203144155_remove_feedurl_author.down.sql => migrations/sqlite/20221203144155_remove_feedurl_author.down.sql +0 -0
R migrations/20221203144155_remove_feedurl_author.up.sql => migrations/sqlite/20221203144155_remove_feedurl_author.up.sql +0 -0
A src/database.rs => src/database.rs +34 -0
@@ 0,0 1,34 @@
use crate::models::Post;
use async_trait::async_trait;

#[cfg(all(feature = "sqlite", feature = "postgres"))]
compile_error!("Features \"sqlite\" and \"postgres\" cannot both be enabled");

#[cfg(not(any(feature = "sqlite", feature = "postgres")))]
compile_error!("Either the feature \"sqlite\" or \"postgres\" needs to be enabled");

#[cfg(feature = "postgres")]
mod postgres;
#[cfg(features = "sqlite")]
mod sqlite;

#[cfg(feature = "postgres")]
pub use postgres::Postgres;
#[cfg(features = "sqlite")]
pub use sqlite::Sqlite;

/// A trait modelling a database. Different database drivers can be implemented using this trait,
/// e.g. sqlite and Postgres.
#[async_trait]
pub trait Database {
    type DB: sqlx::Database;

    /// Apply all database schema migrations
    async fn migrate(&self) -> Result<(), sqlx::Error>;
    /// Acquire a connection to the underlying database pool
    async fn acquire(&self) -> Result<sqlx::pool::PoolConnection<Self::DB>, sqlx::Error>;
    /// Insert a new post or update an old one with the same GUID
    async fn insert_item(&self, post: &Post) -> Result<(), sqlx::Error>;
    /// Retrieve all unsent posts from the database
    async fn get_unsent(&self) -> Result<Vec<Post>, sqlx::Error>;
}

A src/database/postgres.rs => src/database/postgres.rs +64 -0
@@ 0,0 1,64 @@
use super::Database;
use crate::models::Post;

use async_trait::async_trait;

pub struct Postgres {
    pool: sqlx::Pool<sqlx::Postgres>,
}

impl Postgres {
    pub async fn new<S>(path: S) -> anyhow::Result<Self>
    where
        S: AsRef<str>,
    {
        let path = path.as_ref();
        debug!("Establishing connection to postgres database at {:?}", path);

        Ok(Self {
            pool: sqlx::postgres::PgPoolOptions::new()
                .max_connections(5)
                .connect(path)
                .await?,
        })
    }
}

#[async_trait]
impl Database for Postgres {
    type DB = sqlx::Postgres;

    async fn migrate(&self) -> Result<(), sqlx::Error> {
        sqlx::migrate!("./migrations/postgres")
            .run(&self.pool)
            .await
            .map_err(Into::into)
    }

    async fn acquire(&self) -> Result<sqlx::pool::PoolConnection<Self::DB>, sqlx::Error> {
        self.pool.acquire().await
    }

    async fn insert_item(&self, post: &Post) -> Result<(), sqlx::Error> {
        sqlx::query!(
            "INSERT INTO posts (guid, title, url, pub_date, content) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING",
            post.guid,
            post.title,
            post.url,
            post.pub_date,
            post.content
        )
        .execute(&self.pool)
        .await
        .map(|_| ())
    }

    async fn get_unsent(&self) -> Result<Vec<Post>, sqlx::Error> {
        sqlx::query_as!(
            Post,
            "SELECT * FROM posts WHERE sent != true ORDER BY pub_date DESC"
        )
        .fetch_all(&self.pool)
        .await
    }
}

A src/database/sqlite.rs => src/database/sqlite.rs +64 -0
@@ 0,0 1,64 @@
use super::Database;
use crate::models::Post;

use async_trait::async_trait;

pub struct Sqlite {
    pool: sqlx::Pool<sqlx::Sqlite>,
}

impl Sqlite {
    pub async fn new<S>(path: S) -> Result<Self, sqlx::Error>
    where
        S: AsRef<str>,
    {
        let path = path.as_ref();
        debug!("Establishing connection to sqlite database at {:?}", path);

        Ok(Self {
            pool: sqlx::sqlite::SqlitePoolOptions::new()
                .max_connections(5)
                .connect(path)
                .await?,
        })
    }
}

#[async_trait]
impl Database for Sqlite {
    type DB = sqlx::Sqlite;

    async fn migrate(&self) -> Result<(), sqlx::Error> {
        sqlx::migrate!("./migrations/sqlite")
            .run(&self.pool)
            .await
            .map_err(Into::into)
    }

    async fn acquire(&self) -> Result<sqlx::pool::PoolConnection<Self::DB>, sqlx::Error> {
        self.pool.acquire().await
    }

    async fn insert_item(&self, post: &Post) -> Result<(), sqlx::Error> {
        sqlx::query!(
            "INSERT OR IGNORE INTO posts (guid, title, url, pub_date, content) VALUES (?, ?, ?, ?, ?)",
            post.guid,
            post.title,
            post.url,
            post.pub_date,
            post.content
        )
        .execute(&self.pool)
        .await
        .map(|_| ())
    }

    async fn get_unsent(&self) -> Result<Vec<Post>, sqlx::Error> {
        sqlx::query_as!(
            Post,
            "SELECT * FROM posts WHERE sent != true ORDER BY pub_date DESC"
        )
        .fetch_all(&self.pool)
        .await
    }
}

D src/db.rs => src/db.rs +0 -20
@@ 1,20 0,0 @@
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;

use crate::models::Post;

// inserts a new post or updates an old one with the same guid
pub async fn insert_item(mut conn: PoolConnection<Sqlite>, post: &Post) -> anyhow::Result<()> {
    sqlx::query!(
        "insert or ignore into posts (guid, title, url, pub_date, content) values (?, ?, ?, ?, ?)",
        post.guid,
        post.title,
        post.url,
        post.pub_date,
        post.content
    )
    .execute(&mut conn)
    .await?;

    Ok(())
}

M src/feed.rs => src/feed.rs +23 -0
@@ 2,6 2,7 @@ use atom_syndication;
use rss;

use crate::anyhow::Context;
use crate::database::Database;
use crate::models::Post;

pub async fn fetch_new<S: AsRef<str>>(url: S) -> anyhow::Result<Vec<Post>> {


@@ 44,3 45,25 @@ pub async fn fetch_new_atom(bytes: &[u8]) -> anyhow::Result<Vec<Post>> {
        })
        .collect::<Vec<_>>())
}

pub async fn fetch_feeds<DB: Database>(db: &DB, urls: Vec<String>) -> anyhow::Result<()> {
    use tokio::task::JoinSet;

    let mut set = JoinSet::new();
    for u in urls {
        set.spawn(async move { fetch_new(u).await });
    }

    while let Some(new) = set.join_next().await {
        let posts = new??;

        for i in posts.into_iter() {
            db.insert_item(&i).await.context(format!(
                "Unable to insert item from {:?} with GUID {:?}",
                i.url, i.guid
            ))?;
        }
    }

    Ok(())
}

M src/main.rs => src/main.rs +13 -40
@@ 5,13 5,14 @@ extern crate anyhow;

pub mod cli;
pub mod config;
pub mod db;
pub mod database;
pub mod feed;
pub mod mail;
pub mod models;

use crate::{
    config::AppConfig,
    database::Database,
    mail::{get_mailer, Mail},
};
use anyhow::Context;


@@ 22,7 23,6 @@ use std::{
    sync::Arc,
};

use sqlx::{sqlite::SqlitePoolOptions, Sqlite};
use tokio::task::JoinSet;

#[tokio::main]


@@ 37,27 37,20 @@ async fn main() -> anyhow::Result<()> {
    .filter(|l| !l.starts_with('#'))
    .collect();

    let db_path = &config.database_path;
    debug!("Establishing connection to database at {:?}", db_path);
    let pool = SqlitePoolOptions::new()
        .max_connections(5)
        .connect(db_path.as_str())
        .await?;
    #[cfg(feature = "sqlite")]
    let db = database::Sqlite::new(&config.database_path).await?;
    #[cfg(feature = "postgres")]
    let db = database::Postgres::new(&config.database_path).await?;

    sqlx::migrate!("./migrations").run(&pool).await?;
    Database::migrate(&db).await?;

    if !config.no_fetch {
        fetch_feeds(urls, &pool).await?;
        feed::fetch_feeds(&db, urls).await?;
    } else {
        info!("Not fetching any feeds as \"--no-fetch\" has been passed");
    }

    let results = sqlx::query_as!(
        models::Post,
        "select * from posts where sent != true order by pub_date desc"
    )
    .fetch_all(&pool)
    .await?;
    let results = Database::get_unsent(&db).await?;

    let mailer = get_mailer(
        config.smtp_user.clone(),


@@ 68,7 61,7 @@ async fn main() -> anyhow::Result<()> {

    let mut handles = JoinSet::new();
    for result in results {
        let mut conn = pool.acquire().await?;
        let mut conn = db.acquire().await?;
        let mailer = mailer.clone();
        let config = config.clone();



@@ 95,28 88,7 @@ async fn main() -> anyhow::Result<()> {
    Ok(())
}

async fn fetch_feeds(urls: Vec<String>, pool: &sqlx::Pool<Sqlite>) -> anyhow::Result<()> {
    let mut set = JoinSet::new();
    for u in urls {
        set.spawn(async move { feed::fetch_new(u).await });
    }

    while let Some(new) = set.join_next().await {
        let posts = new??;

        for i in posts.into_iter() {
            let conn = pool.acquire().await?;
            db::insert_item(conn, &i).await.context(format!(
                "Unable to insert item from {:?} with GUID {:?}",
                i.url, i.guid
            ))?;
        }
    }

    Ok(())
}

async fn send_post<'a, E>(
async fn send_post<'a, E, DB>(
    conn: E,
    mailer: AsyncSmtpTransport<Tokio1Executor>,
    from: &'a str,


@@ 125,7 97,8 @@ async fn send_post<'a, E>(
    dry_run: bool,
) -> anyhow::Result<()>
where
    E: sqlx::Executor<'a, Database = Sqlite>,
    DB: sqlx::Database,
    E: sqlx::Executor<'a, Database = DB>,
{
    if !dry_run {
        Mail::new(post.title, post.content, post.url)