~nicohman/signal-rs

c9f649c299bf94f7d76a8a152b7785a9b04149d8 — nicohman 9 months ago 1c0a90d
Use Dbus for scli
5 files changed, 182 insertions(+), 50 deletions(-)

M Cargo.lock
M Cargo.toml
M src/main.rs
M src/scli.rs
M src/signal.rs
M Cargo.lock => Cargo.lock +35 -0
@@ 376,6 376,30 @@ dependencies = [
]

[[package]]
name = "dbus"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22c08adfeb70c940c14d8af988fa854dcb5529e6141f2397e4e0fa4c9375d094"
dependencies = [
 "futures-channel",
 "futures-util",
 "libc",
 "libdbus-sys",
]

[[package]]
name = "dbus-tokio"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6840421e249bf51f6ad1873b53f7423f14e712208792dfe4b3a8ff99f713309"
dependencies = [
 "dbus",
 "libc",
 "mio 0.6.22",
 "tokio",
]

[[package]]
name = "derivative"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1159,6 1183,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "755456fae044e6fa1ebbbd1b3e902ae19e73097ed4ed87bb79934a867c007bc3"

[[package]]
name = "libdbus-sys"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc12a3bc971424edbbf7edaf6e5740483444db63aa8e23d3751ff12a30f306f0"
dependencies = [
 "pkg-config",
]

