~hime/protochat

752914e8e162347fbc1a647249a94651851c67c4 — drbawb 11 months ago 98d70f9
(linetest/net) fix issue w/ malformed packets

- Currently malformed packets will termiante the decoder. The protocol is,
  at present, not robust enough to continue following loss of sync. This decision
  may be revisited later if the protocol ever grows some packet framing.

- Added a `quit` channel to allow packet decoding task failure to cascade into
  the main event loop itself.
3 files changed, 21 insertions(+), 4 deletions(-)

M Cargo.lock
M linetest/Cargo.toml
M linetest/src/shell/net.rs
M Cargo.lock => Cargo.lock +1 -0
@@ 315,6 315,7 @@ dependencies = [
 "thiserror",
 "unicode-segmentation",
 "unicode-width",
 "void",
]

[[package]]

M linetest/Cargo.toml => linetest/Cargo.toml +1 -0
@@ 20,6 20,7 @@ textwrap = "0.12"
thiserror = "1.0"
unicode-segmentation = "1.0"
unicode-width = "0.1"
void = "1.0"

[dependencies.smolboi]
version = "0.1.0"

M linetest/src/shell/net.rs => linetest/src/shell/net.rs +19 -4
@@ 189,8 189,13 @@ async fn net_worker_task(
) -> anyhow::Result<()> {
    // start packet decoding thread
    let (packets_tx, packets_rx) = async_channel::unbounded();
        
    let (quit_tx, quit_rx) = async_channel::unbounded::<void::Void>();

    // copy these fields so they don't move into task
    let mut packet_decoding_sock = sock.clone();
    let packet_shell_outbox = shell_mx.outbox.clone();
    let packet_conn_id = conn.id; 

    smol::Task::spawn(async move {
        loop {
            match decode_packet(&mut packet_decoding_sock).await {


@@ 201,14 206,21 @@ async fn net_worker_task(
                },

                Err(msg) => {
                    //warn!("malformed packet from client: {:?}", msg);
                    continue; // TODO: kill the client?

                    let log_line = ConnMsg::LogLine { 
                        msg: format!("FATAL: conn({}) received malformed packet: {:?}.", packet_conn_id, msg), 
                    };

                    let _ = packet_shell_outbox.send(log_line).await;
                    break;
                },   
            }
        }

        drop(quit_tx);
    }).detach();

    //send registration packet
    // send out registration packet
    let packet = Packet::Register {
        name: conn.name.to_string(),
    };


@@ 227,6 239,7 @@ async fn net_worker_task(

    let mut packets_rx = packets_rx.fuse();
    let mut task_rx = shell_mx.inbox.fuse();
    let mut quit_rx = quit_rx.fuse();

    let mut client = Client::with(conn, &sock, shell_mx.outbox.clone());



@@ 253,6 266,8 @@ async fn net_worker_task(

                None => { break },
            },

            quit = quit_rx.next() => if quit.is_none() { break },
        }
    }