~whbboyd/russet

9ad437d57735e83d88d3e849f39a93619da5f7e3 — Will Boyd 5 months ago a8aa64f
proactively cleanup expired sessions
6 files changed, 44 insertions(+), 6 deletions(-)

M Cargo.lock
M Cargo.toml
M src/domain/user.rs
M src/persistence/mod.rs
M src/persistence/sql/user.rs
M src/server.rs
M Cargo.lock => Cargo.lock +1 -1
@@ 1833,7 1833,7 @@ dependencies = [

[[package]]
name = "russet"
version = "0.9.0"
version = "0.9.1"
dependencies = [
 "argon2",
 "atom_syndication",

M Cargo.toml => Cargo.toml +1 -1
@@ 1,6 1,6 @@
[package]
name = "russet"
version = "0.9.0"
version = "0.9.1"
edition = "2021"
license = "AGPL-3.0"


M src/domain/user.rs => src/domain/user.rs +5 -0
@@ 118,6 118,11 @@ where Persistence: RussetUserPersistenceLayer {
		self.persistence.delete_sessions_for_user(&user.id).await?;
		Ok(())
	}
	
	pub async fn  cleanup_expired_sessions(&self) -> Result<()> {
		let expiry = Timestamp::new(SystemTime::now());
		self.persistence.delete_expired_sessions(&expiry).await
	}

	pub async fn auth_user(&self, token: &str) -> Result<Option<User>> {
		match self.persistence.get_user_by_session(&token).await? {

M src/persistence/mod.rs => src/persistence/mod.rs +4 -1
@@ 2,7 2,7 @@ pub mod model;
pub mod sql;

use crate::Result;
use crate::model::{ EntryId, FeedId, Pagination, UserId };
use crate::model::{ EntryId, FeedId, Pagination, Timestamp, UserId };
use model::{ Entry, Feed, Session, User, UserEntry };
use reqwest::Url;
use std::future::Future;


@@ 93,6 93,9 @@ pub trait RussetUserPersistenceLayer: Send + Sync + std::fmt::Debug + 'static {

	fn delete_session(&self, session_token: &str)
		-> impl Future<Output = Result<()>> + Send;
	
	fn delete_expired_sessions(&self, expiry: &Timestamp)
		-> impl Future<Output = Result<()>> + Send;

	async fn delete_sessions_for_user(&self, user_id: &UserId) -> Result<u32>;


M src/persistence/sql/user.rs => src/persistence/sql/user.rs +17 -3
@@ 1,4 1,4 @@
use crate::model::{ FeedId, UserId };
use crate::model::{ FeedId, Timestamp, UserId };
use crate::persistence::model::{ PasswordHash, Session, SessionToken, User };
use crate::persistence::RussetUserPersistenceLayer;
use crate::persistence::sql::SqlDatabase;


@@ 145,7 145,7 @@ impl RussetUserPersistenceLayer for SqlDatabase {
	async fn delete_session(&self, session_token: &str) -> Result<()> {
		let rows = sqlx::query!("
				DELETE FROM sessions
				WHERE token = ?",
				WHERE token = ?;",
				session_token,
			)
			.execute(&self.pool)


@@ 159,11 159,25 @@ impl RussetUserPersistenceLayer for SqlDatabase {
	}

	#[tracing::instrument]
	async fn delete_expired_sessions(&self, expiry: &Timestamp) -> Result<()> {
		let expiry = TryInto::<i64>::try_into(expiry.clone())?;
		sqlx::query!("
				DELETE FROM sessions
				wHERE expiration < ?;",
				expiry,
			)
			.execute(&self.pool)
			.await?
			.rows_affected();
		Ok(())
	}

	#[tracing::instrument]
	async fn delete_sessions_for_user(&self, user_id: &UserId) -> Result<u32> {
		let user_id = user_id.to_string();
		let rows = sqlx::query!("
				DELETE FROM sessions
				WHERE user_id = ?",
				WHERE user_id = ?;",
				user_id,
			)
			.execute(&self.pool)

M src/server.rs => src/server.rs +16 -0
@@ 3,14 3,18 @@ use crate::domain::RussetDomainService;
use crate::http::{ AppState, russet_router };
use crate::persistence::RussetPersistenceLayer;
use std::sync::Arc;
use std::time::Duration;
use tracing::{ error, info };

const SESSION_CLEANUP_INTERVAL: Duration = Duration::from_secs(3_600);

pub async fn start<Persistence>(
	domain_service: Arc<RussetDomainService<Persistence>>,
	listen: String,
) -> Result<()>
where Persistence: RussetPersistenceLayer {
	info!("Starting {}…", crate::APP_NAME);

	// Start the feed update coroutine
	let update_service = domain_service.clone();
	let check_interval = domain_service.feed_check_interval.clone();


@@ 24,6 28,18 @@ where Persistence: RussetPersistenceLayer {
		}
	} );

	// Start the expired session cleanup coroutine
	let session_cleanup_service = domain_service.clone();
	tokio::spawn(async move {
		loop {
			info!("Removing expired sessions");
			if let Err(e) = session_cleanup_service.cleanup_expired_sessions().await {
				error!(error = e.as_ref(), "Error removing expired sessions");
			}
			tokio::time::sleep(SESSION_CLEANUP_INTERVAL).await;
		}
	} );

	// Setup for Axum
	let app_state = AppState { domain_service: domain_service.clone() };
	let routes = russet_router()