~gotlou/p2p-file-transfer

3e6ab6792931c03333db4f3096bcd89214f8d4de — Saksham Mittal 1 year, 10 months ago 49acb6c reliable
WIP: reliable packet transfer
4 files changed, 102 insertions(+), 19 deletions(-)

M src/client.rs
M src/main.rs
M src/protocol.rs
M src/server.rs
M src/client.rs => src/client.rs +11 -1
@@ 18,6 18,7 @@ pub struct Client {
    lastpacket: usize,
    filesize: usize,
    state: protocol::ClientState,
    lastmsg: Vec<u8>,
}

pub fn init(


@@ 29,7 30,7 @@ pub fn init(
    let fd = Arc::new(Mutex::new(
        File::create(&filename).expect("Couldn't create file!"),
    ));

    let emptyvec = Vec::<u8>::new();
    let client_obj = Client {
        socket,
        file: fd,


@@ 39,6 40,7 @@ pub fn init(
        lastpacket: 0,
        filesize: 0,
        state: protocol::ClientState::NoState,
        lastmsg: emptyvec,
    };
    Arc::new(Mutex::new(client_obj))
}


@@ 51,6 53,7 @@ impl Client {
    pub async fn init_connection(&mut self) {
        println!("Client has new connection to make!");
        let filereq = protocol::send_req(&self.filename, &self.authtoken);
        self.lastmsg = filereq.to_vec();
        protocol::send(&self.socket, &filereq).await;
        self.state = protocol::ClientState::ACKorNACK;
    }


@@ 73,6 76,10 @@ impl Client {
        false
    }

    pub async fn resend_msg(&self) {
        protocol::send(&self.socket, &self.lastmsg).await;
    }

    //pass message received here to determine what to do; action will be taken asynchronously
    pub async fn process_msg(
        &mut self,


@@ 107,11 114,13 @@ impl Client {
                if decision {
                    println!("Sending ACK");
                    protocol::send(&self.socket, &protocol::ACK.to_vec()).await;
                    self.lastmsg = protocol::ACK.to_vec();
                    println!("Sent ACK");
                    self.state = protocol::ClientState::SendFile;
                    true
                } else {
                    println!("Stopping transfer");
                    self.lastmsg = protocol::NACK.to_vec();
                    protocol::send(&self.socket, &protocol::NACK.to_vec()).await;
                    println!("Sent NACK");
                    self.state = protocol::ClientState::EndConn;


@@ 162,6 171,7 @@ impl Client {
                self.lastpacket
            );
            self.lastpacket += protocol::MTU;
            self.lastmsg = protocol::last_received_packet(self.lastpacket).to_vec();
            protocol::send(
                &self.socket,
                &protocol::last_received_packet(self.lastpacket),

M src/main.rs => src/main.rs +49 -16
@@ 3,6 3,7 @@ use std::fs::File;
use std::io;
use std::io::Read;
use std::sync::Arc;
use tokio::time::timeout;
use tokio::net::UdpSocket;

mod auth;


@@ 72,12 73,28 @@ async fn serve(filename: &String, authtoken: &String) {
    //main loop which listens for connections and serves data depending on stage
    loop {
        let mut buf = [0u8; protocol::MTU];
        let (amt, src) = protocol::recv(&socket, &mut buf).await;
        server_obj
            .lock()
            .await
            .process_msg(&src, buf, amt, server_obj.clone())
            .await;
        let f = protocol::recv(&socket, &mut buf);
        match timeout(protocol::MAX_WAIT_TIME, f).await {
            Ok((amt,src)) => {
                if protocol::parse_mack(buf, amt) {
                    server_obj
                        .lock()
                        .await
                        .process_msg(&src, buf, amt, server_obj.clone())
                        .await;
                } else if protocol::parse_resend(buf, amt) {
                    //resend last packet
                    server_obj
                        .lock()
                        .await
                        .resend_msg(&src)
                        .await;
                }
            },
            Err(_) => {
            },
        }
        
    }
}



@@ 120,17 137,33 @@ async fn client(file_to_get: &String, filename: &String, authtoken: &String) {
    //listen for server responses and deal with them accordingly
    loop {
        let mut buf = [0u8; protocol::MTU];
        let (amt, _) = protocol::recv(&socket, &mut buf).await;
        //make sure program exits gracefully
        let continue_with_loop = client_obj
            .lock()
            .await
            .process_msg(buf, amt, client_obj.clone())
            .await;
        if !continue_with_loop {
            println!("Client exiting...");
            break;
        //let (amt, _) = protocol::recv(&socket, &mut buf).await;
        let f = protocol::recv(&socket, &mut buf);
        match timeout(protocol::MAX_WAIT_TIME, f).await {
            Ok((amt,_)) => {
                if protocol::parse_mack(buf, amt) {
                    //make sure program exits gracefully
                    let continue_with_loop = client_obj
                        .lock()
                        .await
                        .process_msg(buf, amt, client_obj.clone())
                        .await;
                    if !continue_with_loop {
                        println!("Client exiting...");
                        break;
                    }
                } else if protocol::parse_resend(buf, amt) {
                    //resend last packet
                    client_obj.lock().await.resend_msg().await;
                }
            },
            Err(_) => {
                println!("Not received!");
                println!("Sending resend packet...");
                protocol::send(&socket, &protocol::RESEND.to_vec()).await;
            },
        }
        
    }
}


M src/protocol.rs => src/protocol.rs +20 -2
@@ 15,7 15,7 @@ pub enum ClientState {
    EndConn,
}

const MAX_WAIT_TIME : u64 = 2;
pub const MAX_WAIT_TIME : Duration = Duration::from_secs(10);

pub async fn init_nat_traversal(socket: Arc<UdpSocket>, other_machine: &String) {
    thread::sleep(Duration::from_secs(5));


@@ 38,7 38,7 @@ pub async fn init_nat_traversal(socket: Arc<UdpSocket>, other_machine: &String) 
            println!("Sent useless message to get firewall to open up...");
            let mut buf = [0u8; MTU];
            let f = recv(&socket, &mut buf);
            match timeout(Duration::from_secs(MAX_WAIT_TIME), f).await {
            match timeout(MAX_WAIT_TIME, f).await {
                Ok(_) => {
                    println!("Seemed to get some data from somewhere, perhaps it is other machine");
                    connected = true;


@@ 124,6 124,24 @@ pub fn parse_end(message: [u8; MTU], amt: usize) -> bool {
    false
}

pub const MACK : [u8; 4] = *b"MACK";

pub fn parse_mack(message: [u8; MTU], amt: usize) -> bool {
    if amt == 4 && message[..4] == MACK {
        return true;
    }
    false
}

pub const RESEND : [u8; 6] = *b"RESEND";

pub fn parse_resend(message: [u8; MTU], amt: usize) -> bool {
    if amt == 6 && message[..6] == RESEND {
        return true;
    }
    false
}

fn parse_generic_req(message: [u8; MTU], amt: usize) -> String {
    let req = match str::from_utf8(&message[..amt]) {
        Ok(x) => x.to_string(),

M src/server.rs => src/server.rs +22 -0
@@ 17,6 17,7 @@ pub struct Server {
    dummy_size_msg: Vec<u8>,
    src_state_map: HashMap<SocketAddr, ClientState>,
    authchecker: auth::AuthChecker,
    lastmsgs: HashMap<SocketAddr, Vec<u8>>,
}

pub fn init(


@@ 25,6 26,7 @@ pub fn init(
    filename: String,
    authtoken: String,
) -> Arc<Mutex<Server>> {
    let emptyvec = Vec::<u8>::new();
    let filesize = data.len();
    let server_obj = Server {
        socket,


@@ 33,6 35,7 @@ pub fn init(
        dummy_size_msg: protocol::filesize_packet(0),
        src_state_map: HashMap::new(),
        authchecker: auth::init(authtoken, filename),
        lastmsgs: HashMap::new(),
    };
    Arc::new(Mutex::new(server_obj))
}


@@ 100,16 103,30 @@ impl Server {
        }
    }

    pub async fn resend_msg(&mut self, src : &SocketAddr) {
        println!("Resending data...");
        if let Some(msg) = self.lastmsgs.get(src) {
            protocol::send_to(&self.socket, src, msg).await;
        }
    }

    fn change_src_state(&mut self, src: &SocketAddr, newstate: ClientState) {
        if let Some(_v) = self.src_state_map.remove(src) {
            self.src_state_map.insert(*src, newstate);
        }
    }

    fn change_lastmsg(&mut self, src: &SocketAddr, msg : Vec<u8>) {
        if let Some(_v) = self.lastmsgs.remove(src) {
            self.lastmsgs.insert(*src, msg);
        }
    }

    async fn end_connection(&mut self, src: &SocketAddr) {
        println!("Sending END to {}", src);
        protocol::send_to(&self.socket, src, &protocol::END.to_vec()).await;
        self.src_state_map.remove(src);
        self.lastmsgs.remove(src);
    }

    async fn initiate_transfer_server(


@@ 123,6 140,7 @@ impl Server {
            println!("Client authentication check succeeded...");
            println!("Sending client size of file");
            protocol::send_to(&self.socket, src, &self.size_msg).await;
            self.change_lastmsg(src, self.size_msg.to_vec());
            println!("Awaiting response from client...");
            self.change_src_state(src, ClientState::ACKorNACK);
        } else {


@@ 130,6 148,8 @@ impl Server {
            println!("Client was not able to be authenticated!");
            println!("Sending 0 size file...");
            protocol::send_to(&self.socket, src, &self.dummy_size_msg).await;
            self.change_lastmsg(src, self.dummy_size_msg.to_vec());

            //end connection
            self.end_connection(src).await;
        }


@@ 172,6 192,7 @@ impl Server {
                &self.data[offset..offset + protocol::MTU].to_vec(),
            )
            .await;
            self.change_lastmsg(src, self.data[offset..offset + protocol::MTU].to_vec());
        } else {
            //send remaining data and end connection
            protocol::send_to(


@@ 180,6 201,7 @@ impl Server {
                &self.data[offset..self.data.len()].to_vec(),
            )
            .await;
            self.change_lastmsg(src, self.data[offset..self.data.len()].to_vec());
            println!("File sent completely");
            self.end_connection(src).await;
        }