~whbboyd/russet

a5f69f24de718cee8582480fb89614d04fdf6701 — Will Boyd 4 months ago 1ff23e6
Refactor mostly-duplicated query in SqlDatabase entry impl
1 files changed, 94 insertions(+), 67 deletions(-)

M src/persistence/sql/entry.rs
M src/persistence/sql/entry.rs => src/persistence/sql/entry.rs +94 -67
@@ 113,7 113,90 @@ impl RussetEntryPersistenceLayer for SqlDatabase {
		user_id: &UserId,
		pagination: &Pagination,
	) -> Vec<Result<(Entry, Option<UserEntry>)>> {
		self.get_userentries(user_id, None, None, pagination).await
	}

	#[tracing::instrument]
	async fn get_entries_for_user_feed(
		&self,
		user_id: &UserId,
		feed_id: &FeedId,
		pagination: &Pagination,
	) -> impl IntoIterator<Item = Result<(Entry, Option<UserEntry>)>> {
		self.get_userentries(user_id, Some(feed_id), None, pagination).await
	}

	#[tracing::instrument]
	async fn get_entry_and_set_userentry(
		&self,
		entry_id: &EntryId,
		user_id: &UserId,
		user_entry: &UserEntry
	) -> Result<Entry> {
		let entry_id = entry_id.to_string();
		let user_id = user_id.to_string();
		let read: Option<i64> = user_entry.read
			.clone()
			.and_then(|timestamp| timestamp.try_into().ok());
		let tombstone: Option<i64> = user_entry.tombstone
			.clone()
			.and_then(|timestamp| timestamp.try_into().ok());
		let mut tx = self.pool.begin().await?;
		// Query the entry first to make sure it actually exists
		let row = sqlx::query!("
				SELECT
					id, feed_id, internal_id, fetch_index, article_date, title, url
				FROM entries
				WHERE id = ?;",
				entry_id,
			)
			.fetch_one(&mut *tx)
			.await?;
		sqlx::query!("
				INSERT INTO user_entry_settings (
					user_id, entry_id, read, tombstone
				) VALUES ( ?, ?, ?, ?)
				ON CONFLICT (user_id, entry_id)
				DO UPDATE SET
					read = excluded.read,
					tombstone = excluded.tombstone;",
				user_id,
				entry_id,
				read,
				tombstone,
			)
			.execute(&mut *tx)
			.await?;
		tx.commit().await?;
		let id = EntryId(Ulid::from_string(&row.id)?);
		let feed_id = FeedId(Ulid::from_string(&row.feed_id)?);
		let url = row.url.map(|url| Url::parse(&url)).transpose()?;
		Ok(Entry {
			id,
			feed_id,
			internal_id: row.internal_id,
			fetch_index: row.fetch_index as u32,
			article_date: row.article_date.into(),
			title: row.title,
			url,
		} )
	}
}

impl SqlDatabase {
	/// Helper for entry/user_entry fetching.
	async fn get_userentries(
		&self,
		user_id: &UserId,
		feed_id: Option<&FeedId>,
		entry_id: Option<&EntryId>,
		pagination: &Pagination,
	) -> Vec<Result<(Entry, Option<UserEntry>)>> {
		let user_id_str = user_id.to_string();
		let no_feed = feed_id.is_none();
		let feed_id_str = feed_id.map(|id| id.to_string());
		let no_entry = entry_id.is_none();
		let entry_id_str = entry_id.map(|id| id.to_string());
		let page_size: i64 = match pagination.page_size.try_into() {
			Ok(i) => i,
			Err(e) => return vec![Err(e.into())]


@@ 123,6 206,11 @@ impl RussetEntryPersistenceLayer for SqlDatabase {
			Err(e) => return vec![Err(e.into())]
		};
		// TODO: Maybe do paging later. Or figure out how to stream from sqlx.

		// This query is this way because in order to pass it to query!, it must
		// be a &'static str, which means no dynamically-added query clauses.
		// The (? OR id = ?) clauses allow us to skip these checks if we weren't
		// provied an ID to check against.
		let rows = sqlx::query!(r#"
				SELECT
					e.id AS "id!",


@@ 141,10 229,16 @@ impl RussetEntryPersistenceLayer for SqlDatabase {
				LEFT OUTER JOIN user_entry_settings AS u
					ON s.user_id = u.user_id AND e.id = u.entry_id
				WHERE s.user_id = ?
					AND (? OR s.feed_id = ?)
					AND (? OR e.id = ?)
				ORDER BY fetch_index DESC, article_date DESC
				LIMIT ?
				OFFSET ?;"#,
				user_id_str,
				no_feed,
				feed_id_str,
				no_entry,
				entry_id_str,
				page_size,
				page_offset,
			)


@@ 184,71 278,4 @@ impl RussetEntryPersistenceLayer for SqlDatabase {
		};
		rv
	}

	#[tracing::instrument]
	async fn get_entries_for_user_feed(
		&self,
		user_id: &UserId,
		feed_id: &FeedId,
		pagination: &Pagination,
	) -> impl IntoIterator<Item = Result<(Entry, Option<UserEntry>)>> {
		todo!();
		vec![]
	}

	#[tracing::instrument]
	async fn get_entry_and_set_userentry(
		&self,
		entry_id: &EntryId,
		user_id: &UserId,
		user_entry: &UserEntry
	) -> Result<Entry> {
		let entry_id = entry_id.to_string();
		let user_id = user_id.to_string();
		let read: Option<i64> = user_entry.read
			.clone()
			.and_then(|timestamp| timestamp.try_into().ok());
		let tombstone: Option<i64> = user_entry.tombstone
			.clone()
			.and_then(|timestamp| timestamp.try_into().ok());
		let mut tx = self.pool.begin().await?;
		// Query the entry first to make sure it actually exists
		let row = sqlx::query!("
				SELECT
					id, feed_id, internal_id, fetch_index, article_date, title, url
				FROM entries
				WHERE id = ?;",
				entry_id,
			)
			.fetch_one(&mut *tx)
			.await?;
		sqlx::query!("
				INSERT INTO user_entry_settings (
					user_id, entry_id, read, tombstone
				) VALUES ( ?, ?, ?, ?)
				ON CONFLICT (user_id, entry_id)
				DO UPDATE SET
					read = excluded.read,
					tombstone = excluded.tombstone;",
				user_id,
				entry_id,
				read,
				tombstone,
			)
			.execute(&mut *tx)
			.await?;
		tx.commit().await?;
		let id = EntryId(Ulid::from_string(&row.id)?);
		let feed_id = FeedId(Ulid::from_string(&row.feed_id)?);
		let url = row.url.map(|url| Url::parse(&url)).transpose()?;
		Ok(Entry {
			id,
			feed_id,
			internal_id: row.internal_id,
			fetch_index: row.fetch_index as u32,
			article_date: row.article_date.into(),
			title: row.title,
			url,
		} )
	}
}