From cb323691644f9d01bf790b2d1c09e2497b86899e Mon Sep 17 00:00:00 2001 From: Evin Yulo Date: Fri, 16 Dec 2022 08:13:44 -0500 Subject: [PATCH] improve failed send model --- src/store.rs | 84 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/src/store.rs b/src/store.rs index 33b2399..08a30b6 100644 --- a/src/store.rs +++ b/src/store.rs @@ -23,15 +23,14 @@ impl Store { if let Some(cache) = self.cache.get_mut(&room_id) { cache.push(m.clone()); } - let stmt = "INSERT INTO messages VALUES (?, ?, ?, ?, ?, ?, ?)"; + let stmt = "INSERT INTO messages VALUES (?, ?, ?, ?, ?, ?)"; self.conn.prepare(stmt).unwrap().into_cursor().bind(&[ sqlite::Value::Integer(room_id), sqlite::Value::Integer(m.timestamp as i64), sqlite::Value::Binary(m.sender.to_bytes()), sqlite::Value::Binary(m.content.to_bytes()), sqlite::Value::Integer(m.hash_sent as i64), - sqlite::Value::Integer(m.all_verified as i64), - sqlite::Value::Integer(m.failed as i64), + sqlite::Value::Integer(m.status as u8 as i64), ]).unwrap().try_next().unwrap(); } pub fn get(&mut self, room_id: RoomId) -> &[HistoryEntry] { @@ -65,8 +64,7 @@ impl Store { sender: SufecAddr::from_bytes(row[1].as_binary().unwrap()).unwrap(), content: MessageContent::from_bytes(row[2].as_binary().unwrap()).unwrap(), hash_sent: row[3].as_integer().unwrap() == 1, - all_verified: row[4].as_integer().unwrap() == 1, - failed: row[5].as_integer().unwrap() == 1, + status: (row[4].as_integer().unwrap() as u8).try_into().unwrap(), } } pub fn check_hashes(&self, room: &Room, entry: &HistoryEntry) -> bool { @@ -122,7 +120,7 @@ impl Store { ]).unwrap().try_next().unwrap(); if let Some(cache) = self.cache.get_mut(&room_id) { let msg = cache.iter_mut().find(|m| m.timestamp == timestamp).unwrap(); - msg.all_verified = true; + msg.status = Status::Verified; } } pub fn add_pending(&mut self, room_id: RoomId, recipient: &SufecAddr, message: &Message) { @@ -142,14 +140,15 @@ impl Store { sqlite::Value::Binary(recipient.to_bytes()), sqlite::Value::Integer(timestamp as i64), ]).unwrap().try_next().unwrap(); - let stmt = "UPDATE messages SET failed = 1 WHERE room_id = ? AND timestamp = ?"; + let stmt = "UPDATE messages SET status = ? WHERE room_id = ? AND timestamp = ?"; self.conn.prepare(stmt).unwrap().into_cursor().bind(&[ + sqlite::Value::Integer(Status::Failed as u8 as i64), sqlite::Value::Integer(room_id as i64), sqlite::Value::Integer(timestamp as i64), ]).unwrap().try_next().unwrap(); if let Some(cache) = self.cache.get_mut(&room_id) { if let Some(msg) = cache.iter_mut().find(|m| m.timestamp == timestamp) { - msg.failed = true; + msg.status = Status::Failed; } } } @@ -159,22 +158,25 @@ impl Store { sqlite::Value::Binary(recipient.to_bytes()), sqlite::Value::Integer(timestamp as i64), ]).unwrap().try_next().unwrap(); - // Determine if there are any other failed sends of the message. - let query = "SELECT * FROM pending WHERE recipient = ? AND timestamp = ? AND error NOT NULL"; - let no_failures = self.conn.prepare(query).unwrap().into_cursor().bind(&[ - sqlite::Value::Binary(recipient.to_bytes()), + // Determine if there are any other pending sends of the message. + let query = "SELECT error FROM pending WHERE room_id = ? AND timestamp = ? ORDER BY error NULLS LAST LIMIT 1"; + let mut cursor = self.conn.prepare(query).unwrap().into_cursor().bind(&[ + sqlite::Value::Integer(room_id as i64), sqlite::Value::Integer(timestamp as i64), - ]).unwrap().try_next().unwrap().is_none(); - if no_failures { - let stmt = "UPDATE messages SET failed = 0 WHERE room_id = ? AND timestamp = ?"; - self.conn.prepare(stmt).unwrap().into_cursor().bind(&[ - sqlite::Value::Integer(room_id as i64), - sqlite::Value::Integer(timestamp as i64), - ]).unwrap().try_next().unwrap(); - if let Some(cache) = self.cache.get_mut(&room_id) { - if let Some(msg) = cache.iter_mut().find(|m| m.timestamp == timestamp) { - msg.failed = false; - } + ]).unwrap(); + let status = match cursor.try_next().unwrap() { + None => Status::Sent, + Some(r) => if r[0].kind() == sqlite::Type::Null { Status::Sending } else { Status::Failed }, + }; + let stmt = "UPDATE messages SET status = ? WHERE room_id = ? AND timestamp = ?"; + self.conn.prepare(stmt).unwrap().into_cursor().bind(&[ + sqlite::Value::Integer(status as u8 as i64), + sqlite::Value::Integer(room_id as i64), + sqlite::Value::Integer(timestamp as i64), + ]).unwrap().try_next().unwrap(); + if let Some(cache) = self.cache.get_mut(&room_id) { + if let Some(msg) = cache.iter_mut().find(|m| m.timestamp == timestamp) { + msg.status = status; } } } @@ -200,8 +202,7 @@ CREATE TABLE messages ( sender blob not null, content blob not null, hash_sent integer not null, - all_verified integer not null, - failed integer not null + status integer not null ); CREATE TABLE hashes ( room_id integer not null, @@ -217,20 +218,45 @@ CREATE TABLE pending ( error text )"; -const SELECT_MESSAGES: &str = "SELECT timestamp, sender, content, hash_sent, all_verified, failed FROM messages"; + +const SELECT_MESSAGES: &str = "SELECT timestamp, sender, content, hash_sent, status FROM messages"; const MAX_HASHES_PER_MESSAGE: usize = u8::MAX as usize; +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[repr(u8)] +/// For incoming messages, only `Sent` and `Verified` are used. +pub enum Status { + /// No sends have failed, but not all have succeeded. + Sending = 0, + /// At least one send has failed. + Failed = 1, + /// All sends have succeeded, but not all recipients have verified it. + Sent = 2, + /// All recipients have verified it. + Verified = 3, +} +impl TryFrom for Status { + type Error = (); + fn try_from(n: u8) -> Result { + Ok(match n { + 0 => Status::Sending, + 1 => Status::Failed, + 2 => Status::Sent, + 3 => Status::Verified, + _ => return Err(()), + }) + } +} + #[derive(Clone, Debug)] pub struct HistoryEntry { pub sender: SufecAddr, pub timestamp: u64, pub content: MessageContent, - pub all_verified: bool, // Only meaningful for incoming messages: whether we have sent out a hash of this. pub hash_sent: bool, - // Only meaningful for outgiong messages: whether it failed to send to at least one recipient. - pub failed: bool, + pub status: Status, } impl HistoryEntry { pub fn hash(&self) -> Digest { -- 2.45.2