[[package]]
name = "libhandy"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 2210,6 2243,8 @@ dependencies = [
 "cairo-rs",
 "confy",
 "crossbeam-channel",
 "dbus",
 "dbus-tokio",
 "enum_variant_type",
 "futures-util",
 "gdk",

M Cargo.toml => Cargo.toml +3 -1
@@ 32,4 32,6 @@ reqwest = "0.10.8"
libhandy = "0.7.1"
confy = "0.4.0"
serde_repr = "0.1"
glib-macros = "0.10.0"
\ No newline at end of file
glib-macros = "0.10.0"
dbus-tokio = "0.6.0"
dbus = "0.9.0"
\ No newline at end of file

M src/main.rs => src/main.rs +2 -1
@@ 6,17 6,18 @@ extern crate futures_util;
extern crate gdk_pixbuf;
extern crate gio;
extern crate glib;
extern crate glib_macros;
extern crate gtk;
extern crate libhandy;
extern crate mio;
extern crate reqwest;
extern  crate dbus;
extern crate serde;
extern crate serde_json;
extern crate tokio;
extern crate tokio_tungstenite;
extern crate tungstenite;
extern crate url;
extern crate dbus_tokio;
#[macro_use]
extern crate lazy_static;
use futures_util::StreamExt;

M src/scli.rs => src/scli.rs +141 -48
@@ 1,12 1,17 @@
use crate::Message;
use crate::*;
use config::*;
use signal::*;
use dbus::message::MatchRule;
use dbus::nonblock;
use dbus_tokio::connection;
use serde::{Deserialize, Deserializer, Serialize};
use std::io::BufRead;
use std::process::Stdio;
use futures_util::StreamExt;
use signal::*;
use std::fs;
use std::io::BufRead;
use std::path::*;
use std::process::Stdio;
use std::time::Duration;
pub struct SCLISession {
    pub contacts: HashMap<String, Contact>,
    pub messages: HashMap<String, BTreeMap<i64, Message>>,


@@ 14,10 19,14 @@ pub struct SCLISession {
    pub res_sender: glib::Sender<SignalResponse>,
}
impl SCLISession {
	pub fn save(&self) {
		let datafile = DataFile::from_data(self.contacts.clone(), self.messages.clone());
		fs::write(Path::new(&self.config.data_dir).join("data.json"), serde_json::to_string(&datafile).unwrap().as_bytes()).expect("Couldn't write data");
	}
    pub fn save(&self) {
        let datafile = DataFile::from_data(self.contacts.clone(), self.messages.clone());
        fs::write(
            Path::new(&self.config.data_dir).join("data.json"),
            serde_json::to_string(&datafile).unwrap().as_bytes(),
        )
        .expect("Couldn't write data");
    }
    pub fn fetch_contacts_scli(username: &str) -> Vec<Contact> {
        let output = Command::new("signal-cli")
            .arg("--username")


@@ 47,17 56,63 @@ impl SCLISession {
            .arg("--json")
            .stdout(Stdio::piped())
            .kill_on_drop(true);
        let (resource, conn) = connection::new_session_sync().expect("Couldn't connect to DBus");
        tokio::spawn(async {
            let err = resource.await;
            println!("Lost connection to DBus: {}", err);
        });
        let proxy = nonblock::Proxy::new(
            "org.asamk.Signal",
            "/org/asamk/Signal",
            Duration::from_secs(10),
            conn.clone(),
        );
        let mr_mr = MatchRule::new_signal("org.asamk.Signal", "MessageReceived");
        let sy_mr = MatchRule::new_signal("org.asamk.Signal", "SyncMessageReceived");
        let mut inc = conn.add_match(mr_mr).await.unwrap().msg_stream();
        let mut inc_sync = conn.add_match(sy_mr).await.unwrap().msg_stream();
        // NOTE: HOLY SHIT GO DBUS
        let mut child = cmd.spawn().expect("Couldn't run signal-cli");
        let stdout = child.stdout.take().unwrap();
        let mut reader = BufReader::new(stdout).lines();
        let cmd_handle = tokio::spawn(async { child.await });
        loop {
            tokio::select! {
                msg_line = reader.next_line() => {
            	msg  = inc.1.next() => {
            		let (timestamp, source , group_id, message, attachments) : (i64, String, Vec<u8>, String, Vec<String>) = msg.unwrap().read5().unwrap();
            		let con_msg = SignalMessage {
            			message,
            			attachment: None,
            			ID: 0,
            			outgoing: false,
            			source,
            			sent_at: timestamp,
            		};
            		res_sender.send(SignalResponse::IDLessMessage(con_msg)).expect("COuldn't send IDLessMessage");
            	},
            	msg = inc_sync.1.next() => {
            		let (timestamp, mut source, dest, group_id, message, attachments) : (i64,String,String,Vec<u8>,String,Vec<String>) = msg.unwrap().read_all().unwrap();
            		let mut outgoing = false;
            		if source == config.username {
            			source = dest;
            			outgoing = true;
            		}
            		println!("{}", message);
                       		let con_msg = SignalMessage {
            			message,
            			attachment: None,
            			ID: 0,
            			outgoing,
            			source,
            			sent_at: timestamp,
            		};
            		res_sender.send(SignalResponse::IDLessMessage(con_msg)).expect("COuldn't send IDLessMessage"); 		
            	}
                /*msg_line = reader.next_line() => {
                    println!("Incoming!");
                    let parsed : SignalCLIContainer= serde_json::from_str(&msg_line.unwrap().unwrap()).unwrap();
                    res_sender.send(SignalResponse::SignalCLIEnvelope(parsed.envelope)).expect("Couldn't send SignalCLIEnvelope");
                }
                }*/
                inc = req_receiver.recv() => {
                    if let Some(v) = inc {
                        match v {


@@ 66,6 121,17 @@ impl SCLISession {
                                //let contacts = Self::fetch_contacts_scli(config.username.as_str());
                                //res_sender.send(SignalResponse::ContactList(contacts)).expect("Can't send ContactList");
                            }
                            SignalRequest::SendMessage { to, message } => {
                            	let (timestamp,) : (i64,) = proxy.method_call("org.asamk.Signal", "sendMessage", (&message,Vec::<String>::new(), vec![&to] )).await.unwrap();
                                res_sender.send(SignalResponse::IDLessMessage(SignalMessage {
                                	message: message,
                                	source: to,
                                	outgoing: true,
                                	attachment: None,
                                	sent_at: timestamp,
                                	ID: 0,
                                })).expect("Couldnt send IDLessMessage");
                            }
                            _ => {

                            }


@@ 74,6 140,8 @@ impl SCLISession {
                }
            }
        }
        unreachable!();
        conn.remove_match(inc.0.token()).await.unwrap();
    }
    pub fn add_message(&mut self, message: SignalMessage, new: bool) {
        if !self.messages.contains_key(&message.source) {


@@ 113,7 181,7 @@ impl SCLISession {
        if let Some(v) = self.messages.get(&msg.source) {
            let last_id = v.values().map(|msg| msg.msg.ID).max().unwrap();
            let made_msg = SignalMessage {
                ID: last_id,
                ID: last_id+1,
                source: msg.source,
                message: message.unwrap(),
                outgoing: false,


@@ 132,32 200,45 @@ impl SCLISession {
            };
            self.add_message(made_msg, true);
        }
    }	
    }
}
impl SignalBackend for SCLISession {
    fn new(res_sender: glib::Sender<SignalResponse>, config: Config) -> SCLISession {
    	let scli_config = config.scli.expect("No configured SCLI settings");
    	let mut messages = HashMap::new();
        let scli_config = config.scli.expect("No configured SCLI settings");
        let mut messages = HashMap::new();
        let mut contacts = HashMap::new();
        let mut to_save = false;
    	if fs::metadata(&scli_config.data_dir).is_err() {
    		fs::create_dir_all(&scli_config.data_dir).expect("Couldn't create data directory");
    		let base_contacts = Self::fetch_contacts_scli(&scli_config.username);
    		for contact in base_contacts {
    			contacts.insert(contact.tel.clone(), contact);
    		}
    		to_save = true;
    	} else {
    		let data = DataFile::load_data(Path::new(&scli_config.data_dir).join("data.json").to_str().unwrap()).expect("Couldn't load data file");
    		for contact in data.contacts.values() {
    			contacts.insert(contact.tel.clone(), contact.clone());
    		}
    		messages = data.messages.into_iter().map(|(k, x)| {
    			(k, x.into_iter().map(|(k, msg)| {
    				(k, Message::new(msg))
    			}).collect())
    		}).collect();
    	}
        if fs::metadata(&scli_config.data_dir).is_err() {
            fs::create_dir_all(&scli_config.data_dir).expect("Couldn't create data directory");
            let base_contacts = Self::fetch_contacts_scli(&scli_config.username);
            for contact in base_contacts {
                contacts.insert(contact.tel.clone(), contact);
            }
            to_save = true;
        } else {
            let data = DataFile::load_data(
                Path::new(&scli_config.data_dir)
                    .join("data.json")
                    .to_str()
                    .unwrap(),
            )
            .expect("Couldn't load data file");
            for contact in data.contacts.values() {
                contacts.insert(contact.tel.clone(), contact.clone());
            }
            messages = data
                .messages
                .into_iter()
                .map(|(k, x)| {
                    (
                        k,
                        x.into_iter()
                            .map(|(k, msg)| (k, Message::new(msg)))
                            .collect(),
                    )
                })
                .collect();
        }
        let session = SCLISession {
            res_sender,
            config: scli_config,


@@ 165,7 246,7 @@ impl SignalBackend for SCLISession {
            contacts,
        };
        if to_save {
        	session.save();
            session.save();
        }
        session
    }


@@ 181,6 262,14 @@ impl SignalBackend for SCLISession {
                println!("Processing");
                self.process_scli_msg(msg);
            }
            SignalResponse::IDLessMessage(mut msg) => {
            	let mut id = 0;
            	if let Some(v) = self.messages.get(&msg.source) {
            		  id = v.values().map(|msg| msg.msg.ID).max().unwrap() + 1;
            	}
            	msg.ID = id;
            	self.add_message(msg, true);
            }
            SignalResponse::ContactList(contacts) => {
                for contact in contacts.into_iter() {
                    if let Some(contact_p) = self.contacts.get_mut(&contact.tel) {


@@ 198,21 287,25 @@ impl SignalBackend for SCLISession {
#[derive(Serialize, Deserialize)]
pub struct DataFile {
    pub contacts: HashMap<String, Contact>,
    pub messages: HashMap<String, BTreeMap<i64, SignalMessage>>
    pub messages: HashMap<String, BTreeMap<i64, SignalMessage>>,
}
impl DataFile {
	pub fn from_data(contacts: HashMap<String, Contact>, messages: HashMap<String, BTreeMap<i64, Message>>) -> DataFile {
		DataFile {
			contacts,
			messages: messages.into_iter().map(|(k, b)| {
				(k, b.into_iter().map(|(k, v)| {
					(k, v.msg)
				}).collect())
			}).collect()
		}
	}
	pub fn load_data(path: &str) ->  std::result::Result<DataFile, std::boxed::Box<dyn std::error::Error>> {
		let stri = fs::read_to_string(&path)?;
		Ok(serde_json::from_str(&stri)?)
	}
}
\ No newline at end of file
    pub fn from_data(
        contacts: HashMap<String, Contact>,
        messages: HashMap<String, BTreeMap<i64, Message>>,
    ) -> DataFile {
        DataFile {
            contacts,
            messages: messages
                .into_iter()
                .map(|(k, b)| (k, b.into_iter().map(|(k, v)| (k, v.msg)).collect()))
                .collect(),
        }
    }
    pub fn load_data(
        path: &str,
    ) -> std::result::Result<DataFile, std::boxed::Box<dyn std::error::Error>> {
        let stri = fs::read_to_string(&path)?;
        Ok(serde_json::from_str(&stri)?)
    }
}

M src/signal.rs => src/signal.rs +1 -0
@@ 91,6 91,7 @@ pub enum SignalResponse {
    AddHist(String, i64, bool),
    #[serde(skip)]
    ShowTheme(crate::config::Theme),
    IDLessMessage(SignalMessage),
}

#[derive(Serialize, Deserialize, Clone, Debug)]