~vpzom/shoved

35e4b5fbb90fb15212840949d40900450f63d2cc — Colin Reeder 1 year, 17 days ago e9507ee
Implement exponential backoff for reconnections
1 files changed, 36 insertions(+), 1 deletions(-)

M src/main.rs
M src/main.rs => src/main.rs +36 -1
@@ 15,6 15,9 @@ const SERVER_URL: &str = "wss://push.services.mozilla.com";
// Server is expected to ping every 5 minutes, so at that point the connection is probably dead
const PING_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(9 * 60);

const START_BACKOFF_TIME: std::time::Duration = std::time::Duration::from_secs(1);
const MAX_BACKOFF_TIME: std::time::Duration = std::time::Duration::from_secs(10 * 60);

#[derive(Debug)]
enum ConnectionMessageOut {
    Register { request_id: u64 },


@@ 55,6 58,7 @@ async fn handle_connection(
    existing_channel_ids: Vec<String>,
    channel_tx: tokio::sync::mpsc::UnboundedSender<ConnectionMessageIn>,
    mut channel_rx: tokio::sync::mpsc::UnboundedReceiver<ConnectionMessageOut>,
    successfully_connected_tx: tokio::sync::mpsc::Sender<()>,
) -> Result<(), anyhow::Error> {
    let (conn, _) = tokio_tungstenite::connect_async(SERVER_URL).await?;



@@ 112,6 116,7 @@ async fn handle_connection(
                    let channel_tx = channel_tx.clone();
                    let register_map = &register_map;
                    let sink_tx = sink_tx.clone();
                    let successfully_connected_tx = successfully_connected_tx.clone();
                    async move {
                        println!("{:?}", msg);



@@ 121,6 126,16 @@ async fn handle_connection(
                                    anyhow::bail!("Unexpected status in Hello: {}", status);
                                }

                                match successfully_connected_tx.try_send(()) {
                                    Ok(_) => {}
                                    Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
                                        panic!("Received multiple Hello responses")
                                    }
                                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
                                        panic!("somehow lost controller")
                                    }
                                }

                                channel_tx.send(ConnectionMessageIn::SetUAID(uaid))?;
                            }
                            types::MozillaPushMessageS2A::Notification {


@@ 367,13 382,33 @@ async fn main() {
        async move {
            futures_util::try_join!(
                async {
                    let mut backoff_time = START_BACKOFF_TIME;
                    loop {
                        let (successfully_connected_tx, mut successfully_connected_rx) = tokio::sync::mpsc::channel(1);
                        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
                        out_tx_send.send(out_tx).unwrap();
                        let uaid_copy = uaid.lock().unwrap().clone();
                        let existing_channel_ids_copy = existing_channel_ids.lock().unwrap().clone();
                        let result = handle_connection(uaid_copy, existing_channel_ids_copy, in_tx.clone(), out_rx).await;
                        let result = handle_connection(uaid_copy, existing_channel_ids_copy, in_tx.clone(), out_rx, successfully_connected_tx).await;
                        println!("connection ended with {:?}", result);

                        let successfully_connected = match successfully_connected_rx.try_recv() {
                            Ok(_) => true,
                            Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => false,
                            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => panic!("somehow successfully_connected_tx still exists"),
                        };

                        if successfully_connected {
                            backoff_time = START_BACKOFF_TIME;
                        } else {
                            println!("connection failed, waiting {} seconds", backoff_time.as_secs());
                            tokio::time::sleep(backoff_time).await;

                            backoff_time *= 2;
                            if backoff_time > MAX_BACKOFF_TIME {
                                backoff_time = MAX_BACKOFF_TIME;
                            }
                        }
                    }

                    // for type inference