~vpzom/shoved

e9507eea70f5d7f148bf284d52905d92ad7f5671 — Colin Reeder 1 year, 17 days ago fe7fd69
Add timeout
3 files changed, 33 insertions(+), 11 deletions(-)

M Cargo.lock
M Cargo.toml
M src/main.rs
M Cargo.lock => Cargo.lock +12 -0
@@ 781,6 781,7 @@ dependencies = [
 "serde",
 "serde_json",
 "tokio",
 "tokio-stream",
 "tokio-tungstenite",
 "uuid",
]


@@ 913,6 914,17 @@ dependencies = [
]

[[package]]
name = "tokio-stream"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [
 "futures-core",
 "pin-project-lite",
 "tokio",
]

[[package]]
name = "tokio-tungstenite"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"

M Cargo.toml => Cargo.toml +1 -0
@@ 17,5 17,6 @@ rusqlite = "0.29.0"
serde = { version = "1.0.163", features = ["derive"] }
serde_json = "1.0.96"
tokio = { version = "1.28.2", features = ["macros", "rt", "sync"] }
tokio-stream = "0.1.14"
tokio-tungstenite = { version = "0.19.0", features = ["native-tls"] }
uuid = { version = "1.3.3", features = ["v4"] }

M src/main.rs => src/main.rs +20 -11
@@ 11,6 11,10 @@ mod types;

const SERVER_URL: &str = "wss://push.services.mozilla.com";

// Timeout after 9 minutes.
// Server is expected to ping every 5 minutes, so at that point the connection is probably dead
const PING_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(9 * 60);

#[derive(Debug)]
enum ConnectionMessageOut {
    Register { request_id: u64 },


@@ 67,18 71,23 @@ async fn handle_connection(

    let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel();

    let stream = stream
        .map_err(anyhow::Error::from)
        .try_filter_map(|msg| async {
            println!("{:?}", msg);
            match msg {
                tokio_tungstenite::tungstenite::protocol::Message::Text(msg) => {
                    let msg: types::MozillaPushMessageS2A = serde_json::from_str(&msg)?;
                    Ok(Some(msg))
    // not importing this trait because it conflicts with the one from futures_util
    let stream =
        tokio_stream::StreamExt::timeout(stream.map_err(anyhow::Error::from), PING_TIMEOUT)
            .map(|item| match item {
                Ok(value) => value,
                Err(_) => Err(anyhow::anyhow!("Timeout")),
            })
            .try_filter_map(|msg| async {
                println!("{:?}", msg);
                match msg {
                    tokio_tungstenite::tungstenite::protocol::Message::Text(msg) => {
                        let msg: types::MozillaPushMessageS2A = serde_json::from_str(&msg)?;
                        Ok(Some(msg))
                    }
                    _ => Ok(None),
                }
                _ => Ok(None),
            }
        });
            });

    sink.send(types::MozillaPushMessageA2S::Hello {
        uaid,