~yujiri/libsufec

cb323691644f9d01bf790b2d1c09e2497b86899e — Evin Yulo 1 year, 9 months ago 099bbcf
improve failed send model
1 files changed, 55 insertions(+), 29 deletions(-)

M src/store.rs
M src/store.rs => src/store.rs +55 -29
@@ 23,15 23,14 @@ impl Store {
		if let Some(cache) = self.cache.get_mut(&room_id) {
			cache.push(m.clone());
		}
		let stmt = "INSERT INTO messages VALUES (?, ?, ?, ?, ?, ?, ?)";
		let stmt = "INSERT INTO messages VALUES (?, ?, ?, ?, ?, ?)";
		self.conn.prepare(stmt).unwrap().into_cursor().bind(&[
			sqlite::Value::Integer(room_id),
			sqlite::Value::Integer(m.timestamp as i64),
			sqlite::Value::Binary(m.sender.to_bytes()),
			sqlite::Value::Binary(m.content.to_bytes()),
			sqlite::Value::Integer(m.hash_sent as i64),
			sqlite::Value::Integer(m.all_verified as i64),
			sqlite::Value::Integer(m.failed as i64),
			sqlite::Value::Integer(m.status as u8 as i64),
		]).unwrap().try_next().unwrap();
	}
	pub fn get(&mut self, room_id: RoomId) -> &[HistoryEntry] {


@@ 65,8 64,7 @@ impl Store {
			sender: SufecAddr::from_bytes(row[1].as_binary().unwrap()).unwrap(),
			content: MessageContent::from_bytes(row[2].as_binary().unwrap()).unwrap(),
			hash_sent: row[3].as_integer().unwrap() == 1,
			all_verified: row[4].as_integer().unwrap() == 1,
			failed: row[5].as_integer().unwrap() == 1,
			status: (row[4].as_integer().unwrap() as u8).try_into().unwrap(),
		}
	}
	pub fn check_hashes(&self, room: &Room, entry: &HistoryEntry) -> bool {


@@ 122,7 120,7 @@ impl Store {
		]).unwrap().try_next().unwrap();
		if let Some(cache) = self.cache.get_mut(&room_id) {
			let msg = cache.iter_mut().find(|m| m.timestamp == timestamp).unwrap();
			msg.all_verified = true;
			msg.status = Status::Verified;
		}
	}
	pub fn add_pending(&mut self, room_id: RoomId, recipient: &SufecAddr, message: &Message) {


@@ 142,14 140,15 @@ impl Store {
			sqlite::Value::Binary(recipient.to_bytes()),
			sqlite::Value::Integer(timestamp as i64),
		]).unwrap().try_next().unwrap();
		let stmt = "UPDATE messages SET failed = 1 WHERE room_id = ? AND timestamp = ?";
		let stmt = "UPDATE messages SET status = ? WHERE room_id = ? AND timestamp = ?";
		self.conn.prepare(stmt).unwrap().into_cursor().bind(&[
			sqlite::Value::Integer(Status::Failed as u8 as i64),
			sqlite::Value::Integer(room_id as i64),
			sqlite::Value::Integer(timestamp as i64),
		]).unwrap().try_next().unwrap();
		if let Some(cache) = self.cache.get_mut(&room_id) {
			if let Some(msg) = cache.iter_mut().find(|m| m.timestamp == timestamp) {
				msg.failed = true;
				msg.status = Status::Failed;
			}
		}
	}


@@ 159,22 158,25 @@ impl Store {
			sqlite::Value::Binary(recipient.to_bytes()),
			sqlite::Value::Integer(timestamp as i64),
		]).unwrap().try_next().unwrap();
		// Determine if there are any other failed sends of the message.
		let query = "SELECT * FROM pending WHERE recipient = ? AND timestamp = ? AND error NOT NULL";
		let no_failures = self.conn.prepare(query).unwrap().into_cursor().bind(&[
			sqlite::Value::Binary(recipient.to_bytes()),
		// Determine if there are any other pending sends of the message.
		let query = "SELECT error FROM pending WHERE room_id = ? AND timestamp = ? ORDER BY error NULLS LAST LIMIT 1";
		let mut cursor = self.conn.prepare(query).unwrap().into_cursor().bind(&[
			sqlite::Value::Integer(room_id as i64),
			sqlite::Value::Integer(timestamp as i64),
		]).unwrap().try_next().unwrap().is_none();
		if no_failures {
			let stmt = "UPDATE messages SET failed = 0 WHERE room_id = ? AND timestamp = ?";
			self.conn.prepare(stmt).unwrap().into_cursor().bind(&[
				sqlite::Value::Integer(room_id as i64),
				sqlite::Value::Integer(timestamp as i64),
			]).unwrap().try_next().unwrap();
			if let Some(cache) = self.cache.get_mut(&room_id) {
				if let Some(msg) = cache.iter_mut().find(|m| m.timestamp == timestamp) {
					msg.failed = false;
				}
		]).unwrap();
		let status = match cursor.try_next().unwrap() {
			None => Status::Sent,
			Some(r) => if r[0].kind() == sqlite::Type::Null { Status::Sending } else { Status::Failed },
		};
		let stmt = "UPDATE messages SET status = ? WHERE room_id = ? AND timestamp = ?";
		self.conn.prepare(stmt).unwrap().into_cursor().bind(&[
			sqlite::Value::Integer(status as u8 as i64),
			sqlite::Value::Integer(room_id as i64),
			sqlite::Value::Integer(timestamp as i64),
		]).unwrap().try_next().unwrap();
		if let Some(cache) = self.cache.get_mut(&room_id) {
			if let Some(msg) = cache.iter_mut().find(|m| m.timestamp == timestamp) {
				msg.status = status;
			}
		}
	}


@@ 200,8 202,7 @@ CREATE TABLE messages (
	sender blob not null,
	content blob not null,
	hash_sent integer not null,
	all_verified integer not null,
	failed integer not null
	status integer not null
);
CREATE TABLE hashes (
	room_id integer not null,


@@ 217,20 218,45 @@ CREATE TABLE pending (
	error text
)";

const SELECT_MESSAGES: &str = "SELECT timestamp, sender, content, hash_sent, all_verified, failed FROM messages";

const SELECT_MESSAGES: &str = "SELECT timestamp, sender, content, hash_sent, status FROM messages";

const MAX_HASHES_PER_MESSAGE: usize = u8::MAX as usize;

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[repr(u8)]
/// For incoming messages, only `Sent` and `Verified` are used.
pub enum Status {
	/// No sends have failed, but not all have succeeded.
	Sending = 0,
	/// At least one send has failed.
	Failed = 1,
	/// All sends have succeeded, but not all recipients have verified it.
	Sent = 2,
	/// All recipients have verified it.
	Verified = 3,
}
impl TryFrom<u8> for Status {
	type Error = ();
	fn try_from(n: u8) -> Result<Status, Self::Error> {
		Ok(match n {
			0 => Status::Sending,
			1 => Status::Failed,
			2 => Status::Sent,
			3 => Status::Verified,
			_ => return Err(()),
		})
	}
}

#[derive(Clone, Debug)]
pub struct HistoryEntry {
	pub sender: SufecAddr,
	pub timestamp: u64,
	pub content: MessageContent,
	pub all_verified: bool,
	// Only meaningful for incoming messages: whether we have sent out a hash of this.
	pub hash_sent: bool,
	// Only meaningful for outgiong messages: whether it failed to send to at least one recipient.
	pub failed: bool,
	pub status: Status,
}
impl HistoryEntry {
	pub fn hash(&self) -> Digest {