~hime/protochat

45744c5763ac53dde6447aa97a060e9076a42093 — Robert Straw 8 months ago 7a5732a + 9760ede master
Merge branch 'feature/rustfmt-spaces'
A .editorconfig => .editorconfig +5 -0
@@ 0,0 1,5 @@
root = true

[*.rs]
indent_style = space
indent_size = 4

M linetest/src/main.rs => linetest/src/main.rs +14 -16
@@ 2,28 2,26 @@ use std::thread;

use futures_util::future;


mod shell;

fn main() -> anyhow::Result<()> {
	// create channels just to keep task thread alive
	//let (chan_tx, chan_rx) = async_channel::bounded(1024);

    // create channels just to keep task thread alive
    //let (chan_tx, chan_rx) = async_channel::bounded(1024);

	// run shell on its own dedicated thread
	let ui_handle = thread::spawn(move || { shell::ui::start_task() });
    // run shell on its own dedicated thread
    let ui_handle = thread::spawn(move || shell::ui::start_task());

	// start async worker pool
	// TODO: moar threads?
	let _threadpool_handle = thread::spawn(move || { smol::run(future::pending::<()>()) });
    // start async worker pool
    // TODO: moar threads?
    let _threadpool_handle = thread::spawn(move || smol::run(future::pending::<()>()));

	// probably having a bad day if we get here ...
	if let Err(msg) = ui_handle.join() {
		panic!("err: {:?}", msg);
	}
    // probably having a bad day if we get here ...
    if let Err(msg) = ui_handle.join() {
        panic!("err: {:?}", msg);
    }

	// TODO: gracefully shutdown threadpool
	println!("goodbye ...");
    // TODO: gracefully shutdown threadpool
    println!("goodbye ...");

	Ok(())
    Ok(())
}

M linetest/src/shell/mod.rs => linetest/src/shell/mod.rs +134 -126
@@ 1,4 1,4 @@
use async_channel::{Sender, Receiver, TryRecvError};
use async_channel::{Receiver, Sender, TryRecvError};
use std::collections::hash_map::HashMap;
use std::net::{SocketAddr, ToSocketAddrs};
use thiserror::Error;


@@ 8,159 8,167 @@ pub mod ui;

#[derive(Debug, Error)]
pub enum Error {
	#[error("could not parse at least one socket address.")]
	NoSocketAddr,
    #[error("could not parse at least one socket address.")]
    NoSocketAddr,

	#[error("command missing required argument")]
	MissingArgument,
    #[error("command missing required argument")]
    MissingArgument,

	#[error("could not parse the number in position: {}", pos)]
	ParseError {
		pos: &'static str,
    #[error("could not parse the number in position: {}", pos)]
    ParseError {
        pos: &'static str,

		#[source]
		source: std::num::ParseIntError,
	},
        #[source]
        source: std::num::ParseIntError,
    },

	#[error("specified connection does not exist!?")]
	MissingConnectionId,
    #[error("specified connection does not exist!?")]
    MissingConnectionId,
}

/// These are operations which can be exchanged between the terminal
/// interface and the network connection task.
pub enum ConnMsg {
	/// A line from the client to be shown in the top-level system log.
	LogLine { msg: String },
    /// A line from the client to be shown in the top-level system log.
    LogLine { msg: String },

	/// A line from the client to be sent/displayed in the corresponding buffer.
	/// Where `id` is the name of an associated room.
	BufLine { id: String, msg: String },
    /// A line from the client to be sent/displayed in the corresponding buffer.
    /// Where `id` is the name of an associated room.
    BufLine { id: String, msg: String },

	/// The server rejected our connection (temporarily)
	RejectTemp { reason: String },
    /// The server rejected our connection (temporarily)
    RejectTemp { reason: String },

	/// Change username
	ChangeNick { name: String },
    /// Change username
    ChangeNick { name: String },

	/// Join room
	JoinRoom { room: String },
    /// Join room
    JoinRoom { room: String },

	/// Part room
	PartRoom { room: String },
    /// Part room
    PartRoom { room: String },
}

#[derive(Clone, Debug)]
pub struct Connection {
	id: i64,
	name: String,
	addr: SocketAddr,
	is_connected: bool,
    id: i64,
    name: String,
    addr: SocketAddr,
    is_connected: bool,
}

pub struct ConnectionMap {
	next_connection_id: i64,
	connections: HashMap<i64, Connection>,
	
	socket_rx: HashMap<i64, Receiver<ConnMsg>>,
	socket_tx: HashMap<i64, Sender<ConnMsg>>,
    next_connection_id: i64,
    connections: HashMap<i64, Connection>,

    socket_rx: HashMap<i64, Receiver<ConnMsg>>,
    socket_tx: HashMap<i64, Sender<ConnMsg>>,
}

/// This is the mailbox which should be passed to the new connection task.
/// This allows the UI thread & network thread to communicate bidirectionally
/// by exchaning `ConnMsg` structs.
pub struct ConnectionMailbox {
	outbox: Sender<ConnMsg>,
	inbox: Receiver<ConnMsg>,

    outbox: Sender<ConnMsg>,
    inbox: Receiver<ConnMsg>,
}

impl ConnectionMap {
	pub fn new() -> Self {
		Self {
			next_connection_id: 0,
			connections: HashMap::new(),			
			socket_rx: HashMap::new(),
			socket_tx: HashMap::new(),
		}
	}

	/// Creates a pending connection and returns the corresponding
	/// connection id & channel.
	pub fn create_connection<Sock: ToSocketAddrs>(&mut self, addr: Sock, name: &str) -> anyhow::Result<(i64, ConnectionMailbox)> {
		// fetch an ID & increment the counter
		let next_id = self.next_connection_id;
		self.next_connection_id += 1;

		let addr = addr.to_socket_addrs()?
			.next()
			.ok_or_else(|| { Error::NoSocketAddr })?;

		// create a new connection
		let (net_outbox_tx, net_outbox_rx) = async_channel::bounded(1024);
		let (net_inbox_tx, net_inbox_rx) = async_channel::bounded(1024);

		let connection = Connection {
			id: next_id,
			name: name.to_string(),
			addr: addr,
			is_connected: false,
		};

		if let Some(_) = self.connections.insert(next_id, connection) {
			// TODO: handle the collision?
			panic!("BUG: collision in connection map.");
		}

		if let Some(_) = self.socket_rx.insert(next_id, net_outbox_rx) {
			// TODO: handle the collision?
			panic!("BUG: collision in connection map.");
		}

		if let Some(_) = self.socket_tx.insert(next_id, net_inbox_tx) {
			// TODO: handle the collision?
			panic!("BUG: collision in connection map.");
		}

		Ok((next_id, ConnectionMailbox { outbox: net_outbox_tx, inbox: net_inbox_rx }))
	}

	pub fn poll_messages(&mut self) -> HashMap<i64, Vec<ConnMsg>> {
		let mut messages = HashMap::new();

		let mut dead_peers = vec![];

		// drain messages for each connection, non-blocking
		for (id, conn) in &self.socket_rx {
			let mut msg_q = vec![];

			loop {
				match conn.try_recv() {
					Ok(msg) => msg_q.push(msg),
					Err(TryRecvError::Empty) => break,
					Err(TryRecvError::Closed) => {
						dead_peers.push(*id);
						break;
					},
				}
			}

			if let Some(_) = messages.insert(*id, msg_q) {
				panic!("BUG: collision in incoming messages map");
			}
		}

		self.reap_dead_peers(&dead_peers);

		return messages;
	}

	fn reap_dead_peers(&mut self, ids: &Vec<i64>) {
		for peer_id in ids {
			self.socket_rx.remove(&peer_id);
			self.socket_tx.remove(&peer_id);
			self.connections.remove(&peer_id);
		}

	}
}
\ No newline at end of file
    pub fn new() -> Self {
        Self {
            next_connection_id: 0,
            connections: HashMap::new(),
            socket_rx: HashMap::new(),
            socket_tx: HashMap::new(),
        }
    }

    /// Creates a pending connection and returns the corresponding
    /// connection id & channel.
    pub fn create_connection<Sock: ToSocketAddrs>(
        &mut self,
        addr: Sock,
        name: &str,
    ) -> anyhow::Result<(i64, ConnectionMailbox)> {
        // fetch an ID & increment the counter
        let next_id = self.next_connection_id;
        self.next_connection_id += 1;

        let addr = addr
            .to_socket_addrs()?
            .next()
            .ok_or_else(|| Error::NoSocketAddr)?;

        // create a new connection
        let (net_outbox_tx, net_outbox_rx) = async_channel::bounded(1024);
        let (net_inbox_tx, net_inbox_rx) = async_channel::bounded(1024);

        let connection = Connection {
            id: next_id,
            name: name.to_string(),
            addr: addr,
            is_connected: false,
        };

        if let Some(_) = self.connections.insert(next_id, connection) {
            // TODO: handle the collision?
            panic!("BUG: collision in connection map.");
        }

        if let Some(_) = self.socket_rx.insert(next_id, net_outbox_rx) {
            // TODO: handle the collision?
            panic!("BUG: collision in connection map.");
        }

        if let Some(_) = self.socket_tx.insert(next_id, net_inbox_tx) {
            // TODO: handle the collision?
            panic!("BUG: collision in connection map.");
        }

        Ok((
            next_id,
            ConnectionMailbox {
                outbox: net_outbox_tx,
                inbox: net_inbox_rx,
            },
        ))
    }

    pub fn poll_messages(&mut self) -> HashMap<i64, Vec<ConnMsg>> {
        let mut messages = HashMap::new();

        let mut dead_peers = vec![];

        // drain messages for each connection, non-blocking
        for (id, conn) in &self.socket_rx {
            let mut msg_q = vec![];

            loop {
                match conn.try_recv() {
                    Ok(msg) => msg_q.push(msg),
                    Err(TryRecvError::Empty) => break,
                    Err(TryRecvError::Closed) => {
                        dead_peers.push(*id);
                        break;
                    }
                }
            }

            if let Some(_) = messages.insert(*id, msg_q) {
                panic!("BUG: collision in incoming messages map");
            }
        }

        self.reap_dead_peers(&dead_peers);

        return messages;
    }

    fn reap_dead_peers(&mut self, ids: &Vec<i64>) {
        for peer_id in ids {
            self.socket_rx.remove(&peer_id);
            self.socket_tx.remove(&peer_id);
            self.connections.remove(&peer_id);
        }
    }
}

M linetest/src/shell/net.rs => linetest/src/shell/net.rs +385 -375
@@ 1,418 1,428 @@
use crate::shell::{self, Connection, ConnMsg};
use crate::shell::{self, ConnMsg, Connection};
use smolboi::protocol::packet::Packet;
use std::net::TcpStream;

use async_channel::Sender;
use async_dup::Arc;
use byteorder::{ReadBytesExt, WriteBytesExt, NetworkEndian};
use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
use futures_util::{select, AsyncReadExt, AsyncWriteExt, StreamExt};
use smol::Async;
use thiserror::Error;

#[derive(Debug, Error)]
enum ClientError {
	#[error("an unspecified error has occurred.")]
	Unspecified,
    #[error("an unspecified error has occurred.")]
    Unspecified,

	#[error("the packet was well-formed, but not expected at this time.")]
	InvalidState,
    #[error("the packet was well-formed, but not expected at this time.")]
    InvalidState,
}

#[derive(Debug, Eq, PartialEq)]
enum ClientState {
	Connecting,
	Connected,
    Connecting,
    Connected,
}

#[derive(Debug)]
struct Client<S> {
	conn: Connection,
	outbox: Sender<ConnMsg>,
	sock: S,
	state: ClientState,
    conn: Connection,
    outbox: Sender<ConnMsg>,
    sock: S,
    state: ClientState,
}

