~witcher/rss-email

2308b399c6a8a0a159e8617cec506a7cc5e4288e — witcher 1 year, 8 months ago 279e15e postgres-support
Prototype preparation for postgres support

Not everything has been refactored yet, but a few queries have been
extracted into a struct for interaction with a sqlite database.
I'm not sure whether this is the interface I'd like, especially since
the `anyhow::Result`s should be replaced with proper error types, but
this is a good start.
4 files changed, 101 insertions(+), 40 deletions(-)

M Cargo.lock
M Cargo.toml
M src/db.rs
M src/main.rs
M Cargo.lock => Cargo.lock +12 -0
@@ 38,6 38,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "508b352bb5c066aac251f6daf6b36eccd03e8a88e8081cd44959ea277a3af9a8"

[[package]]
name = "async-trait"
version = "0.1.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c"
dependencies = [
 "proc-macro2",
 "quote",
 "syn",
]

[[package]]
name = "atoi"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1141,6 1152,7 @@ name = "rss-email"
version = "0.2.0"
dependencies = [
 "anyhow",
 "async-trait",
 "chrono",
 "clap",
 "directories",

M Cargo.toml => Cargo.toml +1 -0
@@ 21,3 21,4 @@ log = "0.4.17"
env_logger = "0.9.0"
tokio = { version = "1.21.2", default-features = false, features = ["rt-multi-thread", "macros"] }
sqlx = { version = "0.6.2", features = ["runtime-tokio-rustls", "migrate", "sqlite", "offline"] }
async-trait = "0.1.58"

M src/db.rs => src/db.rs +72 -22
@@ 1,25 1,75 @@
use async_trait::async_trait;
use chrono::DateTime;
use rss::Item;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;

// inserts a new post or updates an old one with the same guid
pub async fn insert_item(mut conn: PoolConnection<Sqlite>, item: &Item) -> anyhow::Result<()> {
    let time = item.pub_date().map(|date| {
        DateTime::parse_from_rfc2822(date)
            .unwrap_or_else(|_| DateTime::default())
            .timestamp()
    });

    let guid = item.guid().ok_or_else(|| anyhow!("No guid found"))?.value();
    let title = item.title();
    let author = item.author();
    let url = item.link();
    let feedurl = item.source().map(|s| s.url());
    let pub_date = time;
    let content = item.content().or_else(|| item.description());

    sqlx::query!("insert or ignore into posts (guid, title, author, url, feedurl, pub_date, content) values (?, ?, ?, ?, ?, ?, ?)", guid, title, author, url, feedurl, pub_date, content).execute(&mut conn).await?;

    Ok(())
use sqlx::{pool::Pool, sqlite::SqlitePoolOptions};

#[async_trait]
pub trait DatabaseOperations<DB: sqlx::Database> {
    /// Insert a new post or updates an old one in the database with the same guid.
    async fn insert_item(&self, item: &Item) -> anyhow::Result<()>;
    /// Acquire a new database connection from the underlying connection pool.
    async fn acquire_connection(&self) -> anyhow::Result<sqlx::pool::PoolConnection<DB>>;
    async fn get_unsent_posts(&self) -> anyhow::Result<Vec<crate::models::Post>>;
}

pub struct Sqlite {
    pool: Pool<sqlx::Sqlite>,
}

impl Sqlite {
    /// Creates a new Database pool and applies migrations to the database specified in `db_path`.
    ///
    /// # Errors
    ///
    /// If a connection to the database can't be established or the migrations can't run, then an
    /// error is returned.
    // TODO: implement a custom error
    pub async fn try_new<S>(db_path: S) -> anyhow::Result<Self>
    where
        S: AsRef<str>,
    {
        let pool = SqlitePoolOptions::new()
            .max_connections(5)
            .connect(db_path.as_ref())
            .await?;
        sqlx::migrate!("./migrations").run(&pool).await?;

        Ok(Self { pool })
    }
}

#[async_trait]
impl DatabaseOperations<sqlx::Sqlite> for Sqlite {
    async fn insert_item(&self, item: &Item) -> anyhow::Result<()> {
        let time = item.pub_date().map(|date| {
            DateTime::parse_from_rfc2822(date)
                .unwrap_or_else(|_| DateTime::default())
                .timestamp()
        });

        let guid = item.guid().ok_or_else(|| anyhow!("No guid found"))?.value();
        let title = item.title();
        let author = item.author();
        let url = item.link();
        let feedurl = item.source().map(|s| s.url());
        let pub_date = time;
        let content = item.content().or_else(|| item.description());

        sqlx::query!("insert or ignore into posts (guid, title, author, url, feedurl, pub_date, content) values (?, ?, ?, ?, ?, ?, ?)", guid, title, author, url, feedurl, pub_date, content).execute(&mut self.pool.acquire().await?).await?;

        Ok(())
    }

    async fn acquire_connection(&self) -> anyhow::Result<sqlx::pool::PoolConnection<sqlx::Sqlite>> {
        Ok(self.pool.acquire().await?)
    }

    async fn get_unsent_posts(&self) -> anyhow::Result<Vec<crate::models::Post>> {
        Ok(sqlx::query_as!(
            crate::models::Post,
            "select * from posts where sent != true order by pub_date desc"
        )
        .fetch_all(&mut self.acquire_connection().await?)
        .await?)
    }
}

M src/main.rs => src/main.rs +16 -18
@@ 10,15 10,18 @@ pub mod mail;
pub mod models;
pub mod rss;

use crate::mail::{get_mailer, send_email};
use crate::{
    db::{DatabaseOperations, Sqlite},
    mail::{get_mailer, send_email},
};

use anyhow::Context;
use config::Config;
use sqlx::pool::PoolConnection;
use std::{
    fs::File,
    io::{BufRead, BufReader},
};

use sqlx::{pool::PoolConnection, sqlite::SqlitePoolOptions, Sqlite};
use tokio::task::JoinSet;

#[tokio::main]


@@ 40,12 43,7 @@ async fn main() -> anyhow::Result<()> {

    let db_path = args.database_path.unwrap();
    debug!("Establishing connection to database at {:?}", db_path);
    let pool = SqlitePoolOptions::new()
        .max_connections(5)
        .connect(db_path.to_str().unwrap())
        .await?;

    sqlx::migrate!("./migrations").run(&pool).await?;
    let db = Sqlite::try_new(db_path.to_str().unwrap()).await?;

    let mut set = JoinSet::new();
    for u in urls {


@@ 59,8 57,7 @@ async fn main() -> anyhow::Result<()> {
        debug!("Found {} new items", items.len());

        for i in items {
            let conn = pool.acquire().await?;
            db::insert_item(conn, i).await.context(format!(
            db.insert_item(i).await.context(format!(
                "Unable to insert item from {:?} with GUID {:?}",
                i.link(),
                i.guid()


@@ 68,20 65,21 @@ async fn main() -> anyhow::Result<()> {
        }
    }

    let results = sqlx::query_as!(
        models::Post,
        "select * from posts where sent != true order by pub_date desc"
    let results = db.get_unsent_posts().await?;

    send_posts(
        db.acquire_connection().await?,
        &config,
        results,
        args.dry_run,
    )
    .fetch_all(&pool)
    .await?;

    send_posts(pool.acquire().await?, &config, results, args.dry_run).await?;

    Ok(())
}

async fn send_posts(
    mut conn: PoolConnection<Sqlite>,
    mut conn: PoolConnection<sqlx::Sqlite>,
    config: &Config,
    items: Vec<models::Post>,
    dry_run: bool,