impl<S> Client<S> 
where S: AsyncReadExt + AsyncWriteExt + Unpin {
	pub fn with(conn: Connection, sock: S, outbox: Sender<ConnMsg>) -> Self {
		Self {
			conn: conn,
			sock: sock,
			outbox: outbox,
			state: ClientState::Connecting,
		}
	}

	/// Updates client state based on incoming packet.
	/// The state change will be relayed to the parent task which created this client
	/// by way of the `outbox` parameter supplied to the `with` constructor.
	pub async fn handle_net_pkt(&mut self, pkt: Packet) -> anyhow::Result<()> {
		match self.state {
			ClientState::Connecting => match pkt {
				Packet::RegistrationAccepted { name } => {
					// !!! use the name the server accepted !!!
					//
					// this avoids race where user tries to change nick before
					// the server finishes sending the accept/reject packet for
					// the outstanding request.
					self.conn.name = name;
					self.state = ClientState::Connected;
					
					let log_line = ConnMsg::LogLine { msg: "registered ... ok.".to_string() };
					self.outbox.send(log_line).await?;
					Ok(())
				},

				Packet::RegistrationRejected => {
					let msg = ConnMsg::RejectTemp { 
						reason: "[server]: registration rejected. try another username.".to_string() 
					};

					Ok(self.outbox.send(msg).await?)
				},

				Packet::Notice { body } => {
					let msg = ConnMsg::LogLine { msg: format!("wtf: {}", body) };
					Ok(self.outbox.send(msg).await?)
				},

				_ => Err(ClientError::InvalidState.into()),
			},

			ClientState::Connected => match pkt {
				Packet::MessageRoom { room, from, body } => {
					let msg = ConnMsg::BufLine {
						id: room,
						msg: format!("{}: {}", from, body),
					};

					Ok(self.outbox.send(msg).await?)
				},

				Packet::Notice { body } => {
					let msg = ConnMsg::LogLine { msg: body };
					Ok(self.outbox.send(msg).await?)
				},

				_ => Err(ClientError::InvalidState.into()),
			},
		}
	}

	pub async fn handle_shell_msg(&mut self, msg: ConnMsg) -> anyhow::Result<()> {
		match self.state {
			ClientState::Connecting => match msg {
				ConnMsg::ChangeNick { name } => {
					Ok(write_packet(&mut self.sock, Packet::Register { name: name }).await?)
				},

				_ => Err(ClientError::Unspecified.into()),
			},

			ClientState::Connected => match msg {
				ConnMsg::BufLine { id, msg } => {
					let packet = Packet::MessageRoom { 
						room: id,
						from: self.conn.name.clone(),
						body: msg,
					};

					Ok(write_packet(&mut self.sock, packet).await?)
				},

				ConnMsg::JoinRoom { room } => {
					Ok(write_packet(&mut self.sock, Packet::Join {  room: room }).await?)
				},

				ConnMsg::PartRoom { room } => {
					Ok(write_packet(&mut self.sock, Packet::Part { room: room }).await?)
				},

				_ => Err(ClientError::Unspecified.into()),
			},
		}
	}
impl<S> Client<S>
where
    S: AsyncReadExt + AsyncWriteExt + Unpin,
{
    pub fn with(conn: Connection, sock: S, outbox: Sender<ConnMsg>) -> Self {
        Self {
            conn: conn,
            sock: sock,
            outbox: outbox,
            state: ClientState::Connecting,
        }
    }

    /// Updates client state based on incoming packet.
    /// The state change will be relayed to the parent task which created this client
    /// by way of the `outbox` parameter supplied to the `with` constructor.
    pub async fn handle_net_pkt(&mut self, pkt: Packet) -> anyhow::Result<()> {
        match self.state {
            ClientState::Connecting => match pkt {
                Packet::RegistrationAccepted { name } => {
                    // !!! use the name the server accepted !!!
                    //
                    // this avoids race where user tries to change nick before
                    // the server finishes sending the accept/reject packet for
                    // the outstanding request.
                    self.conn.name = name;
                    self.state = ClientState::Connected;

                    let log_line = ConnMsg::LogLine {
                        msg: "registered ... ok.".to_string(),
                    };
                    self.outbox.send(log_line).await?;
                    Ok(())
                }

                Packet::RegistrationRejected => {
                    let msg = ConnMsg::RejectTemp {
                        reason: "[server]: registration rejected. try another username."
                            .to_string(),
                    };

                    Ok(self.outbox.send(msg).await?)
                }

                Packet::Notice { body } => {
                    let msg = ConnMsg::LogLine {
                        msg: format!("wtf: {}", body),
                    };
                    Ok(self.outbox.send(msg).await?)
                }

                _ => Err(ClientError::InvalidState.into()),
            },

            ClientState::Connected => match pkt {
                Packet::MessageRoom { room, from, body } => {
                    let msg = ConnMsg::BufLine {
                        id: room,
                        msg: format!("{}: {}", from, body),
                    };

                    Ok(self.outbox.send(msg).await?)
                }

                Packet::Notice { body } => {
                    let msg = ConnMsg::LogLine { msg: body };
                    Ok(self.outbox.send(msg).await?)
                }

                _ => Err(ClientError::InvalidState.into()),
            },
        }
    }

    pub async fn handle_shell_msg(&mut self, msg: ConnMsg) -> anyhow::Result<()> {
        match self.state {
            ClientState::Connecting => match msg {
                ConnMsg::ChangeNick { name } => {
                    Ok(write_packet(&mut self.sock, Packet::Register { name: name }).await?)
                }

                _ => Err(ClientError::Unspecified.into()),
            },

            ClientState::Connected => match msg {
                ConnMsg::BufLine { id, msg } => {
                    let packet = Packet::MessageRoom {
                        room: id,
                        from: self.conn.name.clone(),
                        body: msg,
                    };

                    Ok(write_packet(&mut self.sock, packet).await?)
                }

                ConnMsg::JoinRoom { room } => {
                    Ok(write_packet(&mut self.sock, Packet::Join { room: room }).await?)
                }

                ConnMsg::PartRoom { room } => {
                    Ok(write_packet(&mut self.sock, Packet::Part { room: room }).await?)
                }

                _ => Err(ClientError::Unspecified.into()),
            },
        }
    }
}

async fn write_packet<W>(wtr: &mut W, packet: Packet) -> anyhow::Result<()>
	where W: AsyncWriteExt + Unpin {
		let packet_buf = packet.encode()?;
		let packet_len = packet_buf.len();
		assert!(packet_len <= u16::MAX.into());

		let mut packet_sz = [0u8; 2];
		(&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_len as u16)?;
		
		wtr.write(&packet_sz).await?;
		wtr.write(&packet_buf).await?;
		Ok(())
	}

async fn decode_packet<R>(stream: &mut R) -> anyhow::Result<Packet> 
where R: AsyncReadExt + Unpin {
	let mut packet_header_buf = [0u8; 2];
	stream.read_exact(&mut packet_header_buf).await?;

	let packet_sz = (&packet_header_buf[..]).read_u16::<NetworkEndian>()?;

	assert!(packet_sz <= u16::MAX);
	let mut packet_body_buf = vec![0u8; packet_sz.into()];
	stream.read_exact(&mut packet_body_buf).await?;

	Ok(Packet::decode(&packet_body_buf)?)
where
    W: AsyncWriteExt + Unpin,
{
    let packet_buf = packet.encode()?;
    let packet_len = packet_buf.len();
    assert!(packet_len <= u16::MAX.into());

    let mut packet_sz = [0u8; 2];
    (&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_len as u16)?;

    wtr.write(&packet_sz).await?;
    wtr.write(&packet_buf).await?;
    Ok(())
}

async fn decode_packet<R>(stream: &mut R) -> anyhow::Result<Packet>
where
    R: AsyncReadExt + Unpin,
{
    let mut packet_header_buf = [0u8; 2];
    stream.read_exact(&mut packet_header_buf).await?;

    let packet_sz = (&packet_header_buf[..]).read_u16::<NetworkEndian>()?;

    assert!(packet_sz <= u16::MAX);
    let mut packet_body_buf = vec![0u8; packet_sz.into()];
    stream.read_exact(&mut packet_body_buf).await?;

    Ok(Packet::decode(&packet_body_buf)?)
}

/// Starts a connection to the server specified in the background.
/// This returns control to the caller once the task has been sent to the
/// executor, at which point the caller can begin polling the associated
/// task mailbox.
/// 
///
/// Returns an error in the event the TCP socket could not be opeend.
pub async fn start_task(
	conn: Connection,
	shell_mx: shell::ConnectionMailbox,
    conn: Connection,
    shell_mx: shell::ConnectionMailbox,
) -> anyhow::Result<()> {
	let sock = Arc::new(Async::<TcpStream>::connect(conn.addr).await?);

	smol::Task::spawn(async {
		let task_outbox = shell_mx.outbox.clone();
		if let Err(msg) = net_worker_task(conn, sock, shell_mx).await {
			let error_log_ln = format!("BUG: net worker exited unexpectedly {:?}", msg);
			let _ = task_outbox.send(ConnMsg::LogLine { msg: error_log_ln }).await;
		}
	}).detach();

	Ok(())
    let sock = Arc::new(Async::<TcpStream>::connect(conn.addr).await?);

    smol::Task::spawn(async {
        let task_outbox = shell_mx.outbox.clone();
        if let Err(msg) = net_worker_task(conn, sock, shell_mx).await {
            let error_log_ln = format!("BUG: net worker exited unexpectedly {:?}", msg);
            let _ = task_outbox
                .send(ConnMsg::LogLine { msg: error_log_ln })
                .await;
        }
    })
    .detach();

    Ok(())
}

async fn net_worker_task(
	conn: Connection,
	mut sock: Arc<Async<TcpStream>>,
	shell_mx: shell::ConnectionMailbox
    conn: Connection,
    mut sock: Arc<Async<TcpStream>>,
    shell_mx: shell::ConnectionMailbox,
) -> anyhow::Result<()> {
	// start packet decoding thread
	let (packets_tx, packets_rx) = async_channel::unbounded();
	let (quit_tx, quit_rx) = async_channel::unbounded::<void::Void>();

	// copy these fields so they don't move into task
	let mut packet_decoding_sock = sock.clone();
	let packet_shell_outbox = shell_mx.outbox.clone();
	let packet_conn_id = conn.id; 

	smol::Task::spawn(async move {
		loop {
			match decode_packet(&mut packet_decoding_sock).await {
				Ok(packet) => { 
					if let Err(_) = packets_tx.send(packet).await {
						panic!("BUG: sent packet but connection event loop is gone?");
					}
				},

				Err(msg) => {

					let log_line = ConnMsg::LogLine { 
						msg: format!("FATAL: conn({}) received malformed packet: {:?}.", packet_conn_id, msg), 
					};

					let _ = packet_shell_outbox.send(log_line).await;
					break;
				},   
			}
		}

		drop(quit_tx);
	}).detach();

	// send out registration packet
	let packet = Packet::Register {
		name: conn.name.to_string(),
	};

	let packet_buf = packet.encode()?;
	if packet_buf.len() > u16::MAX.into() {
		panic!("BUG: user crafted >64KiB packet, this is not legal.");
	} // TODO: use that fancy `anyhow` error macro?

	let mut packet_sz = [0u8; 2];
	(&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_buf.len() as u16)?;

	sock.write(&packet_sz).await?;
	sock.write(&packet_buf).await?;


	let mut packets_rx = packets_rx.fuse();
	let mut task_rx = shell_mx.inbox.fuse();
	let mut quit_rx = quit_rx.fuse();

	let mut client = Client::with(conn, sock.clone(), shell_mx.outbox.clone());

	loop {
		select! {
			packet = packets_rx.next() => match packet {
				Some(packet) => {
					if let Err(msg) = client.handle_net_pkt(packet).await {
						let error_log_ln = format!("client error: {:?}", msg);
						let _ = shell_mx.outbox.send(ConnMsg::LogLine { msg: error_log_ln }).await;
					}
				},

				None => { break },
			},

			task_msg = task_rx.next() => match task_msg {
				Some(task_msg) => {
					if let Err(msg) = client.handle_shell_msg(task_msg).await {
						let error_log_ln = format!("client error: {:?}", msg);
						let _ = shell_mx.outbox.send(ConnMsg::LogLine { msg: error_log_ln }).await;
					}
				},

				None => { break },
			},

			quit = quit_rx.next() => if quit.is_none() { break },
		}
	}

	let log_line = ConnMsg::LogLine { msg: "warning! net worker hung up!".to_string() };
	Ok(shell_mx.outbox.send(log_line).await?)
    // start packet decoding thread
    let (packets_tx, packets_rx) = async_channel::unbounded();
    let (quit_tx, quit_rx) = async_channel::unbounded::<void::Void>();

    // copy these fields so they don't move into task
    let mut packet_decoding_sock = sock.clone();
    let packet_shell_outbox = shell_mx.outbox.clone();
    let packet_conn_id = conn.id;

    smol::Task::spawn(async move {
        loop {
            match decode_packet(&mut packet_decoding_sock).await {
                Ok(packet) => {
                    if let Err(_) = packets_tx.send(packet).await {
                        panic!("BUG: sent packet but connection event loop is gone?");
                    }
                }

                Err(msg) => {
                    let log_line = ConnMsg::LogLine {
                        msg: format!(
                            "FATAL: conn({}) received malformed packet: {:?}.",
                            packet_conn_id, msg
                        ),
                    };

                    let _ = packet_shell_outbox.send(log_line).await;
                    break;
                }
            }
        }

        drop(quit_tx);
    })
    .detach();

    // send out registration packet
    let packet = Packet::Register {
        name: conn.name.to_string(),
    };

    let packet_buf = packet.encode()?;
    if packet_buf.len() > u16::MAX.into() {
        panic!("BUG: user crafted >64KiB packet, this is not legal.");
    } // TODO: use that fancy `anyhow` error macro?

    let mut packet_sz = [0u8; 2];
    (&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_buf.len() as u16)?;

    sock.write(&packet_sz).await?;
    sock.write(&packet_buf).await?;

    let mut packets_rx = packets_rx.fuse();
    let mut task_rx = shell_mx.inbox.fuse();
    let mut quit_rx = quit_rx.fuse();

    let mut client = Client::with(conn, sock.clone(), shell_mx.outbox.clone());

    loop {
        select! {
            packet = packets_rx.next() => match packet {
                Some(packet) => {
                    if let Err(msg) = client.handle_net_pkt(packet).await {
                        let error_log_ln = format!("client error: {:?}", msg);
                        let _ = shell_mx.outbox.send(ConnMsg::LogLine { msg: error_log_ln }).await;
                    }
                },

                None => { break },
            },

            task_msg = task_rx.next() => match task_msg {
                Some(task_msg) => {
                    if let Err(msg) = client.handle_shell_msg(task_msg).await {
                        let error_log_ln = format!("client error: {:?}", msg);
                        let _ = shell_mx.outbox.send(ConnMsg::LogLine { msg: error_log_ln }).await;
                    }
                },

                None => { break },
            },

            quit = quit_rx.next() => if quit.is_none() { break },
        }
    }

    let log_line = ConnMsg::LogLine {
        msg: "warning! net worker hung up!".to_string(),
    };
    Ok(shell_mx.outbox.send(log_line).await?)
}

#[cfg(test)]
mod tests {
	use async_channel::unbounded;
	use core::pin::Pin;
	use futures_util::io::{self, AsyncRead, AsyncWrite, IoSlice, IoSliceMut};
	use futures_util::task::{Context, Poll};
	use sluice::pipe::{pipe, PipeReader, PipeWriter};
	use super::*;


	struct FakeTcpStream {
		rx: PipeReader,
		tx: PipeWriter,
	}

	impl FakeTcpStream {
		pub fn new() -> (Self, Self) {
			let (rx_a, tx_a) = pipe();
			let (rx_b, tx_b) = pipe();

			// create a "crossover" cable
			let socket_a = Self { rx: rx_b, tx: tx_a };
			let socket_b = Self { rx: rx_a, tx: tx_b };

			(socket_a, socket_b)
		}
	}

	impl AsyncRead for FakeTcpStream {
		fn poll_read(
			mut self: Pin<&mut Self>,
			cx: &mut Context<'_>,
			buf: &mut [u8]
		) -> Poll<io::Result<usize>> {
			PipeReader::poll_read(Pin::new(&mut self.rx), cx, buf)
		}

		fn poll_read_vectored(
			mut self: Pin<&mut Self>,
			cx: &mut Context<'_>,
			bufs: &mut [IoSliceMut<'_>]
		) -> Poll<io::Result<usize>> {
			PipeReader::poll_read_vectored(Pin::new(&mut self.rx), cx, bufs)
		}
	}

	impl AsyncWrite for FakeTcpStream {
		fn poll_write(
			mut self: Pin<&mut Self>,
			cx: &mut Context<'_>,
			buf: &[u8]
		) -> Poll<io::Result<usize>> {
			PipeWriter::poll_write(Pin::new(&mut self.tx), cx, buf)
		}

		fn poll_write_vectored(
			mut self: Pin<&mut Self>,
			cx: &mut Context<'_>,
			bufs: &[IoSlice<'_>]
		) -> Poll<io::Result<usize>> {
			PipeWriter::poll_write_vectored(Pin::new(&mut self.tx), cx, bufs)
		}

		fn poll_flush(
			mut self: Pin<&mut Self>,
			cx: &mut Context<'_>
		) -> Poll<io::Result<()>> {
			PipeWriter::poll_flush(Pin::new(&mut self.tx), cx)
		}

		fn poll_close(mut self: Pin<&mut Self>,
			cx: &mut Context<'_>
		) -> Poll<io::Result<()>> {
			PipeWriter::poll_close(Pin::new(&mut self.tx), cx)
		}
	}


	const CLIENT_NAME: &'static str = "Chuck Testa";
	const CLIENT_ADDR: &'static str = "127.0.0.1:1234";

	fn init_fake_connection() -> Connection {
		Connection {
			id: 0,
			name: CLIENT_NAME.to_string(),
			addr: CLIENT_ADDR.parse().unwrap(),
			is_connected: false,
		}
	}

	#[test]
	fn client_accepted_registration() -> anyhow::Result<()> {
		// initial setup 
		let (client_sock, _server_sock) = FakeTcpStream::new();
		let (mx_tx, _mx_rx) = unbounded();

		// an initialized client should not be considered connected ...
		let mut client = Client::with(init_fake_connection(), client_sock, mx_tx);
		assert_eq!(client.state, ClientState::Connecting);

		// an accepted registration should result in a state change
		let resp = Packet::RegistrationAccepted { 
			name: "Chuck Testa".to_string()
		};

		smol::block_on(client.handle_net_pkt(resp))?;
		assert_eq!(client.state, ClientState::Connected);

		Ok(())
	}

	#[test]
	fn client_rejected_registration() -> anyhow::Result<()> {
		// initial setup 
		let (client_sock, mut server_sock) = FakeTcpStream::new();
		let (mx_tx, _mx_rx) = unbounded();
		let mut client = Client::with(init_fake_connection(), client_sock, mx_tx);

		// rejected client should still be in Connecting state
		smol::block_on(client.handle_net_pkt(Packet::RegistrationRejected))?;
		assert_eq!(client.state, ClientState::Connecting);

		// tell client to change nick
		let new_name = "newer_more_different_name";
		smol::block_on(client.handle_shell_msg(ConnMsg::ChangeNick {
			name: new_name.to_string()
		}))?;

		// the "server" should have received a registration packet for the new name
		let req = smol::block_on(smolboi::util::read_packet(&mut server_sock.rx))?;
		if let Packet::Register { name } = req {
			assert_eq!(name, new_name);
		}

		Ok(())
	}

    use super::*;
    use async_channel::unbounded;
    use core::pin::Pin;
    use futures_util::io::{self, AsyncRead, AsyncWrite, IoSlice, IoSliceMut};
    use futures_util::task::{Context, Poll};
    use sluice::pipe::{pipe, PipeReader, PipeWriter};

    struct FakeTcpStream {
        rx: PipeReader,
        tx: PipeWriter,
    }

    impl FakeTcpStream {
        pub fn new() -> (Self, Self) {
            let (rx_a, tx_a) = pipe();
            let (rx_b, tx_b) = pipe();

            // create a "crossover" cable
            let socket_a = Self { rx: rx_b, tx: tx_a };
            let socket_b = Self { rx: rx_a, tx: tx_b };

            (socket_a, socket_b)
        }
    }

    impl AsyncRead for FakeTcpStream {
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            PipeReader::poll_read(Pin::new(&mut self.rx), cx, buf)
        }

        fn poll_read_vectored(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &mut [IoSliceMut<'_>],
        ) -> Poll<io::Result<usize>> {
            PipeReader::poll_read_vectored(Pin::new(&mut self.rx), cx, bufs)
        }
    }

    impl AsyncWrite for FakeTcpStream {
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            PipeWriter::poll_write(Pin::new(&mut self.tx), cx, buf)
        }

        fn poll_write_vectored(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            bufs: &[IoSlice<'_>],
        ) -> Poll<io::Result<usize>> {
            PipeWriter::poll_write_vectored(Pin::new(&mut self.tx), cx, bufs)
        }

        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            PipeWriter::poll_flush(Pin::new(&mut self.tx), cx)
        }

        fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            PipeWriter::poll_close(Pin::new(&mut self.tx), cx)
        }
    }

    const CLIENT_NAME: &'static str = "Chuck Testa";
    const CLIENT_ADDR: &'static str = "127.0.0.1:1234";

    fn init_fake_connection() -> Connection {
        Connection {
            id: 0,
            name: CLIENT_NAME.to_string(),
            addr: CLIENT_ADDR.parse().unwrap(),
            is_connected: false,
        }
    }

    #[test]
    fn client_accepted_registration() -> anyhow::Result<()> {
        // initial setup
        let (client_sock, _server_sock) = FakeTcpStream::new();
        let (mx_tx, _mx_rx) = unbounded();

        // an initialized client should not be considered connected ...
        let mut client = Client::with(init_fake_connection(), client_sock, mx_tx);
        assert_eq!(client.state, ClientState::Connecting);

        // an accepted registration should result in a state change
        let resp = Packet::RegistrationAccepted {
            name: "Chuck Testa".to_string(),
        };

        smol::block_on(client.handle_net_pkt(resp))?;
        assert_eq!(client.state, ClientState::Connected);

        Ok(())
    }

    #[test]
    fn client_rejected_registration() -> anyhow::Result<()> {
        // initial setup
        let (client_sock, mut server_sock) = FakeTcpStream::new();
        let (mx_tx, _mx_rx) = unbounded();
        let mut client = Client::with(init_fake_connection(), client_sock, mx_tx);

        // rejected client should still be in Connecting state
        smol::block_on(client.handle_net_pkt(Packet::RegistrationRejected))?;
        assert_eq!(client.state, ClientState::Connecting);

        // tell client to change nick
        let new_name = "newer_more_different_name";
        smol::block_on(client.handle_shell_msg(ConnMsg::ChangeNick {
            name: new_name.to_string(),
        }))?;

        // the "server" should have received a registration packet for the new name
        let req = smol::block_on(smolboi::util::read_packet(&mut server_sock.rx))?;
        if let Packet::Register { name } = req {
            assert_eq!(name, new_name);
        }

        Ok(())
    }
}

M linetest/src/shell/ui/buffer.rs => linetest/src/shell/ui/buffer.rs +339 -323
@@ 1,350 1,366 @@
use super::{COLOR_TITLE_BG, COLOR_TITLE_FG};
use crossterm::{
	cursor,
	style::{self, Print},
	terminal::{Clear, ClearType},
	QueueableCommand,
    cursor,
    style::{self, Print},
    terminal::{Clear, ClearType},
    QueueableCommand,
};
use std::cmp;
use std::io;
use super::{COLOR_TITLE_BG, COLOR_TITLE_FG};
use textwrap;

struct Span {
	line_idx: usize,
	lines: Vec<String>,
    line_idx: usize,
    lines: Vec<String>,
}

impl Span {
	pub fn new(idx: usize, buf: &str, cols: u16) -> Self {
		let wrapped_str = textwrap::fill(buf, cols.into());

		let lines = wrapped_str
			.lines()
			.map(|line| { line.to_owned() })
			.collect::<Vec<_>>();

		Self { line_idx: idx, lines: lines }
	}

	pub fn line_iter(&self) -> impl DoubleEndedIterator<Item = &String> {
		self.lines.iter()
	}
    pub fn new(idx: usize, buf: &str, cols: u16) -> Self {
        let wrapped_str = textwrap::fill(buf, cols.into());

        let lines = wrapped_str
            .lines()
            .map(|line| line.to_owned())
            .collect::<Vec<_>>();

        Self {
            line_idx: idx,
            lines: lines,
        }
    }

    pub fn line_iter(&self) -> impl DoubleEndedIterator<Item = &String> {
        self.lines.iter()
    }
}
/// The `Buffer` stores a list of lines which need to be displayed 
/// The `Buffer` stores a list of lines which need to be displayed
/// in a fixed-width context. Lines can be added to the buffer and
/// then displayed at any arbitrary resolution later.
pub struct Buffer {
	cols: u16,
	rows: u16,
    cols: u16,
    rows: u16,

	lines: Vec<String>,
	current_line_ptr: usize,
	max_line_ptr: usize,
    lines: Vec<String>,
    current_line_ptr: usize,
    max_line_ptr: usize,

	needs_repaint: bool,
    needs_repaint: bool,
}

impl Buffer {
	pub fn new(cols: u16, rows: u16) -> Self {
		Self {
			cols: cols,
			rows: rows,

			lines: vec![],
			current_line_ptr: 0,
			max_line_ptr: 0,

			needs_repaint: true,
		}
	}

	pub fn resise(&mut self, cols: u16, rows: u16) {
		self.needs_repaint = true;
		self.cols = cols;
		self.rows = rows;

		// If there are more lines that could be shown we push the current line pointer
		// down in an effort to fill the window w/ the add'l lines.
		let delta_buffer = self.lines.len() - self.current_line_ptr;

		let rows: usize = self.rows.into();
		if rows > delta_buffer {
			self.current_line_ptr -= cmp::min(self.current_line_ptr, rows - delta_buffer);
		}
	}

	pub fn append_line(&mut self, line: String) {
		self.lines.push(line);
		self.needs_repaint = true;
	}

	pub fn scroll_up(&mut self) {
		let new_line_ptr = self.current_line_ptr + 1;

		if new_line_ptr <= self.lines.len() && self.max_line_ptr > 0 {
			self.current_line_ptr += 1;
			self.needs_repaint = true;
		}
	}

	pub fn scroll_down(&mut self) {
		if self.current_line_ptr > 0 {
			self.current_line_ptr -= 1;
			self.needs_repaint = true;
		}
	}

	/// The caller *must* set the cursor to the row where this buffer should be rendered.
	/// The buffer will ensure that it paints _at most_ `self.rows` lines onto the terminal.
	/// NOTE: this is a no-op if the buffer does not need to be repainted.
	pub fn render(&mut self, stdout: &mut std::io::Stdout) -> anyhow::Result<()> {
		if !self.needs_repaint { return Ok(()); }
		self.needs_repaint = false;

		// move to bottom-most row.
		stdout
		.queue(cursor::MoveDown(self.rows))?
		.queue(cursor::MoveToColumn(0))?;

		// resize the spans starting from current line
		let cols = self.cols; // borrowck strikes again wtf, can't move this into `.map(...)`??? IT'S COPY.
		let viewport_spans = self.lines
		.iter()
		.enumerate()
		.rev()
		.skip(self.current_line_ptr)
		.map(|(idx, line)| Span::new(idx, line, cols))
		.take(self.rows.into());

		// blit lines to the viewport
		let mut rows_drawn = 0;

		for span in viewport_spans {
			let mut lines = span.line_iter().rev();

			while rows_drawn <= self.rows.into() {
				match lines.next() {
					Some(line) => {
						rows_drawn += 1;
						self.max_line_ptr = span.line_idx;
						stdout
							.queue(Clear(ClearType::CurrentLine))?
							.queue(Print(line))?
							.queue(cursor::MoveUp(1))?
							.queue(cursor::MoveToColumn(0))?;
					},

					None => { break },
				}
			}
		}
		
		// clear remaining lines
		while rows_drawn < self.rows.into() {
			rows_drawn += 1;
			stdout
				.queue(Clear(ClearType::CurrentLine))?
				.queue(cursor::MoveUp(1))?
				.queue(cursor::MoveToColumn(0))?;
		}

		let buf_info = format!(
			"cur: {}, max: {}, len: {}, rows: {}",
			self.current_line_ptr,
			self.max_line_ptr,
			self.lines.len(),
			self.rows,
		);

		stdout
			.queue(cursor::MoveTo(0, 1))?
			.queue(style::SetBackgroundColor(COLOR_TITLE_BG))?
			.queue(style::SetForegroundColor(COLOR_TITLE_FG))?
			.queue(Clear(ClearType::CurrentLine))?
			.queue(Print(&buf_info))?
			.queue(style::ResetColor)?;

		Ok(())
	}
    pub fn new(cols: u16, rows: u16) -> Self {
        Self {
            cols: cols,
            rows: rows,

            lines: vec![],
            current_line_ptr: 0,
            max_line_ptr: 0,

            needs_repaint: true,
        }
    }

    pub fn resise(&mut self, cols: u16, rows: u16) {
        self.needs_repaint = true;
        self.cols = cols;
        self.rows = rows;

        // If there are more lines that could be shown we push the current line pointer
        // down in an effort to fill the window w/ the add'l lines.
        let delta_buffer = self.lines.len() - self.current_line_ptr;

        let rows: usize = self.rows.into();
        if rows > delta_buffer {
            self.current_line_ptr -= cmp::min(self.current_line_ptr, rows - delta_buffer);
        }
    }

    pub fn append_line(&mut self, line: String) {
        self.lines.push(line);
        self.needs_repaint = true;
    }

    pub fn scroll_up(&mut self) {
        let new_line_ptr = self.current_line_ptr + 1;

        if new_line_ptr <= self.lines.len() && self.max_line_ptr > 0 {
            self.current_line_ptr += 1;
            self.needs_repaint = true;
        }
    }

    pub fn scroll_down(&mut self) {
        if self.current_line_ptr > 0 {
            self.current_line_ptr -= 1;
            self.needs_repaint = true;
        }
    }

    /// The caller *must* set the cursor to the row where this buffer should be rendered.
    /// The buffer will ensure that it paints _at most_ `self.rows` lines onto the terminal.
    /// NOTE: this is a no-op if the buffer does not need to be repainted.
    pub fn render(&mut self, stdout: &mut std::io::Stdout) -> anyhow::Result<()> {
        if !self.needs_repaint {
            return Ok(());
        }
        self.needs_repaint = false;

        // move to bottom-most row.
        stdout
            .queue(cursor::MoveDown(self.rows))?
            .queue(cursor::MoveToColumn(0))?;

        // resize the spans starting from current line
        let cols = self.cols; // borrowck strikes again wtf, can't move this into `.map(...)`??? IT'S COPY.
        let viewport_spans = self
            .lines
            .iter()
            .enumerate()
            .rev()
            .skip(self.current_line_ptr)
            .map(|(idx, line)| Span::new(idx, line, cols))
            .take(self.rows.into());

        // blit lines to the viewport
        let mut rows_drawn = 0;

        for span in viewport_spans {
            let mut lines = span.line_iter().rev();

            while rows_drawn <= self.rows.into() {
                match lines.next() {
                    Some(line) => {
                        rows_drawn += 1;
                        self.max_line_ptr = span.line_idx;
                        stdout
                            .queue(Clear(ClearType::CurrentLine))?
                            .queue(Print(line))?
                            .queue(cursor::MoveUp(1))?
                            .queue(cursor::MoveToColumn(0))?;
                    }

                    None => break,
                }
            }
        }

        // clear remaining lines
        while rows_drawn < self.rows.into() {
            rows_drawn += 1;
            stdout
                .queue(Clear(ClearType::CurrentLine))?
                .queue(cursor::MoveUp(1))?
                .queue(cursor::MoveToColumn(0))?;
        }

        let buf_info = format!(
            "cur: {}, max: {}, len: {}, rows: {}",
            self.current_line_ptr,
            self.max_line_ptr,
            self.lines.len(),
            self.rows,
        );

        stdout
            .queue(cursor::MoveTo(0, 1))?
            .queue(style::SetBackgroundColor(COLOR_TITLE_BG))?
            .queue(style::SetForegroundColor(COLOR_TITLE_FG))?
            .queue(Clear(ClearType::CurrentLine))?
            .queue(Print(&buf_info))?
            .queue(style::ResetColor)?;

        Ok(())
    }
}

pub struct InputBuffer {
	pos: usize,
	start: usize,
	end: usize,
    pos: usize,
    start: usize,
    end: usize,

	buffer: String,
    buffer: String,
}

impl InputBuffer {
	pub fn new() -> Self {
		Self {
			// TODO: should probably init w/ some size ...
			pos: 0,
			start: 0,
			end: 0,

			buffer: String::new(),
		}
	}

	pub fn insert(&mut self, ch: char) {
		if self.pos < self.buffer.len() {
			self.buffer.insert(self.pos, ch);
		} else {
			self.buffer.push(ch);
		}

		self.cursor_right();
	}

	pub fn backspace(&mut self) {
		if self.pos < self.buffer.len() {
			self.buffer.remove(self.pos - 1);
		} else {
			self.buffer.pop();
		}

		self.cursor_left();
	}

	pub fn delete(&mut self) {
		if self.pos < self.buffer.len() {
			self.buffer.remove(self.pos);
		}
	}

	pub fn cursor_left(&mut self) {
		if self.pos <= 0 { return; }

		self.pos -= 1;

		if self.pos < self.start  {
			self.start -= 1;
			self.end -= 1;
		}
	}

	pub fn cursor_right(&mut self) {
		if self.pos >= self.buffer.len() { return; }

		self.pos += 1;

		if self.pos > self.end {
			self.start += 1;
			self.end += 1;
		}
	}

	pub fn goto_head(&mut self) {
		self.pos = 0;
		self.start = 0;
		self.end = self.span_len();
	}

	pub fn goto_tail(&mut self) {
		self.pos = self.buffer.len();
		self.end = self.pos;
		self.start = self.pos - self.span_len();
	}

	pub fn take(&mut self) -> String {
		let mut old_buffer = String::new();
		std::mem::swap(&mut self.buffer, &mut old_buffer);
		
		self.pos = 0;
		self.start = 0;
		self.end = 0;

		return old_buffer;
	}

	pub fn render(&mut self, stdout: &mut io::Stdout, col: u16, row: u16, width: u16) -> anyhow::Result<()> {
		// calculate some info about our prompt & buffer size
		let prompt = "> ";
		let width: usize = width.into();
		let width_with_prompt = width - 1 - prompt.len();

		// adjust viewspan to match current terminal width (less the prompt)
		if self.span_len() > width_with_prompt {
			// shrink the view span
			let delta = self.span_len() - width_with_prompt;

			let trailing_width = self.buffer.len() - self.pos;

			if self.pos + trailing_width > width_with_prompt {
				let start_to_pos = self.pos - self.start;
				self.start = self.pos;

				if start_to_pos < delta {
					self.end -= delta - start_to_pos;
				}
			} else {
				self.end -= delta;
			}
		} else if self.span_len() < width_with_prompt {
			// grow the view span
			let mut delta = width_with_prompt - self.span_len();

			// consume delta to display tail of the buffer outside the viewport.
			if self.buffer.len() > self.end {
				let overhang = cmp::min(delta, self.buffer.len() - self.end);
				self.end += overhang;
				delta -= overhang;
			}

			// repeat the same for the head of the buffer outside the viewport.
			if self.start > 0 {
				let overhang = cmp::min(delta, self.start);
				self.start -= overhang;
				delta -= overhang;
			}

			// if we still have new columns place them on the end.
			//
			// this should only be the case if the input span is larger than
			// the current buffer.
			self.end += delta;
		}

		// draw status info on line 2
		// let buf_info = format!(
		//	 "start: {}, end: {}, pos: {}, spanlen: {}, buflen: {}",
		//	 self.start,
		//	 self.end,
		//	 self.pos,
		//	 self.span_len(),
		//	 self.buffer.len()
		// );

		// stdout
		//	 .queue(cursor::MoveTo(0, 1))?
		//	 .queue(style::SetBackgroundColor(COLOR_TITLE_BG))?
		//	 .queue(style::SetForegroundColor(COLOR_TITLE_FG))?
		//	 .queue(Clear(ClearType::CurrentLine))?
		//	 .queue(Print(&buf_info))?
		//	 .queue(style::ResetColor)?;

		// clear the line & draw our prompt
		stdout
			.queue(cursor::MoveTo(col, row))?
			.queue(Clear(ClearType::CurrentLine))?
			.queue(Print(prompt.to_string()))?;

		// render the current viewport & update the cursor
		let beg = self.start;
		let end = cmp::min(self.buffer.len(), self.end);
		let relative_cursor_pos = (self.pos - self.start) + prompt.len() + 1;
		assert!(relative_cursor_pos <= u16::MAX.into());

		stdout
			.queue(Print(&self.buffer[beg..end]))?
			.queue(cursor::MoveToColumn(relative_cursor_pos as u16))?;

		Ok(())
	}

	fn span_len(&self) -> usize {
		self.end - self.start
	}
    pub fn new() -> Self {
        Self {
            // TODO: should probably init w/ some size ...
            pos: 0,
            start: 0,
            end: 0,

            buffer: String::new(),
        }
    }

    pub fn insert(&mut self, ch: char) {
        if self.pos < self.buffer.len() {
            self.buffer.insert(self.pos, ch);
        } else {
            self.buffer.push(ch);
        }

        self.cursor_right();
    }

    pub fn backspace(&mut self) {
        if self.pos < self.buffer.len() {
            self.buffer.remove(self.pos - 1);
        } else {
            self.buffer.pop();
        }

        self.cursor_left();
    }

    pub fn delete(&mut self) {
        if self.pos < self.buffer.len() {
            self.buffer.remove(self.pos);
        }
    }

    pub fn cursor_left(&mut self) {
        if self.pos <= 0 {
            return;
        }

        self.pos -= 1;

        if self.pos < self.start {
            self.start -= 1;
            self.end -= 1;
        }
    }

    pub fn cursor_right(&mut self) {
        if self.pos >= self.buffer.len() {
            return;
        }

        self.pos += 1;

        if self.pos > self.end {
            self.start += 1;
            self.end += 1;
        }
    }

    pub fn goto_head(&mut self) {
        self.pos = 0;
        self.start = 0;
        self.end = self.span_len();
    }

    pub fn goto_tail(&mut self) {
        self.pos = self.buffer.len();
        self.end = self.pos;
        self.start = self.pos - self.span_len();
    }

    pub fn take(&mut self) -> String {
        let mut old_buffer = String::new();
        std::mem::swap(&mut self.buffer, &mut old_buffer);

        self.pos = 0;
        self.start = 0;
        self.end = 0;

        return old_buffer;
    }

    pub fn render(
        &mut self,
        stdout: &mut io::Stdout,
        col: u16,
        row: u16,
        width: u16,
    ) -> anyhow::Result<()> {
        // calculate some info about our prompt & buffer size
        let prompt = "> ";
        let width: usize = width.into();
        let width_with_prompt = width - 1 - prompt.len();

        // adjust viewspan to match current terminal width (less the prompt)
        if self.span_len() > width_with_prompt {
            // shrink the view span
            let delta = self.span_len() - width_with_prompt;

            let trailing_width = self.buffer.len() - self.pos;

            if self.pos + trailing_width > width_with_prompt {
                let start_to_pos = self.pos - self.start;
                self.start = self.pos;

                if start_to_pos < delta {
                    self.end -= delta - start_to_pos;
                }
            } else {
                self.end -= delta;
            }
        } else if self.span_len() < width_with_prompt {
            // grow the view span
            let mut delta = width_with_prompt - self.span_len();

            // consume delta to display tail of the buffer outside the viewport.
            if self.buffer.len() > self.end {
                let overhang = cmp::min(delta, self.buffer.len() - self.end);
                self.end += overhang;
                delta -= overhang;
            }

            // repeat the same for the head of the buffer outside the viewport.
            if self.start > 0 {
                let overhang = cmp::min(delta, self.start);
                self.start -= overhang;
                delta -= overhang;
            }

            // if we still have new columns place them on the end.
            //
            // this should only be the case if the input span is larger than
            // the current buffer.
            self.end += delta;
        }

        // draw status info on line 2
        // let buf_info = format!(
        //	 "start: {}, end: {}, pos: {}, spanlen: {}, buflen: {}",
        //	 self.start,
        //	 self.end,
        //	 self.pos,
        //	 self.span_len(),
        //	 self.buffer.len()
        // );

        // stdout
        //	 .queue(cursor::MoveTo(0, 1))?
        //	 .queue(style::SetBackgroundColor(COLOR_TITLE_BG))?
        //	 .queue(style::SetForegroundColor(COLOR_TITLE_FG))?
        //	 .queue(Clear(ClearType::CurrentLine))?
        //	 .queue(Print(&buf_info))?
        //	 .queue(style::ResetColor)?;

        // clear the line & draw our prompt
        stdout
            .queue(cursor::MoveTo(col, row))?
            .queue(Clear(ClearType::CurrentLine))?
            .queue(Print(prompt.to_string()))?;

        // render the current viewport & update the cursor
        let beg = self.start;
        let end = cmp::min(self.buffer.len(), self.end);
        let relative_cursor_pos = (self.pos - self.start) + prompt.len() + 1;
        assert!(relative_cursor_pos <= u16::MAX.into());

        stdout
            .queue(Print(&self.buffer[beg..end]))?
            .queue(cursor::MoveToColumn(relative_cursor_pos as u16))?;

        Ok(())
    }

    fn span_len(&self) -> usize {
        self.end - self.start
    }
}

M linetest/src/shell/ui/mod.rs => linetest/src/shell/ui/mod.rs +45 -45
@@ 1,9 1,9 @@
use crossterm::{
	cursor,
	event::{self, Event},
	style,
	terminal::{EnterAlternateScreen, LeaveAlternateScreen},
	ExecutableCommand,
    cursor,
    event::{self, Event},
    style,
    terminal::{EnterAlternateScreen, LeaveAlternateScreen},
    ExecutableCommand,
};

use std::io;


@@ 16,44 16,44 @@ pub const COLOR_TITLE_BG: style::Color = style::Color::Blue;
pub const COLOR_TITLE_FG: style::Color = style::Color::Grey;

pub fn start_task() -> anyhow::Result<()> {
	let mut shell = term::TermInterface::new()?;
	let poll_hz = Duration::from_millis(1000 / 120);

	// take over user's terminal
	crossterm::terminal::enable_raw_mode()?;

	io::stdout()
		.execute(EnterAlternateScreen)?
		.execute(cursor::DisableBlinking)?
		.execute(cursor::Hide)?;
	
	// loop over crossterm events every 120Hz
	while shell.is_running() {
		shell.read_connections()?;
		shell.present()?;
		if !event::poll(poll_hz)? { continue }

		match event::read()? {
			Event::Key(key_evt) => {
				shell.event_keydown(key_evt);
			},

			Event::Resize(cols, rows) => { 
				shell.event_resize(cols, rows);
			},

			_ => {},
		}


	}

	// return to user's previous terminal
	io::stdout()
		.execute(LeaveAlternateScreen)?
		.execute(cursor::Show)?;

	crossterm::terminal::disable_raw_mode()?;

	Ok(())
    let mut shell = term::TermInterface::new()?;
    let poll_hz = Duration::from_millis(1000 / 120);

    // take over user's terminal
    crossterm::terminal::enable_raw_mode()?;

    io::stdout()
        .execute(EnterAlternateScreen)?
        .execute(cursor::DisableBlinking)?
        .execute(cursor::Hide)?;

    // loop over crossterm events every 120Hz
    while shell.is_running() {
        shell.read_connections()?;
        shell.present()?;
        if !event::poll(poll_hz)? {
            continue;
        }

        match event::read()? {
            Event::Key(key_evt) => {
                shell.event_keydown(key_evt);
            }

            Event::Resize(cols, rows) => {
                shell.event_resize(cols, rows);
            }

            _ => {}
        }
    }

    // return to user's previous terminal
    io::stdout()
        .execute(LeaveAlternateScreen)?
        .execute(cursor::Show)?;

    crossterm::terminal::disable_raw_mode()?;

    Ok(())
}

M linetest/src/shell/ui/term.rs => linetest/src/shell/ui/term.rs +335 -306
@@ 1,330 1,359 @@
use crate::shell::{net, ConnMsg, ConnectionMap, Error};
use super::buffer::{Buffer, InputBuffer};
use super::{COLOR_TITLE_BG, COLOR_TITLE_FG};
use crate::shell::{net, ConnMsg, ConnectionMap, Error};
use std::io::{self, Write};

use crossterm::{
	cursor,
	event::{self, KeyEvent, KeyModifiers},
	style::{self, Print},
	terminal::{self, Clear, ClearType},
	QueueableCommand,
    cursor,
    event::{self, KeyEvent, KeyModifiers},
    style::{self, Print},
    terminal::{self, Clear, ClearType},
    QueueableCommand,
};

pub struct TermInterface {
	term_cols: u16,
	term_rows: u16,
    term_cols: u16,
    term_rows: u16,

	buffer: InputBuffer,
	syslog: Buffer,
	is_running: bool,
    buffer: InputBuffer,
    syslog: Buffer,
    is_running: bool,

	conn_map: ConnectionMap,
	current_buffer: Option<i64>,
    conn_map: ConnectionMap,
    current_buffer: Option<i64>,
}

impl TermInterface {
	pub fn new() -> anyhow::Result<Self> {
		let (cols, rows) = terminal::size()?;

		Ok(Self {
			term_cols: cols,
			term_rows: rows,

			buffer: InputBuffer::new(),
			syslog: Buffer::new(cols, rows - 3),
			is_running: true,

			conn_map: ConnectionMap::new(),
			current_buffer: None,
		})
	}

	pub fn read_connections(&mut self) -> anyhow::Result<()> {
		let conn_messages = self.conn_map.poll_messages();

		for (conn_id, conn_msg_q) in conn_messages {
			for conn_msg in conn_msg_q {
				match conn_msg {
					ConnMsg::LogLine { msg } => {
						let log_line = format!("conn({}): {}", conn_id, msg);
						self.syslog.append_line(log_line);
					},

					ConnMsg::BufLine { id, msg } => {
						// TODO: buffer management
						let log_line = format!("{}@{}: {}", conn_id, id, msg);
						self.syslog.append_line(log_line);
					},

					ConnMsg::RejectTemp { reason } => {
						let log_line = format!("conn({}): {}", conn_id, reason);
						self.syslog.append_line(log_line);
					},

					_ => {},
				}
			}

		}

		Ok(())
	}

	pub fn present(&mut self) -> anyhow::Result<()> {
		let mut stdout = io::stdout();
		stdout.queue(cursor::Hide)?;

		// draw titlebar
		stdout
			.queue(cursor::MoveTo(0, 0))?
			.queue(style::SetBackgroundColor(COLOR_TITLE_BG))?
			.queue(style::SetForegroundColor(COLOR_TITLE_FG))?
			.queue(Clear(ClearType::CurrentLine))?
			.queue(Print(self.status_line()))?
			.queue(style::ResetColor)?;

		// draw the input buffer and save its cursor position
		// TODO: render this lazily?
		self.buffer.render(&mut stdout, 0, self.term_rows, self.term_cols)?;
		stdout.queue(cursor::SavePosition)?;

		// draw the syslog buffer
		// TODO: draw current buffer
		stdout.queue(cursor::MoveTo(0, 1))?; // move to row 2: where we want to paint the buffer.
		self.syslog.render(&mut stdout)?;

		// restore input cursor position
		stdout
			.queue(cursor::RestorePosition)?
			.queue(cursor::Show)?;

		Ok(stdout.flush()?)
	}

	pub fn event_keydown(&mut self, event: event::KeyEvent) {
		match event {
			KeyEvent { code, modifiers } if modifiers.contains(KeyModifiers::CONTROL) => match code {
				event::KeyCode::Char('q') => { self.is_running = false; },
				_ => {},
			},

			KeyEvent { code, modifiers } => match code {
				event::KeyCode::Char(ch) if modifiers.contains(KeyModifiers::SHIFT) => {

					self.buffer.insert(ch.to_ascii_uppercase());
				},

				event::KeyCode::Char(ch) => {
					self.buffer.insert(ch);
				},

				event::KeyCode::Backspace => {
					self.buffer.backspace();
				},

				event::KeyCode::Delete => {
					self.buffer.delete();
				},

				event::KeyCode::Left => {
					self.buffer.cursor_left();
				},

				event::KeyCode::Right => {
					self.buffer.cursor_right();
				},

				event::KeyCode::Enter => {
					self.process_buffer();
				},

				event::KeyCode::PageUp => {
					self.syslog.scroll_up();
				},

				event::KeyCode::PageDown => {
					self.syslog.scroll_down();
				},

				event::KeyCode::Home => {
					self.buffer.goto_head();
				},

				event::KeyCode::End => {
					self.buffer.goto_tail();
				},

				_ => {},
			},
		}
	}

	pub fn event_resize(&mut self, cols: u16, rows: u16) {
		self.term_cols = cols;
		self.term_rows = rows;

		// syslog buffer is max terminal height - 2 rows for status bar - 1 row for input buffer
		self.syslog.resise(self.term_cols, self.term_rows - 3);
	}

	pub fn is_running(&self) -> bool { self.is_running }

	fn status_line(&self) -> String {
		format!("linetest - rows [{}] cols [{}]", self.term_rows, self.term_cols)
	}

	fn process_buffer(&mut self) {
		let result = match self.buffer.take() {
			cmd if cmd.starts_with("/connect") => self.cmd_connect(cmd),
			cmd if cmd.starts_with("/sessions") => self.cmd_list_sessions(cmd),
			cmd if cmd.starts_with("/nick") => self.cmd_nick(cmd),
			cmd if cmd.starts_with("/join") => self.cmd_join(cmd),
			cmd if cmd.starts_with("/part") => self.cmd_part(cmd),
			
			
			cmd if cmd.starts_with("/test") => self.cmd_test(cmd),
			cmd if cmd.starts_with("/e") => self.cmd_echo(cmd),
			cmd => self.cmd_message(cmd),
		};

		if let Err(msg) = result {
			self.syslog.append_line(format!("command error: {:?}", msg));
		}
	}

	fn cmd_connect(&mut self, buf: String) -> anyhow::Result<()> {
		let mut args = buf.split(" ");
		let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;
		let hostname = args.next().ok_or_else(|| Error::MissingArgument)?;
		let nickname = args.next().ok_or_else(|| Error::MissingArgument)?;
		
		let (conn_id, conn_mx) = self.conn_map.create_connection(hostname, nickname)?;
		let conn = self.conn_map.connections[&conn_id].clone();

		smol::Task::spawn(async move {
    pub fn new() -> anyhow::Result<Self> {
        let (cols, rows) = terminal::size()?;

        Ok(Self {
            term_cols: cols,
            term_rows: rows,

            buffer: InputBuffer::new(),
            syslog: Buffer::new(cols, rows - 3),
            is_running: true,

            conn_map: ConnectionMap::new(),
            current_buffer: None,
        })
    }

    pub fn read_connections(&mut self) -> anyhow::Result<()> {
        let conn_messages = self.conn_map.poll_messages();

        for (conn_id, conn_msg_q) in conn_messages {
            for conn_msg in conn_msg_q {
                match conn_msg {
                    ConnMsg::LogLine { msg } => {
                        let log_line = format!("conn({}): {}", conn_id, msg);
                        self.syslog.append_line(log_line);
                    }

                    ConnMsg::BufLine { id, msg } => {
                        // TODO: buffer management
                        let log_line = format!("{}@{}: {}", conn_id, id, msg);
                        self.syslog.append_line(log_line);
                    }

                    ConnMsg::RejectTemp { reason } => {
                        let log_line = format!("conn({}): {}", conn_id, reason);
                        self.syslog.append_line(log_line);
                    }

                    _ => {}
                }
            }
        }

        Ok(())
    }

    pub fn present(&mut self) -> anyhow::Result<()> {
        let mut stdout = io::stdout();
        stdout.queue(cursor::Hide)?;

        // draw titlebar
        stdout
            .queue(cursor::MoveTo(0, 0))?
            .queue(style::SetBackgroundColor(COLOR_TITLE_BG))?
            .queue(style::SetForegroundColor(COLOR_TITLE_FG))?
            .queue(Clear(ClearType::CurrentLine))?
            .queue(Print(self.status_line()))?
            .queue(style::ResetColor)?;

        // draw the input buffer and save its cursor position
        // TODO: render this lazily?
        self.buffer
            .render(&mut stdout, 0, self.term_rows, self.term_cols)?;
        stdout.queue(cursor::SavePosition)?;

        // draw the syslog buffer
        // TODO: draw current buffer
        stdout.queue(cursor::MoveTo(0, 1))?; // move to row 2: where we want to paint the buffer.
        self.syslog.render(&mut stdout)?;

        // restore input cursor position
        stdout.queue(cursor::RestorePosition)?.queue(cursor::Show)?;

        Ok(stdout.flush()?)
    }

    pub fn event_keydown(&mut self, event: event::KeyEvent) {
        match event {
            KeyEvent { code, modifiers } if modifiers.contains(KeyModifiers::CONTROL) => match code
            {
                event::KeyCode::Char('q') => {
                    self.is_running = false;
                }
                _ => {}
            },

            KeyEvent { code, modifiers } => match code {
                event::KeyCode::Char(ch) if modifiers.contains(KeyModifiers::SHIFT) => {
                    self.buffer.insert(ch.to_ascii_uppercase());
                }

                event::KeyCode::Char(ch) => {
                    self.buffer.insert(ch);
                }

                event::KeyCode::Backspace => {
                    self.buffer.backspace();
                }

                event::KeyCode::Delete => {
                    self.buffer.delete();
                }

                event::KeyCode::Left => {
                    self.buffer.cursor_left();
                }

                event::KeyCode::Right => {
                    self.buffer.cursor_right();
                }

                event::KeyCode::Enter => {
                    self.process_buffer();
                }

                event::KeyCode::PageUp => {
                    self.syslog.scroll_up();
                }

                event::KeyCode::PageDown => {
                    self.syslog.scroll_down();
                }

                event::KeyCode::Home => {
                    self.buffer.goto_head();
                }

                event::KeyCode::End => {
                    self.buffer.goto_tail();
                }

                _ => {}
            },
        }
    }

    pub fn event_resize(&mut self, cols: u16, rows: u16) {
        self.term_cols = cols;
        self.term_rows = rows;

        // syslog buffer is max terminal height - 2 rows for status bar - 1 row for input buffer
        self.syslog.resise(self.term_cols, self.term_rows - 3);
    }

    pub fn is_running(&self) -> bool {
        self.is_running
    }

    fn status_line(&self) -> String {
        format!(
            "linetest - rows [{}] cols [{}]",
            self.term_rows, self.term_cols
        )
    }

    fn process_buffer(&mut self) {
        let result = match self.buffer.take() {
            cmd if cmd.starts_with("/connect") => self.cmd_connect(cmd),
            cmd if cmd.starts_with("/sessions") => self.cmd_list_sessions(cmd),
            cmd if cmd.starts_with("/nick") => self.cmd_nick(cmd),
            cmd if cmd.starts_with("/join") => self.cmd_join(cmd),
            cmd if cmd.starts_with("/part") => self.cmd_part(cmd),

            cmd if cmd.starts_with("/test") => self.cmd_test(cmd),
            cmd if cmd.starts_with("/e") => self.cmd_echo(cmd),
            cmd => self.cmd_message(cmd),
        };

        if let Err(msg) = result {
            self.syslog.append_line(format!("command error: {:?}", msg));
        }
    }

    fn cmd_connect(&mut self, buf: String) -> anyhow::Result<()> {
        let mut args = buf.split(" ");
        let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;
        let hostname = args.next().ok_or_else(|| Error::MissingArgument)?;
        let nickname = args.next().ok_or_else(|| Error::MissingArgument)?;

        let (conn_id, conn_mx) = self.conn_map.create_connection(hostname, nickname)?;
        let conn = self.conn_map.connections[&conn_id].clone();

        smol::Task::spawn(async move {
			let net_outbox = conn_mx.outbox.clone();

			if let Err(msg) = net::start_task(conn, conn_mx).await {
				let log_line = ConnMsg::LogLine { msg: format!("error connecting: {:?})", msg) };
				

				if let Err(_) = net_outbox.send(log_line).await {
					panic!("BUG: double fault - connection mailbox is unavailable while reporting connection error.");
				}
			}
		}).detach();


		Ok(())

	}

	fn cmd_echo(&mut self, buf: String) -> anyhow::Result<()> {
		let args = buf.splitn(2, " ");
		
		if let Some(text) = args.skip(1).next() {
			self.syslog.append_line(text.to_string());
		}

		Ok(())
	}

	fn cmd_test(&mut self, _buf: String) -> anyhow::Result<()> {
		// spam a bunch of shit to the terminal so we have a buffer to test with
		for i in 1..150 {
			self.syslog.append_line(format!("this is some rather long string of text to test with, the {}th such string", i))
		}
		Ok(())
	}

	fn cmd_list_sessions(&mut self, _buf: String) -> anyhow::Result<()> {
		for (_id, conn) in &self.conn_map.connections {
			self.syslog.append_line(format!("server {} @ {}: connected? {}", conn.id, conn.addr, conn.is_connected));
		}

		Ok(())
	}

	fn cmd_join(&mut self, buf: String) -> anyhow::Result<()> {
		let mut args = buf.split(" ");
		let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;

		let session_no = args.next()
		.ok_or_else(|| Error::MissingArgument)
		.and_then(|arg| 
			// ick ...
			arg.parse::<i64>().map_err(|err| { Error::ParseError { pos: "session_no", source: err } })
		)?;

		let room_name = args.next().ok_or_else(|| Error::MissingArgument)?;

		let conn = self.conn_map.socket_tx.get(&session_no)
		.ok_or_else(|| Error::MissingConnectionId)?;
		smol::block_on(conn.send(ConnMsg::JoinRoom { 
			room: room_name.to_string(),
		}))?;

		Ok(())
	}
	
	fn cmd_part(&mut self, buf: String) -> anyhow::Result<()> {
		let mut args = buf.split(" ");
		let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;

		let session_no = args.next()
		.ok_or_else(|| Error::MissingArgument)
		.and_then(|arg| 
			// ick ...
			arg.parse::<i64>().map_err(|err| { Error::ParseError { pos: "session_no", source: err } })
		)?;

		let room_name = args.next().ok_or_else(|| Error::MissingArgument)?;

		let conn = self.conn_map.socket_tx.get(&session_no)
		.ok_or_else(|| Error::MissingConnectionId)?;
		smol::block_on(conn.send(ConnMsg::PartRoom { 
			room: room_name.to_string(),
		}))?;

		Ok(())
	}

	fn cmd_nick(&mut self, buf: String) -> anyhow::Result<()> {
		let mut args = buf.split(" ");
		let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;
		
		let session_no = args.next()
		.ok_or_else(|| Error::MissingArgument)
		.and_then(|arg| 
			// ick ...
			arg.parse::<i64>().map_err(|err| { Error::ParseError { pos: "session_no", source: err } })
		)?;

		let nick = args.next().ok_or_else(|| Error::MissingArgument)?;
		let conn = self.conn_map.socket_tx.get(&session_no)
		.ok_or_else(|| Error::MissingConnectionId)?;


		smol::block_on(conn.send(ConnMsg::ChangeNick { name: nick.to_string() }))?;
		Ok(())
	}

	fn cmd_message(&mut self, buf: String) -> anyhow::Result<()> {
		match self.current_buffer {
			Some(_) => { /* TODO: send message to connection */ }
			None => { 
				for (_id, conn) in &mut self.conn_map.socket_tx {
					smol::block_on(async { 
						let _ = conn.send(ConnMsg::BufLine {
							id: "example".to_string(),
							msg: buf.clone(),
						}).await;
					});
				}
				//self.lines.push(buf) 
			},
		}

		Ok(())
	}
        Ok(())
    }

    fn cmd_echo(&mut self, buf: String) -> anyhow::Result<()> {
        let args = buf.splitn(2, " ");

        if let Some(text) = args.skip(1).next() {
            self.syslog.append_line(text.to_string());
        }

        Ok(())
    }

    fn cmd_test(&mut self, _buf: String) -> anyhow::Result<()> {
        // spam a bunch of shit to the terminal so we have a buffer to test with
        for i in 1..150 {
            self.syslog.append_line(format!(
                "this is some rather long string of text to test with, the {}th such string",
                i
            ))
        }
        Ok(())
    }

    fn cmd_list_sessions(&mut self, _buf: String) -> anyhow::Result<()> {
        for (_id, conn) in &self.conn_map.connections {
            self.syslog.append_line(format!(
                "server {} @ {}: connected? {}",
                conn.id, conn.addr, conn.is_connected
            ));
        }

        Ok(())
    }

    fn cmd_join(&mut self, buf: String) -> anyhow::Result<()> {
        let mut args = buf.split(" ");
        let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;

        let session_no = args
            .next()
            .ok_or_else(|| Error::MissingArgument)
            .and_then(|arg| {
                arg.parse::<i64>().map_err(|err| Error::ParseError {
                    pos: "session_no",
                    source: err,
                })
            })?;

        let room_name = args.next().ok_or_else(|| Error::MissingArgument)?;

        let conn = self
            .conn_map
            .socket_tx
            .get(&session_no)
            .ok_or_else(|| Error::MissingConnectionId)?;
        smol::block_on(conn.send(ConnMsg::JoinRoom {
            room: room_name.to_string(),
        }))?;

        Ok(())
    }

    fn cmd_part(&mut self, buf: String) -> anyhow::Result<()> {
        let mut args = buf.split(" ");
        let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;

        let session_no = args
            .next()
            .ok_or_else(|| Error::MissingArgument)
            .and_then(|arg| {
                arg.parse::<i64>().map_err(|err| Error::ParseError {
                    pos: "session_no",
                    source: err,
                })
            })?;

        let room_name = args.next().ok_or_else(|| Error::MissingArgument)?;

        let conn = self
            .conn_map
            .socket_tx
            .get(&session_no)
            .ok_or_else(|| Error::MissingConnectionId)?;
        smol::block_on(conn.send(ConnMsg::PartRoom {
            room: room_name.to_string(),
        }))?;

        Ok(())
    }

    fn cmd_nick(&mut self, buf: String) -> anyhow::Result<()> {
        let mut args = buf.split(" ");
        let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;

        let session_no = args
            .next()
            .ok_or_else(|| Error::MissingArgument)
            .and_then(|arg| {
                arg.parse::<i64>().map_err(|err| Error::ParseError {
                    pos: "session_no",
                    source: err,
                })
            })?;

        let nick = args.next().ok_or_else(|| Error::MissingArgument)?;
        let conn = self
            .conn_map
            .socket_tx
            .get(&session_no)
            .ok_or_else(|| Error::MissingConnectionId)?;

        smol::block_on(conn.send(ConnMsg::ChangeNick {
            name: nick.to_string(),
        }))?;
        Ok(())
    }

    fn cmd_message(&mut self, buf: String) -> anyhow::Result<()> {
        match self.current_buffer {
            Some(_) => { /* TODO: send message to connection */ }
            None => {
                for (_id, conn) in &mut self.conn_map.socket_tx {
                    smol::block_on(async {
                        let _ = conn
                            .send(ConnMsg::BufLine {
                                id: "example".to_string(),
                                msg: buf.clone(),
                            })
                            .await;
                    });
                }
                //self.lines.push(buf)
            }
        }

        Ok(())
    }
}

M smolboi/src/bin/server.rs => smolboi/src/bin/server.rs +86 -85
@@ 1,5 1,6 @@
#[macro_use] extern crate log;
use async_channel::{unbounded, Sender, Receiver};
#[macro_use]
extern crate log;
use async_channel::{unbounded, Receiver, Sender};
use async_dup::Arc;
use futures_util::{future, select, StreamExt};
use smol::{Async, Executor};


@@ 13,30 14,31 @@ use smolboi::server::Server;
use smolboi::util;

fn main() -> anyhow::Result<()> {
	env_logger::init();
	info!("logging subsystem initialized");

	// bootstrap the runtime
	let ex = Arc::new(Executor::new());
	let rt_ex = ex.clone();
	let rt = thread::spawn(move || { smol::block_on(rt_ex.clone().run(future::pending::<()>())) });
	info!("started single-threaded executor");

	// setup the acceptor
	let address = "127.0.0.1:7777".parse::<SocketAddr>()?;
	let listener = Async::<TcpListener>::bind(address)?;
	info!("set up listener on 127.0.0.1:7777");

	let (events_tx, events_rx) = unbounded();

	ex.spawn(acceptor_task(listener, events_tx.clone(), ex.clone())).detach();
	ex.spawn(broker_task(events_rx)).detach();
	
	if let Err(msg) = rt.join() {
		warn!("executor exited w/ error: {:?}", msg);
	}

	Ok(())
    env_logger::init();
    info!("logging subsystem initialized");

    // bootstrap the runtime
    let ex = Arc::new(Executor::new());
    let rt_ex = ex.clone();
    let rt = thread::spawn(move || smol::block_on(rt_ex.clone().run(future::pending::<()>())));
    info!("started single-threaded executor");

    // setup the acceptor
    let address = "127.0.0.1:7777".parse::<SocketAddr>()?;
    let listener = Async::<TcpListener>::bind(address)?;
    info!("set up listener on 127.0.0.1:7777");

    let (events_tx, events_rx) = unbounded();

    ex.spawn(acceptor_task(listener, events_tx.clone(), ex.clone()))
        .detach();
    ex.spawn(broker_task(events_rx)).detach();

    if let Err(msg) = rt.join() {
        warn!("executor exited w/ error: {:?}", msg);
    }

    Ok(())
}

/*


@@ 47,89 49,88 @@ fn main() -> anyhow::Result<()> {
 * which can be used to send events to the server. Once a peer has established
 * a registration w/ the server, through `Event::NewPeer { .. }`, the server
 * can similarly issue replies to that peer's mailbox arbitrarily.
 * 
 *
 */
async fn broker_task(events: Receiver<Event>) {
	let mut server = Server::new();
	let mut events = events.fuse();

	loop {
		select! {
			event = events.next() => match event { 
				Some(event) => {
					info!("processing server event: {:?}", event);
					if let Err(msg) = server.process_event(event).await {
						warn!("unexpected server error: {:?}", msg);
					}
				},

				None => { panic!("event stream is gone?") },
			},
		}
	}
    let mut server = Server::new();
    let mut events = events.fuse();

    loop {
        select! {
            event = events.next() => match event {
                Some(event) => {
                    info!("processing server event: {:?}", event);
                    if let Err(msg) = server.process_event(event).await {
                        warn!("unexpected server error: {:?}", msg);
                    }
                },

                None => { panic!("event stream is gone?") },
            },
        }
    }
}


/*
 * The acceptor task is a tight loop over incoming connections.
 * When an incoming connection is received we call `accept_client(..)` which
 * sets up some channels and spawns a detached connection-specific task.
 * 
 *
 * It is imperative that `accept_client(..)` returns quickly & does not block,
 * as any clients connecting concurrently will be blocked until that call
 * returns control to this task.
 */
async fn acceptor_task(
	listener: Async<TcpListener>,
	events: Sender<Event>,
	ex: Arc<Executor<'_>>,
) {
	info!("starting acceptor loop");

	loop {
		let (client_stream, client_addr) = match listener.accept().await {
			Ok(client) => client,
			Err(msg) => { warn!("error accepting client: {:?}", msg); continue },
		};

		let events = events.clone(); // clone events outbox so we can move into the client's task
		let client_ex = ex.clone();
		ex.spawn(async move {
			if let Err(msg) = accept_client(client_stream, client_addr, events, client_ex).await {
				warn!("warning: failed to accept client. {:?}", msg);
			}
		}).detach();
	}
async fn acceptor_task(listener: Async<TcpListener>, events: Sender<Event>, ex: Arc<Executor<'_>>) {
    info!("starting acceptor loop");

    loop {
        let (client_stream, client_addr) = match listener.accept().await {
            Ok(client) => client,
            Err(msg) => {
                warn!("error accepting client: {:?}", msg);
                continue;
            }
        };

        let events = events.clone(); // clone events outbox so we can move into the client's task
        let client_ex = ex.clone();
        ex.spawn(async move {
            if let Err(msg) = accept_client(client_stream, client_addr, events, client_ex).await {
                warn!("warning: failed to accept client. {:?}", msg);
            }
        })
        .detach();
    }
}

/**
 * Sets up mailboxes for a client and begins reading packets from the client's
 * incoming TCP socket. Control is returned to the caller once the mailboxes
 * are established and the task has been sent to the async runtime.
 * 
 *
 * Two tasks are spawned:
 * 
 *
 * 1. The packet decoding task reads the incoming stream and translates
 *	it into `Packet { .. }` objects.
 * 
 *
 * 2. The client event loop which selects over events from the server as well
 *	as incoming packets from the wire. These are dispatched to the `Client { .. }`
 *	which is esentially a state machine representing the connected client.
 */
async fn accept_client(
	stream: Async<TcpStream>,
	addr: SocketAddr,
	events: Sender<Event>,
	ex: Arc<Executor<'_>>,
    stream: Async<TcpStream>,
    addr: SocketAddr,
    events: Sender<Event>,
    ex: Arc<Executor<'_>>,
) -> anyhow::Result<()> {
	info!("accepting client w/ ip: {}", addr);
	let stream = Arc::new(stream);
	let (packets_tx, packets_rx) = unbounded();
	let (tcp_quit_tx, tcp_quit_rx) = unbounded::<void::Void>();

	// peer event loop
	let peer_ex = ex.clone();
	ex.spawn(async move {
    info!("accepting client w/ ip: {}", addr);
    let stream = Arc::new(stream);
    let (packets_tx, packets_rx) = unbounded();
    let (tcp_quit_tx, tcp_quit_rx) = unbounded::<void::Void>();

    // peer event loop
    let peer_ex = ex.clone();
    ex.spawn(async move {
		// incoming server connections
		let (mut peer, peer_rx) = Peer::with_stream(&events, &stream);
		let mut peer_events = peer_rx.fuse();	   // stream of events from server


@@ 140,7 141,7 @@ async fn accept_client(
		peer_ex.spawn(async move {
			loop {
				match util::read_packet(&mut stream.clone()).await {
					Ok(packet) => { 
					Ok(packet) => {
						if let Err(_) = packets_tx.send(packet).await {
							panic!("BUG: sent packet but connection event loop is gone?");
						}


@@ 149,7 150,7 @@ async fn accept_client(
					Err(msg) => {
						warn!("malformed packet from client: {:?}", msg);
						break; // TODO: kill the client?
					},   
					},
				}
			}



@@ 189,5 190,5 @@ async fn accept_client(
		}
	}).detach();

	Ok(()) // created peer event loop a-ok
    Ok(()) // created peer event loop a-ok
}

M smolboi/src/lib.rs => smolboi/src/lib.rs +5 -3
@@ 1,6 1,8 @@
#[macro_use] extern crate log;

pub mod peer;
pub mod protocol;
pub mod server;
pub mod util;
\ No newline at end of file
pub mod util;

mod prelude {
    pub use log::{debug, error, info, warn};
}

M smolboi/src/peer.rs => smolboi/src/peer.rs +189 -171
@@ 1,6 1,7 @@
use crate::protocol::{Event, Target};
use crate::protocol::packet::Packet;
use crate::util;
use crate::protocol::{Event, Target};
use crate::{prelude::*, util};

use std::net::TcpStream;

use async_channel::{unbounded, Receiver, Sender};


@@ 10,183 11,200 @@ use thiserror::Error;

#[derive(Debug, Error)]
enum PeerError {
	#[error("This event cannot be processed by a peer.")]
	UnsupportedEvent,
    #[error("This event cannot be processed by a peer.")]
    UnsupportedEvent,

	#[error("This packet type cannot be sent by a peer.")]
	UnsupportedPacket,
    #[error("This packet type cannot be sent by a peer.")]
    UnsupportedPacket,

	#[error("This packet cannot be sent by a peer at this time.")]
	InvalidSequence,
    #[error("This packet cannot be sent by a peer at this time.")]
    InvalidSequence,
}


enum PeerState {
	Initialized,
	Registered,
    Initialized,
    Registered,
}

pub struct Peer {
	broker: Sender<Event>,
	event_inbox: Sender<Event>,
	name: Option<String>,
	state: PeerState,
	stream: Arc<Async<TcpStream>>,   
    broker: Sender<Event>,
    event_inbox: Sender<Event>,
    name: Option<String>,
    state: PeerState,
    stream: Arc<Async<TcpStream>>,
}

impl Peer {
	pub fn with_stream(broker: &Sender<Event>, stream: &Arc<Async<TcpStream>>) -> (Self, Receiver<Event>) {
		let (events_tx, events_rx) = unbounded();

		let peer = Self {
			broker: broker.clone(),
			event_inbox: events_tx, // hold reference to this so we dont' die prematurely?
			name: None,
			state: PeerState::Initialized,
			stream: stream.clone(),
		};

		(peer, events_rx)
	}

	pub async fn process_packet(&mut self, packet: Packet) -> anyhow::Result<()> {
		match self.state {
			PeerState::Initialized => Ok(self.process_packet_initialized(packet).await?),
			PeerState::Registered => Ok(self.process_packet_registered(packet).await?),
		}
	}

	async fn process_packet_initialized(&mut self, packet: Packet) -> anyhow::Result<()> {
		match packet {
			Packet::Register { name } => {
				Ok(self.broker.send(Event::NewPeer { user: name, events: self.event_inbox.clone() }).await?)
			},

			Packet::Join { .. }
			| Packet::Part { .. } 
			| Packet::MessageRoom { .. } => { Err(PeerError::InvalidSequence.into()) },


			Packet::Disconnect
			| Packet::Notice { .. }
			| Packet::RegistrationAccepted { .. }
			| Packet::RegistrationRejected => Err(PeerError::UnsupportedPacket.into()),
		}
	}

	async fn process_packet_registered(&mut self, packet: Packet) -> anyhow::Result<()> {
		match packet {
			Packet::Register { .. } => { Err(PeerError::InvalidSequence.into()) },

			Packet::Join { room } => {
				let join_event = Event::Join {
					user: self.name.clone().unwrap(),
					room: room,
				};

				Ok(self.broker.send(join_event).await?)
			},

			Packet::Part { room } => {
				let part_event = Event::Part {
					user: self.name.clone().unwrap(),
					room: room,
				};

				Ok(self.broker.send(part_event).await?)
			},

			Packet::MessageRoom { room, from, body } => {
				let message_event = Event::Message { 
					to: Target::Room(room),
					from: Target::User(from),
					body: body,
				};

				Ok(self.broker.send(message_event).await?)
			},
			
			Packet::Disconnect
			| Packet::Notice { .. }
			| Packet::RegistrationAccepted { .. }
			| Packet::RegistrationRejected => Err(PeerError::UnsupportedPacket.into()),
		}
	}

	pub async fn process_event(&mut self, event: Event) -> anyhow::Result<()> {
		match self.state {
			PeerState::Initialized => Ok(self.process_event_initialized(event).await?),
			PeerState::Registered => Ok(self.process_event_registered(event).await?),
		}
	}

	async fn process_event_initialized(&mut self, event: Event) -> anyhow::Result<()> {
		match event {
			Event::AcceptedRegistration { name } => {
				info!("sending accepted registration ...");
				self.state = PeerState::Registered;
				self.name = Some(name.clone());
				let packet = Packet::RegistrationAccepted { name: name };
				Ok(util::write_packet(&mut self.stream, packet).await?)
			},

			Event::RejectedRegistration => {
				Ok(util::write_packet(&mut self.stream, Packet::RegistrationRejected).await?)
			},

			Event::Message { .. }
			| Event::Join { .. }
			| Event::Part { .. } => { Err(PeerError::InvalidSequence.into()) },

			Event::Disconnection { .. }
			| Event::NewPeer { .. }
			| Event::Registration { .. } => { Err(PeerError::UnsupportedEvent.into()) },
		}
	}

	async fn process_event_registered(&mut self, event: Event) -> anyhow::Result<()> {
		match event {
			// room message
			Event::Message { from: Target::User(from), to: Target::Room(to), body } => {
				let message_packet = Packet::MessageRoom {
					room: to,
					from: from,
					body: body,
				};

				Ok(util::write_packet(&mut self.stream, message_packet).await?)
			},

			// system message
			Event::Message { from: Target::Connection, to: Target::Connection, body } => {
				info!("sending packet notice");
				Ok(util::write_packet(&mut self.stream, Packet::Notice { body }).await?)
			},

			Event::Message { .. } => {
				warn!("unhandled message type: {:?}", event);
				Ok(())
			},

			Event::AcceptedRegistration { .. }
			| Event::RejectedRegistration => { Err(PeerError::InvalidSequence.into()) },

			Event::Disconnection { .. }
			| Event::NewPeer { .. }
			| Event::Join { .. }
			| Event::Part { .. }
			| Event::Registration { .. } => { Err(PeerError::UnsupportedEvent.into()) },
		}
	}


	
	pub async fn disconnect(&mut self) -> anyhow::Result<()> {
		if self.name.is_none() {
			warn!("disconnection received before peer was registered?");
			return Ok(());
		}

		Ok(self.broker.send(Event::Disconnection { user: self.name.clone().unwrap() }).await?)
	}
}
\ No newline at end of file
    pub fn with_stream(
        broker: &Sender<Event>,
        stream: &Arc<Async<TcpStream>>,
    ) -> (Self, Receiver<Event>) {
        let (events_tx, events_rx) = unbounded();

        let peer = Self {
            broker: broker.clone(),
            event_inbox: events_tx, // hold reference to this so we dont' die prematurely?
            name: None,
            state: PeerState::Initialized,
            stream: stream.clone(),
        };

        (peer, events_rx)
    }

    pub async fn process_packet(&mut self, packet: Packet) -> anyhow::Result<()> {
        match self.state {
            PeerState::Initialized => Ok(self.process_packet_initialized(packet).await?),
            PeerState::Registered => Ok(self.process_packet_registered(packet).await?),
        }
    }

    async fn process_packet_initialized(&mut self, packet: Packet) -> anyhow::Result<()> {
        match packet {
            Packet::Register { name } => Ok(self
                .broker
                .send(Event::NewPeer {
                    user: name,
                    events: self.event_inbox.clone(),
                })
                .await?),

            Packet::Join { .. } | Packet::Part { .. } | Packet::MessageRoom { .. } => {
                Err(PeerError::InvalidSequence.into())
            }

            Packet::Disconnect
            | Packet::Notice { .. }
            | Packet::RegistrationAccepted { .. }
            | Packet::RegistrationRejected => Err(PeerError::UnsupportedPacket.into()),
        }
    }

    async fn process_packet_registered(&mut self, packet: Packet) -> anyhow::Result<()> {
        match packet {
            Packet::Register { .. } => Err(PeerError::InvalidSequence.into()),

            Packet::Join { room } => {
                let join_event = Event::Join {
                    user: self.name.clone().unwrap(),
                    room: room,
                };

                Ok(self.broker.send(join_event).await?)
            }

            Packet::Part { room } => {
                let part_event = Event::Part {
                    user: self.name.clone().unwrap(),
                    room: room,
                };

                Ok(self.broker.send(part_event).await?)
            }

            Packet::MessageRoom { room, from, body } => {
                let message_event = Event::Message {
                    to: Target::Room(room),
                    from: Target::User(from),
                    body: body,
                };

                Ok(self.broker.send(message_event).await?)
            }

            Packet::Disconnect
            | Packet::Notice { .. }
            | Packet::RegistrationAccepted { .. }
            | Packet::RegistrationRejected => Err(PeerError::UnsupportedPacket.into()),
        }
    }

    pub async fn process_event(&mut self, event: Event) -> anyhow::Result<()> {
        match self.state {
            PeerState::Initialized => Ok(self.process_event_initialized(event).await?),
            PeerState::Registered => Ok(self.process_event_registered(event).await?),
        }
    }

    async fn process_event_initialized(&mut self, event: Event) -> anyhow::Result<()> {
        match event {
            Event::AcceptedRegistration { name } => {
                info!("sending accepted registration ...");
                self.state = PeerState::Registered;
                self.name = Some(name.clone());
                let packet = Packet::RegistrationAccepted { name: name };
                Ok(util::write_packet(&mut self.stream, packet).await?)
            }

            Event::RejectedRegistration => {
                Ok(util::write_packet(&mut self.stream, Packet::RegistrationRejected).await?)
            }

            Event::Message { .. } | Event::Join { .. } | Event::Part { .. } => {
                Err(PeerError::InvalidSequence.into())
            }

            Event::Disconnection { .. } | Event::NewPeer { .. } | Event::Registration { .. } => {
                Err(PeerError::UnsupportedEvent.into())
            }
        }
    }

    async fn process_event_registered(&mut self, event: Event) -> anyhow::Result<()> {
        match event {
            // room message
            Event::Message {
                from: Target::User(from),
                to: Target::Room(to),
                body,
            } => {
                let message_packet = Packet::MessageRoom {
                    room: to,
                    from: from,
                    body: body,
                };

                Ok(util::write_packet(&mut self.stream, message_packet).await?)
            }

            // system message
            Event::Message {
                from: Target::Connection,
                to: Target::Connection,
                body,
            } => {
                info!("sending packet notice");
                Ok(util::write_packet(&mut self.stream, Packet::Notice { body }).await?)
            }

            Event::Message { .. } => {
                warn!("unhandled message type: {:?}", event);
                Ok(())
            }

            Event::AcceptedRegistration { .. } | Event::RejectedRegistration => {
                Err(PeerError::InvalidSequence.into())
            }

            Event::Disconnection { .. }
            | Event::NewPeer { .. }
            | Event::Join { .. }
            | Event::Part { .. }
            | Event::Registration { .. } => Err(PeerError::UnsupportedEvent.into()),
        }
    }

    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
        if self.name.is_none() {
            warn!("disconnection received before peer was registered?");
            return Ok(());
        }

        Ok(self
            .broker
            .send(Event::Disconnection {
                user: self.name.clone().unwrap(),
            })
            .await?)
    }
}

M smolboi/src/protocol.rs => smolboi/src/protocol.rs +373 -334
@@ 6,349 6,388 @@ pub type Result<T> = std::result::Result<T, ProtocolError>;

#[derive(Debug, Error)]
pub enum ProtocolError {
	/// Command does not exist, or is otherwise malformed.
	#[error("incorrect syntax for command")]
	InvalidCommand,
	
	/// Text was expected but it did not decode cleanly into a UTF-8 `String`.
	#[error("expected string - did not decode to valid utf-8.")]
	InvalidEncoding,

	/// Command was decoded successfully, but is not allowed during the current connection state.
	#[error("command was understood but is not legal at this time.")]
	InvalidSequence,

	/// Command was decoded successfully but a required field is missing.
	#[error("expected field {} but stream ended prematurely.", field)]
	MissingField { field: &'static str },
    /// Command does not exist, or is otherwise malformed.
    #[error("incorrect syntax for command")]
    InvalidCommand,

    /// Text was expected but it did not decode cleanly into a UTF-8 `String`.
    #[error("expected string - did not decode to valid utf-8.")]
    InvalidEncoding,

    /// Command was decoded successfully, but is not allowed during the current connection state.
    #[error("command was understood but is not legal at this time.")]
    InvalidSequence,

    /// Command was decoded successfully but a required field is missing.
    #[error("expected field {} but stream ended prematurely.", field)]
    MissingField { field: &'static str },
}

#[derive(Clone, Debug)]
pub enum Event {
	/// Attempts to register peer's username & event-stream w/ the server
	NewPeer { user: String, events: Sender<Event> },
	
	/// Notify server that the (registered) peer is disconnected.
	Disconnection { user: String },
	
	/// Request to subscribe a user to a room's message stream.
	Join { user: String, room: String },

	/// Request to remove a user from the room's subscribers.
	Part { user: String, room: String },
	
	/// Publish a message as a user to a room's message stream.
	Message { from: Target, to: Target, body: String },

	/// Client is requesting to select the indicated username.
	Registration { name: String },
	
	/// Server accepted the registration of this peer.
	AcceptedRegistration { name: String },

	/// Server rejecected registration of this peer.
	RejectedRegistration,
    /// Attempts to register peer's username & event-stream w/ the server
    NewPeer { user: String, events: Sender<Event> },

    /// Notify server that the (registered) peer is disconnected.
    Disconnection { user: String },

    /// Request to subscribe a user to a room's message stream.
    Join { user: String, room: String },

    /// Request to remove a user from the room's subscribers.
    Part { user: String, room: String },

    /// Publish a message as a user to a room's message stream.
    Message {
        from: Target,
        to: Target,
        body: String,
    },

    /// Client is requesting to select the indicated username.
    Registration { name: String },

    /// Server accepted the registration of this peer.
    AcceptedRegistration { name: String },

    /// Server rejecected registration of this peer.
    RejectedRegistration,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Target {
	Room(String),
	User(String),
	Connection,
    Room(String),
    User(String),
    Connection,
}

pub mod packet {
	use byteorder::{ReadBytesExt, WriteBytesExt, NetworkEndian};
	use thiserror::Error;
	use std::io::{self, Cursor, Read, Write};

	#[derive(Debug, Error)]
	pub enum Error {
		#[error(transparent)]
		Io { 
			#[from] 
			source: io::Error, 
		},

		#[error(transparent)]
		TextEncoding {
			#[from]
			source: std::string::FromUtf8Error,
		},

		#[error("unknown packet type: {}", packet_ty)]
		UnknownPacket { packet_ty: u8 },
	}
	#[derive(Clone, Debug, Eq, PartialEq)]
	pub enum Packet {
		Register { name: String },

		RegistrationAccepted { name: String },

		RegistrationRejected,
		
		Disconnect,

		Join { room: String },
		
		Part { room: String },

		MessageRoom { room: String, from: String, body: String },

		Notice { body: String },
	}

	impl Packet {

		pub fn encode(&self) -> Result<Vec<u8>, Error> {
			match self {
				Packet::Register { name } => Packet::encode_registration(&name),
				Packet::RegistrationAccepted { name } => Packet::encode_registration_accepted(&name),
				Packet::RegistrationRejected => Packet::encode_registration_rejected(),
				Packet::Disconnect => Packet::encode_disconect(),

				Packet::Join { room }=> Packet::encode_join_room(&room),
				Packet::Part { room }=> Packet::encode_part_room(&room),
				Packet::MessageRoom { room, from, body } => Packet::encode_message_room(&room, &from, &body),
				Packet::Notice { body } => Packet::encode_notice(&body),
			}
		}

		fn encode_registration(name: &str) -> Result<Vec<u8>, Error> {
			let mut buf = vec![];
			buf.write_u8(0x00)?;

			assert!(name.len() <= u16::MAX.into());
			buf.write_u16::<NetworkEndian>(name.len() as u16)?;
			buf.write(name.as_bytes())?;

			Ok(buf)
		}

		fn encode_registration_accepted(name: &str) -> Result<Vec<u8>, Error> {
			let mut buf = vec![];
			buf.write_u8(0x01)?;

			assert!(name.len() <= u16::MAX.into());
			buf.write_u16::<NetworkEndian>(name.len() as u16)?;
			buf.write(name.as_bytes())?;

			Ok(buf)
		}
		
		fn encode_registration_rejected() -> Result<Vec<u8>, Error> {
			let mut buf = vec![];
			buf.write_u8(0x02)?;
			Ok(buf)
		}

		fn encode_disconect() -> Result<Vec<u8>, Error> {
			let mut buf = vec![];
			buf.write_u8(0x03)?;
			Ok(buf)
		}

		fn encode_join_room(room: &str) -> Result<Vec<u8>, Error> {
			let mut buf = vec![];
			buf.write_u8(0x10)?;

			assert!(room.len() <= u16::MAX.into());
			buf.write_u16::<NetworkEndian>(room.len() as u16)?;
			buf.write(room.as_bytes())?;

			Ok(buf)
		}

		fn encode_part_room(room: &str) -> Result<Vec<u8>, Error> {
			let mut buf = vec![];
			buf.write_u8(0x11)?;

			assert!(room.len() <= u16::MAX.into());
			buf.write_u16::<NetworkEndian>(room.len() as u16)?;
			buf.write(room.as_bytes())?;

			Ok(buf)
		}

		fn encode_message_room(room: &str, from: &str, body: &str) -> Result<Vec<u8>, Error> {
			let mut buf = vec![];
			buf.write_u8(0x20)?;

			assert!(room.len() <= u16::MAX.into());
			buf.write_u16::<NetworkEndian>(room.len() as u16)?;
			buf.write(room.as_bytes())?;

			assert!(from.len() <= u16::MAX.into());
			buf.write_u16::<NetworkEndian>(from.len() as u16)?;
			buf.write(from.as_bytes())?;

			assert!(body.len() <= u16::MAX.into());
			buf.write_u16::<NetworkEndian>(body.len() as u16)?;
			buf.write(body.as_bytes())?;

			Ok(buf)
		}

		fn encode_notice(body: &str) -> Result<Vec<u8>, Error> {
			let mut buf = vec![];
			buf.write_u8(0x21)?;

			assert!(body.len() <= u16::MAX.into());
			buf.write_u16::<NetworkEndian>(body.len() as u16)?;
			buf.write(body.as_bytes())?;

			Ok(buf)
		}

		/// Decodes a byte stream into a packet
		pub fn decode(byte_buf: &[u8]) -> Result<Packet, Error> {
			let mut cursor = Cursor::new(byte_buf);
			let packet_ty  = cursor.read_u8()?;

			match packet_ty {
				0x00 => Packet::decode_registration(&mut cursor),
				0x01 => Packet::decode_accept_registration(&mut cursor),
				0x02 => Packet::decode_reject_registration(&mut cursor),
				0x03 => Packet::decode_disconnect(&mut cursor),

				0x10 => Packet::decode_join_room(&mut cursor),
				0x11 => Packet::decode_part_room(&mut cursor),
				
				0x20 => Packet::decode_message_room(&mut cursor),
				0x21 => Packet::decode_notice(&mut cursor),
				_ => return Err(Error::UnknownPacket { packet_ty: packet_ty }.into()),
			}
		}

		fn decode_registration(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
			let name_len = buf.read_u16::<NetworkEndian>()?;
			let mut name_buf = vec![0u8; name_len.into()];
			buf.read_exact(&mut name_buf)?;
			let name_str = String::from_utf8(name_buf)?;
			Ok(Packet::Register { name: name_str })
		}

		fn decode_accept_registration(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
			let name_len = buf.read_u16::<NetworkEndian>()?;
			let mut name_buf = vec![0u8; name_len.into()];
			buf.read_exact(&mut name_buf)?;
			let name_str = String::from_utf8(name_buf)?;
			Ok(Packet::RegistrationAccepted { name: name_str })
		}

		fn decode_reject_registration(_buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
			// TODO: assert we are at end of buffer?
			Ok(Packet::RegistrationRejected)
		}

		fn decode_disconnect(_buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
			// TODO: assert we are at end of buffer?
			Ok(Packet::Disconnect)
		}

		fn decode_join_room(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
			let len = buf.read_u16::<NetworkEndian>()?;
			let mut room_buf = vec![0u8; len.into()];
			buf.read_exact(&mut room_buf)?;
			let room_str = String::from_utf8(room_buf)?;

			Ok(Packet::Join { room: room_str })
		}

		fn decode_part_room(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
			let len = buf.read_u16::<NetworkEndian>()?;
			let mut room_buf = vec![0u8; len.into()];
			buf.read_exact(&mut room_buf)?;
			let room_str = String::from_utf8(room_buf)?;

			Ok(Packet::Part { room: room_str })
		}

		fn decode_message_room(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
			let len = buf.read_u16::<NetworkEndian>()?;
			let mut room_buf = vec![0u8; len.into()];
			buf.read_exact(&mut room_buf)?;
			let room_str = String::from_utf8(room_buf)?;

			let len = buf.read_u16::<NetworkEndian>()?;
			let mut from_buf = vec![0u8; len.into()];
			buf.read_exact(&mut from_buf)?;
			let from_str = String::from_utf8(from_buf)?;

			let len = buf.read_u16::<NetworkEndian>()?;
			let mut body_buf = vec![0u8; len.into()];
			buf.read_exact(&mut body_buf)?;
			let body_str = String::from_utf8(body_buf)?;


			Ok(Packet::MessageRoom { room: room_str, from: from_str, body: body_str })
		}

		fn decode_notice(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
			let len = buf.read_u16::<NetworkEndian>()?;
			let mut body_buf = vec![0u8; len.into()];
			buf.read_exact(&mut body_buf)?;
			let body_str = String::from_utf8(body_buf)?;

			Ok(Packet::Notice { body: body_str })
		}
	}

	#[cfg(test)]
	mod test {
		use super::Packet;
		
		fn test_roundtrip(pkt: Packet) {
			let wire = pkt.encode();
			assert!(wire.is_ok());

			let decoded = Packet::decode(&wire.unwrap());
			assert!(decoded.is_ok());
			assert_eq!(decoded.unwrap(), pkt);
		}

		#[test]
		fn roundtrip_register() {
			test_roundtrip(Packet::Register { name: "Chuck Testa".to_owned() });
		}

		#[test]
		fn roundtrip_register_accept() {
			test_roundtrip(Packet::RegistrationAccepted { name: "example-user".to_owned() });
		}

		#[test]
		fn roundtrip_register_reject() {
			test_roundtrip(Packet::RegistrationRejected);
		}

		#[test]
		fn roundtrip_disconnect() {
			test_roundtrip(Packet::Disconnect);
		}

		#[test]
		fn roudntrip_join() {
			test_roundtrip(Packet::Join { room: "example-room".to_owned() })
		}

		#[test]
		fn roundtrip_part() {
			test_roundtrip(Packet::Part { room: "example-room".to_owned() })
		}

		#[test]
		fn roundtrip_msg_room() {
			test_roundtrip(Packet::MessageRoom { 
				room: "example-room".to_owned(),
				from: "example-user".to_owned(),
				body: "hello, world".to_owned() 
			})
		}

		#[test]
		fn roundtrip_notice() {
			test_roundtrip(Packet::Notice { body: "important message from server".to_owned() })
		}
	}
    use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
    use std::io::{self, Cursor, Read, Write};
    use thiserror::Error;

    #[derive(Debug, Error)]
    pub enum Error {
        #[error(transparent)]
        Io {
            #[from]
            source: io::Error,
        },

        #[error(transparent)]
        TextEncoding {
            #[from]
            source: std::string::FromUtf8Error,
        },

        #[error("unknown packet type: {}", packet_ty)]
        UnknownPacket { packet_ty: u8 },
    }
    #[derive(Clone, Debug, Eq, PartialEq)]
    pub enum Packet {
        Register {
            name: String,
        },

        RegistrationAccepted {
            name: String,
        },

        RegistrationRejected,

        Disconnect,

        Join {
            room: String,
        },

        Part {
            room: String,
        },

        MessageRoom {
            room: String,
            from: String,
            body: String,
        },

        Notice {
            body: String,
        },
    }

    impl Packet {
        pub fn encode(&self) -> Result<Vec<u8>, Error> {
            match self {
                Packet::Register { name } => Packet::encode_registration(&name),
                Packet::RegistrationAccepted { name } => {
                    Packet::encode_registration_accepted(&name)
                }
                Packet::RegistrationRejected => Packet::encode_registration_rejected(),
                Packet::Disconnect => Packet::encode_disconect(),

                Packet::Join { room } => Packet::encode_join_room(&room),
                Packet::Part { room } => Packet::encode_part_room(&room),
                Packet::MessageRoom { room, from, body } => {
                    Packet::encode_message_room(&room, &from, &body)
                }
                Packet::Notice { body } => Packet::encode_notice(&body),
            }
        }

        fn encode_registration(name: &str) -> Result<Vec<u8>, Error> {
            let mut buf = vec![];
            buf.write_u8(0x00)?;

            assert!(name.len() <= u16::MAX.into());
            buf.write_u16::<NetworkEndian>(name.len() as u16)?;
            buf.write(name.as_bytes())?;

            Ok(buf)
        }

        fn encode_registration_accepted(name: &str) -> Result<Vec<u8>, Error> {
            let mut buf = vec![];
            buf.write_u8(0x01)?;

            assert!(name.len() <= u16::MAX.into());
            buf.write_u16::<NetworkEndian>(name.len() as u16)?;
            buf.write(name.as_bytes())?;

            Ok(buf)
        }

        fn encode_registration_rejected() -> Result<Vec<u8>, Error> {
            let mut buf = vec![];
            buf.write_u8(0x02)?;
            Ok(buf)
        }

        fn encode_disconect() -> Result<Vec<u8>, Error> {
            let mut buf = vec![];
            buf.write_u8(0x03)?;
            Ok(buf)
        }

        fn encode_join_room(room: &str) -> Result<Vec<u8>, Error> {
            let mut buf = vec![];
            buf.write_u8(0x10)?;

            assert!(room.len() <= u16::MAX.into());
            buf.write_u16::<NetworkEndian>(room.len() as u16)?;
            buf.write(room.as_bytes())?;

            Ok(buf)
        }

        fn encode_part_room(room: &str) -> Result<Vec<u8>, Error> {
            let mut buf = vec![];
            buf.write_u8(0x11)?;

            assert!(room.len() <= u16::MAX.into());
            buf.write_u16::<NetworkEndian>(room.len() as u16)?;
            buf.write(room.as_bytes())?;

            Ok(buf)
        }

        fn encode_message_room(room: &str, from: &str, body: &str) -> Result<Vec<u8>, Error> {
            let mut buf = vec![];
            buf.write_u8(0x20)?;

            assert!(room.len() <= u16::MAX.into());
            buf.write_u16::<NetworkEndian>(room.len() as u16)?;
            buf.write(room.as_bytes())?;

            assert!(from.len() <= u16::MAX.into());
            buf.write_u16::<NetworkEndian>(from.len() as u16)?;
            buf.write(from.as_bytes())?;

            assert!(body.len() <= u16::MAX.into());
            buf.write_u16::<NetworkEndian>(body.len() as u16)?;
            buf.write(body.as_bytes())?;

            Ok(buf)
        }

        fn encode_notice(body: &str) -> Result<Vec<u8>, Error> {
            let mut buf = vec![];
            buf.write_u8(0x21)?;

            assert!(body.len() <= u16::MAX.into());
            buf.write_u16::<NetworkEndian>(body.len() as u16)?;
            buf.write(body.as_bytes())?;

            Ok(buf)
        }

        /// Decodes a byte stream into a packet
        pub fn decode(byte_buf: &[u8]) -> Result<Packet, Error> {
            let mut cursor = Cursor::new(byte_buf);
            let packet_ty = cursor.read_u8()?;

            match packet_ty {
                0x00 => Packet::decode_registration(&mut cursor),
                0x01 => Packet::decode_accept_registration(&mut cursor),
                0x02 => Packet::decode_reject_registration(&mut cursor),
                0x03 => Packet::decode_disconnect(&mut cursor),

                0x10 => Packet::decode_join_room(&mut cursor),
                0x11 => Packet::decode_part_room(&mut cursor),

                0x20 => Packet::decode_message_room(&mut cursor),
                0x21 => Packet::decode_notice(&mut cursor),
                _ => {
                    return Err(Error::UnknownPacket {
                        packet_ty: packet_ty,
                    }
                    .into())
                }
            }
        }

        fn decode_registration(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
            let name_len = buf.read_u16::<NetworkEndian>()?;
            let mut name_buf = vec![0u8; name_len.into()];
            buf.read_exact(&mut name_buf)?;
            let name_str = String::from_utf8(name_buf)?;
            Ok(Packet::Register { name: name_str })
        }

        fn decode_accept_registration(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
            let name_len = buf.read_u16::<NetworkEndian>()?;
            let mut name_buf = vec![0u8; name_len.into()];
            buf.read_exact(&mut name_buf)?;
            let name_str = String::from_utf8(name_buf)?;
            Ok(Packet::RegistrationAccepted { name: name_str })
        }

        fn decode_reject_registration(_buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
            // TODO: assert we are at end of buffer?
            Ok(Packet::RegistrationRejected)
        }

        fn decode_disconnect(_buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
            // TODO: assert we are at end of buffer?
            Ok(Packet::Disconnect)
        }

        fn decode_join_room(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
            let len = buf.read_u16::<NetworkEndian>()?;
            let mut room_buf = vec![0u8; len.into()];
            buf.read_exact(&mut room_buf)?;
            let room_str = String::from_utf8(room_buf)?;

            Ok(Packet::Join { room: room_str })
        }

        fn decode_part_room(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
            let len = buf.read_u16::<NetworkEndian>()?;
            let mut room_buf = vec![0u8; len.into()];
            buf.read_exact(&mut room_buf)?;
            let room_str = String::from_utf8(room_buf)?;

            Ok(Packet::Part { room: room_str })
        }

        fn decode_message_room(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
            let len = buf.read_u16::<NetworkEndian>()?;
            let mut room_buf = vec![0u8; len.into()];
            buf.read_exact(&mut room_buf)?;
            let room_str = String::from_utf8(room_buf)?;

            let len = buf.read_u16::<NetworkEndian>()?;
            let mut from_buf = vec![0u8; len.into()];
            buf.read_exact(&mut from_buf)?;
            let from_str = String::from_utf8(from_buf)?;

            let len = buf.read_u16::<NetworkEndian>()?;
            let mut body_buf = vec![0u8; len.into()];
            buf.read_exact(&mut body_buf)?;
            let body_str = String::from_utf8(body_buf)?;

            Ok(Packet::MessageRoom {
                room: room_str,
                from: from_str,
                body: body_str,
            })
        }

        fn decode_notice(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
            let len = buf.read_u16::<NetworkEndian>()?;
            let mut body_buf = vec![0u8; len.into()];
            buf.read_exact(&mut body_buf)?;
            let body_str = String::from_utf8(body_buf)?;

            Ok(Packet::Notice { body: body_str })
        }
    }

    #[cfg(test)]
    mod test {
        use super::Packet;

        fn test_roundtrip(pkt: Packet) {
            let wire = pkt.encode();
            assert!(wire.is_ok());

            let decoded = Packet::decode(&wire.unwrap());
            assert!(decoded.is_ok());
            assert_eq!(decoded.unwrap(), pkt);
        }

        #[test]
        fn roundtrip_register() {
            test_roundtrip(Packet::Register {
                name: "Chuck Testa".to_owned(),
            });
        }

        #[test]
        fn roundtrip_register_accept() {
            test_roundtrip(Packet::RegistrationAccepted {
                name: "example-user".to_owned(),
            });
        }

        #[test]
        fn roundtrip_register_reject() {
            test_roundtrip(Packet::RegistrationRejected);
        }

        #[test]
        fn roundtrip_disconnect() {
            test_roundtrip(Packet::Disconnect);
        }

        #[test]
        fn roudntrip_join() {
            test_roundtrip(Packet::Join {
                room: "example-room".to_owned(),
            })
        }

        #[test]
        fn roundtrip_part() {
            test_roundtrip(Packet::Part {
                room: "example-room".to_owned(),
            })
        }

        #[test]
        fn roundtrip_msg_room() {
            test_roundtrip(Packet::MessageRoom {
                room: "example-room".to_owned(),
                from: "example-user".to_owned(),
                body: "hello, world".to_owned(),
            })
        }

        #[test]
        fn roundtrip_notice() {
            test_roundtrip(Packet::Notice {
                body: "important message from server".to_owned(),
            })
        }
    }
}

M smolboi/src/server.rs => smolboi/src/server.rs +140 -139
@@ 1,3 1,4 @@
use crate::prelude::*;
use crate::protocol::{Event, Target};

use async_channel::Sender;


@@ 9,150 10,150 @@ type UserMap = HashMap<String, Sender<Event>>;
type RoomMap = HashMap<String, UserMap>;

pub struct Server {
	users: UserMap,
	rooms: RoomMap,

    users: UserMap,
    rooms: RoomMap,
}

#[derive(Debug, Error)]
enum ServerError {
	#[error("server could not find the peer specified")]
	UnknownPeer,

	#[error("server could not find the room specified")]
	UnknownRoom,
    #[error("server could not find the peer specified")]
    UnknownPeer,

    #[error("server could not find the room specified")]
    UnknownRoom,
}

impl Server {
	pub fn new() -> Self {
		Self {
			users: HashMap::new(),
			rooms: HashMap::new(),
		}
	}

	pub async fn process_event(&mut self, event: Event) -> anyhow::Result<()> {
		match event {
			Event::NewPeer { user, events } => {
				match self.users.entry(user) {
					Entry::Occupied(_) => { 
						events.send(Event::RejectedRegistration).await?;
						Ok(())
					},

					Entry::Vacant(slot) => {
						let peer_name = slot.key().clone();
						slot.insert(events.clone());
						events.send(Event::AcceptedRegistration { name: peer_name }).await?;
						Ok(())
					}
				}
			},

			Event::Join { room, user } => {
				let peer = self.users.get(&user).ok_or(ServerError::UnknownPeer)?;

				match self.rooms.entry(room) {
					Entry::Vacant(slot) => {
						info!("creating room: {}", slot.key());
						let mut room_users = HashMap::new();
						room_users.insert(user, peer.clone());
						slot.insert(room_users);
						Ok(())
					},

					Entry::Occupied(mut slot) => {
						info!("joining existing room: {}", slot.key());
						slot.get_mut().insert(user, peer.clone());
						Ok(())
					},
				}

			},

			Event::Part { room, user } => {
				self.users.get(&user).ok_or(ServerError::UnknownPeer)?;

				match self.rooms.entry(room) {
					Entry::Vacant(slot) => {
						warn!("user tried to part non-existent room? {}", slot.key());
						Err(ServerError::UnknownPeer.into())
					},

					Entry::Occupied(mut slot) => {
						info!("user {} leaving {}", user, slot.key());
						if let None = slot.get_mut().remove(&user) {
							warn!("non-existent user {} parting {}", user, slot.key())
						}

						Ok(())
					}
				}
			},

			
			Event::Message { from: Target::User(ref from), to: Target::Room(ref to), .. } => {
				// just give up if we got a message from a peer we don't know about
				let peer = self.users.get(from).ok_or(ServerError::UnknownPeer)?;

				// check that the room exists & give client a message if it does not ...
				match self.rooms.get(to) {
					Some(room) => {
						// check that the user is in the room
						if !room.contains_key(from) {
							let event = Event::Message {
								from: Target::Connection,
								to: Target::Connection,
								body: format!("you are not a member of the room: {}", to),
							};

							// tell the peer they lack membership
							// TODO: should this be its own packet type instead of a global message?
							peer.send(event).await?;
						}

						// TODO: any way to parallelize this? i.e: rayon?
						for (_peer_name, peer_socket) in room.iter() {
							peer_socket.send(event.clone()).await?
						}
					},

					None => {
						let event = Event::Message {
							from: Target::Connection,
							to: Target::Connection,
							body: format!("you are not a member of the room: {}", to),
						};
	
						peer.send(event).await?;
						return Err(ServerError::UnknownRoom.into());
					},
				}

				Ok(())
			},

			Event::Disconnection { user } => {
				let peer = self.users.remove(&user);
				if let Some(_) = peer {
					debug!("removing {} from server directory", user);
				}

				for (room, room_users) in self.rooms.iter_mut() {
					// TODO: send disconnection notice to users in same channel?
					if let Some(_) = room_users.remove(&user) {
						debug!("removing {} from {}", user, room);
					}
				}

				Ok(())
			},

			event => {
				warn!("unhandled event: {:?}", event);
				Ok(())
			},
		}
	}
}
\ No newline at end of file
    pub fn new() -> Self {
        Self {
            users: HashMap::new(),
            rooms: HashMap::new(),
        }
    }

    pub async fn process_event(&mut self, event: Event) -> anyhow::Result<()> {
        match event {
            Event::NewPeer { user, events } => match self.users.entry(user) {
                Entry::Occupied(_) => {
                    events.send(Event::RejectedRegistration).await?;
                    Ok(())
                }

                Entry::Vacant(slot) => {
                    let peer_name = slot.key().clone();
                    slot.insert(events.clone());
                    events
                        .send(Event::AcceptedRegistration { name: peer_name })
                        .await?;
                    Ok(())
                }
            },

            Event::Join { room, user } => {
                let peer = self.users.get(&user).ok_or(ServerError::UnknownPeer)?;

                match self.rooms.entry(room) {
                    Entry::Vacant(slot) => {
                        info!("creating room: {}", slot.key());
                        let mut room_users = HashMap::new();
                        room_users.insert(user, peer.clone());
                        slot.insert(room_users);
                        Ok(())
                    }

                    Entry::Occupied(mut slot) => {
                        info!("joining existing room: {}", slot.key());
                        slot.get_mut().insert(user, peer.clone());
                        Ok(())
                    }
                }
            }

            Event::Part { room, user } => {
                self.users.get(&user).ok_or(ServerError::UnknownPeer)?;

                match self.rooms.entry(room) {
                    Entry::Vacant(slot) => {
                        warn!("user tried to part non-existent room? {}", slot.key());
                        Err(ServerError::UnknownPeer.into())
                    }

                    Entry::Occupied(mut slot) => {
                        info!("user {} leaving {}", user, slot.key());
                        if let None = slot.get_mut().remove(&user) {
                            warn!("non-existent user {} parting {}", user, slot.key())
                        }

                        Ok(())
                    }
                }
            }

            Event::Message {
                from: Target::User(ref from),
                to: Target::Room(ref to),
                ..
            } => {
                // just give up if we got a message from a peer we don't know about
                let peer = self.users.get(from).ok_or(ServerError::UnknownPeer)?;

                // check that the room exists & give client a message if it does not ...
                match self.rooms.get(to) {
                    Some(room) => {
                        // check that the user is in the room
                        if !room.contains_key(from) {
                            let event = Event::Message {
                                from: Target::Connection,
                                to: Target::Connection,
                                body: format!("you are not a member of the room: {}", to),
                            };

                            // tell the peer they lack membership
                            // TODO: should this be its own packet type instead of a global message?
                            peer.send(event).await?;
                        }

                        // TODO: any way to parallelize this? i.e: rayon?
                        for (_peer_name, peer_socket) in room.iter() {
                            peer_socket.send(event.clone()).await?
                        }
                    }

                    None => {
                        let event = Event::Message {
                            from: Target::Connection,
                            to: Target::Connection,
                            body: format!("you are not a member of the room: {}", to),
                        };

                        peer.send(event).await?;
                        return Err(ServerError::UnknownRoom.into());
                    }
                }

                Ok(())
            }

            Event::Disconnection { user } => {
                let peer = self.users.remove(&user);
                if let Some(_) = peer {
                    debug!("removing {} from server directory", user);
                }

                for (room, room_users) in self.rooms.iter_mut() {
                    // TODO: send disconnection notice to users in same channel?
                    if let Some(_) = room_users.remove(&user) {
                        debug!("removing {} from {}", user, room);
                    }
                }

                Ok(())
            }

            event => {
                warn!("unhandled event: {:?}", event);
                Ok(())
            }
        }
    }
}

M smolboi/src/util.rs => smolboi/src/util.rs +38 -33
@@ 1,58 1,63 @@
use byteorder::{ReadBytesExt, WriteBytesExt, NetworkEndian};
use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
use futures_util::{AsyncReadExt, AsyncWriteExt};

use crate::protocol::packet::Packet;
use std::io::{Read, Write};

pub async fn write_packet<W: AsyncWriteExt + Unpin> (wtr: &mut W, packet: Packet) -> anyhow::Result<()> {
	let packet_buf = packet.encode()?;
	let mut packet_sz = [0u8; 2];
pub async fn write_packet<W: AsyncWriteExt + Unpin>(
    wtr: &mut W,
    packet: Packet,
) -> anyhow::Result<()> {
    let packet_buf = packet.encode()?;
    let mut packet_sz = [0u8; 2];

	assert!(packet_buf.len() < u16::MAX.into());
	(&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_buf.len() as u16)?;
    assert!(packet_buf.len() < u16::MAX.into());
    (&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_buf.len() as u16)?;

	wtr.write(&packet_sz).await?;
	wtr.write(&packet_buf).await?;
    wtr.write(&packet_sz).await?;
    wtr.write(&packet_buf).await?;

	Ok(())
    Ok(())
}

pub fn write_packet_sync<W: Write> (wtr: &mut W, packet: Packet) -> anyhow::Result<()> {
	let packet_buf = packet.encode()?;
	let mut packet_sz = [0u8; 2];
pub fn write_packet_sync<W: Write>(wtr: &mut W, packet: Packet) -> anyhow::Result<()> {
    let packet_buf = packet.encode()?;
    let mut packet_sz = [0u8; 2];

	assert!(packet_buf.len() < u16::MAX.into());
	(&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_buf.len() as u16)?;
    assert!(packet_buf.len() < u16::MAX.into());
    (&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_buf.len() as u16)?;

	wtr.write(&packet_sz)?;
	wtr.write(&packet_buf)?;
    wtr.write(&packet_sz)?;
    wtr.write(&packet_buf)?;

	Ok(())
    Ok(())
}

pub async fn read_packet<R>(rdr: &mut R) -> anyhow::Result<Packet> 
where R: AsyncReadExt + Unpin {
	let mut packet_header_buf = [0u8; 2];
	rdr.read_exact(&mut packet_header_buf).await?;
pub async fn read_packet<R>(rdr: &mut R) -> anyhow::Result<Packet>
where
    R: AsyncReadExt + Unpin,
{
    let mut packet_header_buf = [0u8; 2];
    rdr.read_exact(&mut packet_header_buf).await?;

	let mut cursor = std::io::Cursor::new(packet_header_buf);
	let packet_sz = cursor.read_u16::<NetworkEndian>()?;
    let mut cursor = std::io::Cursor::new(packet_header_buf);
    let packet_sz = cursor.read_u16::<NetworkEndian>()?;

	let mut packet_body_buf = vec![0u8; packet_sz.into()];
	rdr.read_exact(&mut packet_body_buf).await?;
    let mut packet_body_buf = vec![0u8; packet_sz.into()];
    rdr.read_exact(&mut packet_body_buf).await?;

	Ok(Packet::decode(&packet_body_buf)?)
    Ok(Packet::decode(&packet_body_buf)?)
}

pub fn read_packet_sync<R: Read>(rdr: &mut R) -> anyhow::Result<Packet> {
	let mut packet_header_buf = [0u8; 2];
	rdr.read_exact(&mut packet_header_buf)?;
    let mut packet_header_buf = [0u8; 2];
    rdr.read_exact(&mut packet_header_buf)?;

	let mut cursor = std::io::Cursor::new(packet_header_buf);
	let packet_sz = cursor.read_u16::<NetworkEndian>()?;
    let mut cursor = std::io::Cursor::new(packet_header_buf);
    let packet_sz = cursor.read_u16::<NetworkEndian>()?;

	let mut packet_body_buf = vec![0u8; packet_sz.into()];
	rdr.read_exact(&mut packet_body_buf)?;
    let mut packet_body_buf = vec![0u8; packet_sz.into()];
    rdr.read_exact(&mut packet_body_buf)?;

	Ok(Packet::decode(&packet_body_buf)?)
    Ok(Packet::decode(&packet_body_buf)?)
}